import sys
import ipaddress
import base64
+import gzip
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
from cryptography.hazmat.primitives import serialization
-_version = '1.0'
+_version = '1.1'
def generate_publickey(privatekey):
self._wg_relay_ipv6 = args.wg_relay_ipv6
self._wg_dns = args.wg_dns
self._wg_hijack_dns = args.wg_hijack_dns
+ self._wg_multihop_server = args.wg_multihop_server
self._webtoken = None
+ self._filter = args.filter
self._config = configparser.ConfigParser()
self._settings_file = pathlib.Path(self._settings_file).expanduser()
def run(self):
+ multihop_server = False
+ if self._wg_multihop_server:
+ multihop_servers = self.filter(self.get_multihop_info(), self._wg_multihop_server)
+ if len(multihop_servers) == 1:
+ multihop_server = multihop_servers[0]
+ elif len(multihop_servers) >= 1:
+ print('Select one of the following multihop servers:')
+ for server in multihop_servers:
+ print(f'{server["hostname"]}')
+ sys.exit(1)
+ else:
+ print(f'No multihop-server matching hostname: {self._wg_multihop_server}')
+ sys.exit(1)
+
if self._settings_file.is_file():
privatekey = self.get_privatekey()
else:
publickey = generate_publickey(privatekey)
device = self.get_device(publickey) or self.create_device(publickey)
if device:
- self.create_wg_configs(device, privatekey)
+ self.create_wg_configs(device, privatekey, multihop_server)
def get_privatekey(self):
print(f'Reading settings from: {self._settings_file}')
webtoken = self.get_webtoken()
req = urllib.request.Request(url)
req.add_header('Authorization', f'Bearer {webtoken}')
+ req.add_header('Accept-Encoding', 'gzip')
+
if body:
req.add_header('Content-Type', 'application/json')
response = urllib.request.urlopen(req, json.dumps(body).encode())
else:
response = urllib.request.urlopen(req)
- data = json.loads(response.read())
+
+ return self.get_response(response)
+
+ def get_response(self, response):
+ if response.headers.get('Content-Encoding') == 'gzip':
+ data = json.loads(gzip.decompress(response.read()))
+ else:
+ data = json.loads(response.read())
return data
def get_device(self, publickey):
body['hijack_dns'] = self._wg_hijack_dns
try:
response = self.api('https://api.mullvad.net/accounts/v1/devices', body)
+ print(f'Device created: ({response['name']}) {response['pubkey']}')
return response
except urllib.error.HTTPError as e:
error_message = json.loads(e.read())
def get_wireguard_info(self):
try:
- response = urllib.request.urlopen('https://api.mullvad.net/public/relays/wireguard/v2/')
- data = json.loads(response.read())
+ req = urllib.request.Request('https://api.mullvad.net/public/relays/wireguard/v2/')
+ req.add_header('Accept-Encoding', 'gzip')
+ response = urllib.request.urlopen(req)
+ data = self.get_response(response)
return data['wireguard']
except urllib.error.HTTPError as e:
- error_message = json.loads(e.read())
+ error_message = self.get_response(e)
+ print(error_message)
+ sys.exit(1)
+
+ def get_multihop_info(self):
+ try:
+ req = urllib.request.Request('https://api.mullvad.net/www/relays/all')
+ req.add_header('Accept-Encoding', 'gzip')
+ response = urllib.request.urlopen(req)
+ data = self.get_response(response)
+ return [i for i in data if i['type'] == 'wireguard']
+ except urllib.error.HTTPError as e:
+ error_message = self.get_response(e)
print(error_message)
sys.exit(1)
- def create_wg_configs(self, device, privatekey):
+ def filter(self, data, _filter):
+ if _filter:
+ return [d for d in data if _filter in d['hostname']]
+ return data
+
+ def create_wg_configs(self, device, privatekey, multihop_server):
wg = self.get_wireguard_info()
output_dir = pathlib.Path(self._output_dir).expanduser()
output_dir.mkdir(exist_ok=True, parents=True)
- print(f'Creating files in: {output_dir}')
- for relay in wg['relays']:
- _hostname = relay['hostname']
- _filepath = pathlib.Path.joinpath(output_dir, f'{_hostname}.conf')
- _filepath.touch(mode=0o600, exist_ok=True)
- with _filepath.open('w') as _file:
- config = configparser.ConfigParser()
- config.add_section('Interface')
- config.set('Interface', '#device', device['name'])
- config.set('Interface', 'privateKey', privatekey)
- config.set('Interface', 'address', ','.join([device['ipv4_address'], device['ipv6_address']]))
- if self._wg_dns:
- config.set('Interface', 'dns', ','.join([str(x) for x in self._wg_dns]))
- else:
- config.set('Interface', 'dns', ','.join([wg['ipv4_gateway'], wg['ipv6_gateway']]))
- config.add_section('Peer')
- config.set('Peer', '#owned', str(relay['owned']))
- config.set('Peer', '#provider', relay['provider'])
- config.set('Peer', 'publickey', relay['public_key'])
- config.set('Peer', 'allowedips', '0.0.0.0/0,::/0')
- if self._wg_relay_ipv6:
- wg_relay_address = relay['ipv6_addr_in']
- else:
- wg_relay_address = relay['ipv4_addr_in']
- config.set('Peer', 'endpoint', f'{wg_relay_address}:{self._wg_relay_port}')
- config.write(_file)
+ config = configparser.ConfigParser()
+ config.add_section('Interface')
+ config.set('Interface', '#device', device['name'])
+ config.set('Interface', 'privateKey', privatekey)
+ config.set('Interface', 'address', ','.join([device['ipv4_address'], device['ipv6_address']]))
+ if self._wg_dns:
+ config.set('Interface', 'dns', ','.join([str(x) for x in self._wg_dns]))
+ else:
+ config.set('Interface', 'dns', ','.join([wg['ipv4_gateway'], wg['ipv6_gateway']]))
+ config.add_section('Peer')
+
+ if multihop_server:
+ _servername = multihop_server["hostname"]
+ relays = self.filter(self.get_multihop_info(), self._filter)
+ if relays:
+ print(f'Creating files in: {output_dir}')
+ for relay in relays:
+ _hostname = relay['hostname']
+ _filepath = pathlib.Path.joinpath(output_dir, f'{_hostname}-via-{_servername}.conf')
+ _filepath.touch(mode=0o600, exist_ok=True)
+ with _filepath.open('w') as _file:
+ config.set('Peer', '#owned', str(relay['owned']))
+ config.set('Peer', '#provider', relay['provider'])
+ config.set('Peer', 'publickey', relay['pubkey'])
+ config.set('Peer', 'allowedips', '0.0.0.0/0,::/0')
+ if self._wg_relay_ipv6:
+ wg_relay_address = multihop_server['ipv6_addr_in']
+ else:
+ wg_relay_address = multihop_server['ipv4_addr_in']
+ config.set('Peer', 'endpoint', f'{wg_relay_address}:{relay["multihop_port"]}')
+ config.write(_file)
+ else:
+ print(f'No relays matching filter: {self._filter}')
+ else:
+ relays = self.filter(wg['relays'], self._filter)
+ if relays:
+ print(f'Creating files in: {output_dir}')
+ for relay in relays:
+ _hostname = relay['hostname']
+ _filepath = pathlib.Path.joinpath(output_dir, f'{_hostname}.conf')
+ _filepath.touch(mode=0o600, exist_ok=True)
+ with _filepath.open('w') as _file:
+ config.set('Peer', '#owned', str(relay['owned']))
+ config.set('Peer', '#provider', relay['provider'])
+ config.set('Peer', 'publickey', relay['public_key'])
+ config.set('Peer', 'allowedips', '0.0.0.0/0,::/0')
+ if self._wg_relay_ipv6:
+ wg_relay_address = relay['ipv6_addr_in']
+ else:
+ wg_relay_address = relay['ipv4_addr_in']
+ config.set('Peer', 'endpoint', f'{wg_relay_address}:{self._wg_relay_port}')
+ config.write(_file)
+ else:
+ print(f'No relays matching filter: {self._filter}')
def validate_account(value):
parser.add_argument(
'--ipv6', dest='wg_relay_ipv6', help='use ipv6 address for relays in wireguard configs', action='store_true')
parser.add_argument(
- '--version', help='show version information', action='version', version=f'%(prog)s-{_version}')
+ '--multihop-server', dest='wg_multihop_server', action='store', default=None, help='use multihop server')
+ parser.add_argument(
+ '--filter', action='store', default=None,
+ help='filter relay list before creating configuration files')
+ parser.add_argument(
+ '--version', help='show version information', action='version', version=f'%(prog)s-{_version}')
args = parser.parse_args()
- mullvad = Mullvad(args)
- mullvad.run()
+ try:
+ mullvad = Mullvad(args)
+ mullvad.run()
+ except Exception as e:
+ print('Error:', e)
+ sys.exit(1)
if __name__ == '__main__':