From: kalken Date: Tue, 21 Jan 2025 08:50:08 +0000 (+0100) Subject: initial commit X-Git-Url: http://git.99rst.org/?a=commitdiff_plain;h=b1fe9a136c79f1f07937baeea60306534748e66f;p=mullvad-wg-tools.git initial commit --- b1fe9a136c79f1f07937baeea60306534748e66f diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..b3788b8 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +extend-ignore = E265,E266 +max-line-length = 120 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8efb3db --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +# Custom +*.swp +.DS_Store +config/* +__pycache__ +.env* diff --git a/README.md b/README.md new file mode 100644 index 0000000..54fc8a1 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +# mullvad-wg.py + +This repository contains a python script that generates wireguard configuration files to use Mullvad with wireguard. + +Use --help to see what can be configured. + +The script can generate and reuse the private wireguard key to generate/update files + diff --git a/wg-mullvad.py b/wg-mullvad.py new file mode 100755 index 0000000..5addaef --- /dev/null +++ b/wg-mullvad.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 + +import urllib.request +import configparser +import argparse +import pathlib +import json +import sys +import ipaddress +import base64 +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey +from cryptography.hazmat.primitives import serialization + + +_version = '0.1 alfa' + + +def generate_publickey(privatekey): + private_key_bytes = base64.b64decode(privatekey) + private_key = X25519PrivateKey.from_private_bytes(private_key_bytes) + public_key = private_key.public_key() + public_key_bytes = public_key.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ) + wgpublickey = base64.b64encode(public_key_bytes).decode('utf-8') + return wgpublickey + + +def generate_privatekey(): + privatekey = X25519PrivateKey.generate() + private_key_bytes = privatekey.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + wgprivatekey = base64.b64encode(private_key_bytes).decode('utf-8') + return wgprivatekey + + +class Mullvad: + def __init__(self, args): + self._account_number = args.account_number + self._output_dir = args.output_dir + self._settings_file = args.settings_file + self._wg_relay_port = args.wg_relay_port + self._wg_relay_ipv6 = args.wg_relay_ipv6 + self._wg_dns = args.wg_dns + self._wg_hijack_dns = args.wg_hijack_dns + self._webtoken = None + + self._config = configparser.ConfigParser() + self._settings_file = pathlib.Path(self._settings_file).expanduser() + + def run(self): + if self._settings_file.is_file(): + privatekey = self.get_privatekey() + else: + privatekey = generate_privatekey() + self.save_privatekey(privatekey) + + publickey = generate_publickey(privatekey) + device = self.get_device(publickey) or self.create_device(publickey) + if device: + self.create_wg_configs(device, privatekey) + + def get_privatekey(self): + print(f'Reading settings from: {self._settings_file}') + self._config.read(self._settings_file) + try: + return self._config.get('Interface', 'privatekey') + except (configparser.NoOptionError, configparser.NoSectionError): + print('Error: No private key found in settings file') + print('Solution: add it or remove the file completely to generate a new device') + sys.exit(1) + + def save_privatekey(self, privatekey): + self._settings_file.parent.mkdir(parents=True, exist_ok=True) + self._settings_file.touch(mode=0o600, exist_ok=True) + with self._settings_file.open('w') as _file: + print(f'Writing settings to: {self._settings_file}') + self._config.add_section('Interface') + self._config.set('Interface', 'privatekey', privatekey) + self._config.write(_file) + return True + + def get_webtoken(self): + if not self._webtoken: + self.generate_webtoken() + return self._webtoken + + def generate_webtoken(self): + body = {} + body['account_number'] = self._account_number + req = urllib.request.Request('https://api.mullvad.net/auth/v1/webtoken') + req.add_header('Content-Type', 'application/json') + response = urllib.request.urlopen(req, json.dumps(body).encode()).read() + data = json.loads(response) + self._webtoken = data['access_token'] + + def api(self, url, body=None): + webtoken = self.get_webtoken() + req = urllib.request.Request(url) + req.add_header('Authorization', f'Bearer {webtoken}') + if body: + req.add_header('Content-Type', 'application/json') + response = urllib.request.urlopen(req, json.dumps(body).encode()) + else: + response = urllib.request.urlopen(req) + data = json.loads(response.read()) + return data + + def get_device(self, publickey): + print(f'Trying to find device: {publickey}') + try: + for device in self.api('https://api.mullvad.net/accounts/v1/devices'): + if publickey == device['pubkey']: + _name = device['name'] + _pubkey = device['pubkey'] + print(f'Device found: ({_name}) {_pubkey}') + return device + print(f'Device is not registered: {publickey}') + return None + except urllib.error.HTTPError as e: + error_message = json.loads(e.read()) + _code = error_message.get('code') + _message = error_message.get('detail') + if _message: + print(_message) + if _code == 'INVALID_ACCOUNT': + print(f'Invalid account: {self._account_number}') + sys.exit(1) + + def create_device(self, publickey): + print(f'Trying to create device: {publickey}') + body = {} + body['pubkey'] = publickey + body['hijack_dns'] = self._wg_hijack_dns + try: + response = self.api('https://api.mullvad.net/accounts/v1/devices', body) + return response + except urllib.error.HTTPError as e: + error_message = json.loads(e.read()) + _code = error_message.get('code') + _message = error_message.get('detail') + if _message: + print(_message) + if _code == 'PUBKEY_IN_USE': + print(f'Error: Private key settings exits in {self._settings_file} but device has been removed') + print('Solution 1: Wait for grace period to pass before using this key (5 min)') + print('Solution 2: Remove private key from setting file if you want to create a new device') + sys.exit(1) + + def get_wireguard_info(self): + try: + response = urllib.request.urlopen('https://api.mullvad.net/public/relays/wireguard/v2/') + data = json.loads(response.read()) + return data['wireguard'] + except urllib.error.HTTPError as e: + error_message = json.loads(e.read()) + print(error_message) + sys.exit(1) + + def create_wg_configs(self, device, privatekey): + wg = self.get_wireguard_info() + output_dir = pathlib.Path(self._output_dir).expanduser() + if not output_dir.is_dir(): + output_dir.mkdir(exist_ok=True, parents=True) + print(f'Creating files in: {output_dir}') + for relay in wg['relays']: + _hostname = relay['hostname'] + _filepath = pathlib.Path.joinpath(output_dir, f'{_hostname}.conf') + _filepath.touch(mode=0o600, exist_ok=True) + with _filepath.open('w') as _file: + config = configparser.ConfigParser() + config.add_section('Interface') + config.set('Interface', '#device', device['name']) + config.set('Interface', 'privateKey', privatekey) + config.set('Interface', 'address', ','.join([device['ipv4_address'], device['ipv6_address']])) + if self._wg_dns: + config.set('Interface', 'dns', self._wg_dns) + else: + config.set('Interface', 'dns', ','.join([wg['ipv4_gateway'], wg['ipv6_gateway']])) + config.add_section('Peer') + config.set('Peer', '#owned', str(relay['owned'])) + config.set('Peer', '#provider', relay['provider']) + config.set('Peer', 'publickey', relay['public_key']) + config.set('Peer', 'allowedips', '0.0.0.0/0,::/0') + if self._wg_relay_ipv6: + wg_relay_address = relay['ipv6_addr_in'] + else: + wg_relay_address = relay['ipv4_addr_in'] + config.set('Peer', 'endpoint', f'{wg_relay_address}:{self._wg_relay_port}') + config.write(_file) + + +def sanity(args): + if args.wg_dns: + for _ip in args.wg_dns.split(','): + try: + ipaddress.ip_address(_ip.strip()) + except ValueError: + print('Dns option has to contain valid ip numbers separated by comma') + return False + if args.account_number: + try: + int(args.account_number) + except ValueError: + print('Account is not a number') + return False + if args.wg_relay_port: + try: + int(args.wg_relay_port) + except ValueError: + print('Port is not a number') + return False + return True + + +def main(): + parser = argparse.ArgumentParser( + description=f'{__file__}', + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument( + '--account', dest='account_number', action='store', help='mullvad account number') + parser.add_argument( + '--settings-file', dest='settings_file', action='store', + default='~/.config/mullvad/wg0.conf', help='settings file to use') + parser.add_argument( + '--output-dir', dest='output_dir', action='store', + default='~/.config/mullvad/wg0', help='directory to write settings') + parser.add_argument( + '--wg-relay-port', dest='wg_relay_port', action='store', + default=51820, help='use custom port for relays in wireguard configs') + parser.add_argument( + '--dns', dest='wg_dns', action='store', + default='', help='use custom dns server in wireguard configs') + parser.add_argument( + '--hijack-dns', dest='wg_hijack_dns', help='activate hijack dns when creating device', action='store_true') + parser.add_argument( + '--ipv6', dest='wg_relay_ipv6', help='use ipv6 address for relays in wireguard configs', action='store_true') + parser.add_argument( + '--version', help='show version information', action='store_true') + args = parser.parse_args() + + if not sanity(args): + sys.exit(1) + + if args.version: + print('Version:', _version) + sys.exit() + if args.account_number: + mullvad = Mullvad(args) + mullvad.run() + else: + parser.print_help(sys.stderr) + print('error: No account specified') + + +if __name__ == '__main__': + main()