initial commit
authorkalken <redacted>
Tue, 21 Jan 2025 08:50:08 +0000 (09:50 +0100)
committerkalken <redacted>
Tue, 21 Jan 2025 08:50:08 +0000 (09:50 +0100)
.flake8 [new file with mode: 0644]
.gitignore [new file with mode: 0644]
README.md [new file with mode: 0644]
wg-mullvad.py [new file with mode: 0755]

diff --git a/.flake8 b/.flake8
new file mode 100644 (file)
index 0000000..b3788b8
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,3 @@
+[flake8]
+extend-ignore = E265,E266
+max-line-length = 120
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..8efb3db
--- /dev/null
@@ -0,0 +1,6 @@
+# Custom
+*.swp
+.DS_Store
+config/*
+__pycache__
+.env*
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..54fc8a1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,8 @@
+# mullvad-wg.py
+
+This repository contains a python script that generates wireguard configuration files to use Mullvad with wireguard.
+
+Use --help to see what can be configured.
+
+The script can generate and reuse the private wireguard key to generate/update files
+
diff --git a/wg-mullvad.py b/wg-mullvad.py
new file mode 100755 (executable)
index 0000000..5addaef
--- /dev/null
@@ -0,0 +1,262 @@
+#!/usr/bin/env python3
+
+import urllib.request
+import configparser
+import argparse
+import pathlib
+import json
+import sys
+import ipaddress
+import base64
+from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+from cryptography.hazmat.primitives import serialization
+
+
+_version = '0.1 alfa'
+
+
+def generate_publickey(privatekey):
+    private_key_bytes = base64.b64decode(privatekey)
+    private_key = X25519PrivateKey.from_private_bytes(private_key_bytes)
+    public_key = private_key.public_key()
+    public_key_bytes = public_key.public_bytes(
+        encoding=serialization.Encoding.Raw,
+        format=serialization.PublicFormat.Raw
+    )
+    wgpublickey = base64.b64encode(public_key_bytes).decode('utf-8')
+    return wgpublickey
+
+
+def generate_privatekey():
+    privatekey = X25519PrivateKey.generate()
+    private_key_bytes = privatekey.private_bytes(
+        encoding=serialization.Encoding.Raw,
+        format=serialization.PrivateFormat.Raw,
+        encryption_algorithm=serialization.NoEncryption()
+    )
+    wgprivatekey = base64.b64encode(private_key_bytes).decode('utf-8')
+    return wgprivatekey
+
+
+class Mullvad:
+    def __init__(self, args):
+        self._account_number = args.account_number
+        self._output_dir = args.output_dir
+        self._settings_file = args.settings_file
+        self._wg_relay_port = args.wg_relay_port
+        self._wg_relay_ipv6 = args.wg_relay_ipv6
+        self._wg_dns = args.wg_dns
+        self._wg_hijack_dns = args.wg_hijack_dns
+        self._webtoken = None
+
+        self._config = configparser.ConfigParser()
+        self._settings_file = pathlib.Path(self._settings_file).expanduser()
+
+    def run(self):
+        if self._settings_file.is_file():
+            privatekey = self.get_privatekey()
+        else:
+            privatekey = generate_privatekey()
+            self.save_privatekey(privatekey)
+
+        publickey = generate_publickey(privatekey)
+        device = self.get_device(publickey) or self.create_device(publickey)
+        if device:
+            self.create_wg_configs(device, privatekey)
+
+    def get_privatekey(self):
+        print(f'Reading settings from: {self._settings_file}')
+        self._config.read(self._settings_file)
+        try:
+            return self._config.get('Interface', 'privatekey')
+        except (configparser.NoOptionError, configparser.NoSectionError):
+            print('Error: No private key found in settings file')
+            print('Solution: add it or remove the file completely to generate a new device')
+            sys.exit(1)
+
+    def save_privatekey(self, privatekey):
+        self._settings_file.parent.mkdir(parents=True, exist_ok=True)
+        self._settings_file.touch(mode=0o600, exist_ok=True)
+        with self._settings_file.open('w') as _file:
+            print(f'Writing settings to: {self._settings_file}')
+            self._config.add_section('Interface')
+            self._config.set('Interface', 'privatekey', privatekey)
+            self._config.write(_file)
+        return True
+
+    def get_webtoken(self):
+        if not self._webtoken:
+            self.generate_webtoken()
+        return self._webtoken
+
+    def generate_webtoken(self):
+        body = {}
+        body['account_number'] = self._account_number
+        req = urllib.request.Request('https://api.mullvad.net/auth/v1/webtoken')
+        req.add_header('Content-Type', 'application/json')
+        response = urllib.request.urlopen(req, json.dumps(body).encode()).read()
+        data = json.loads(response)
+        self._webtoken = data['access_token']
+
+    def api(self, url, body=None):
+        webtoken = self.get_webtoken()
+        req = urllib.request.Request(url)
+        req.add_header('Authorization', f'Bearer {webtoken}')
+        if body:
+            req.add_header('Content-Type', 'application/json')
+            response = urllib.request.urlopen(req, json.dumps(body).encode())
+        else:
+            response = urllib.request.urlopen(req)
+        data = json.loads(response.read())
+        return data
+
+    def get_device(self, publickey):
+        print(f'Trying to find device: {publickey}')
+        try:
+            for device in self.api('https://api.mullvad.net/accounts/v1/devices'):
+                if publickey == device['pubkey']:
+                    _name = device['name']
+                    _pubkey = device['pubkey']
+                    print(f'Device found: ({_name}) {_pubkey}')
+                    return device
+            print(f'Device is not registered: {publickey}')
+            return None
+        except urllib.error.HTTPError as e:
+            error_message = json.loads(e.read())
+            _code = error_message.get('code')
+            _message = error_message.get('detail')
+            if _message:
+                print(_message)
+            if _code == 'INVALID_ACCOUNT':
+                print(f'Invalid account: {self._account_number}')
+            sys.exit(1)
+
+    def create_device(self, publickey):
+        print(f'Trying to create device: {publickey}')
+        body = {}
+        body['pubkey'] = publickey
+        body['hijack_dns'] = self._wg_hijack_dns
+        try:
+            response = self.api('https://api.mullvad.net/accounts/v1/devices', body)
+            return response
+        except urllib.error.HTTPError as e:
+            error_message = json.loads(e.read())
+            _code = error_message.get('code')
+            _message = error_message.get('detail')
+            if _message:
+                print(_message)
+            if _code == 'PUBKEY_IN_USE':
+                print(f'Error: Private key settings exits in {self._settings_file} but device has been removed')
+                print('Solution 1: Wait for grace period to pass before using this key (5 min)')
+                print('Solution 2: Remove private key from setting file if you want to create a new device')
+                sys.exit(1)
+
+    def get_wireguard_info(self):
+        try:
+            response = urllib.request.urlopen('https://api.mullvad.net/public/relays/wireguard/v2/')
+            data = json.loads(response.read())
+            return data['wireguard']
+        except urllib.error.HTTPError as e:
+            error_message = json.loads(e.read())
+            print(error_message)
+            sys.exit(1)
+
+    def create_wg_configs(self, device, privatekey):
+        wg = self.get_wireguard_info()
+        output_dir = pathlib.Path(self._output_dir).expanduser()
+        if not output_dir.is_dir():
+            output_dir.mkdir(exist_ok=True, parents=True)
+        print(f'Creating files in: {output_dir}')
+        for relay in wg['relays']:
+            _hostname = relay['hostname']
+            _filepath = pathlib.Path.joinpath(output_dir, f'{_hostname}.conf')
+            _filepath.touch(mode=0o600, exist_ok=True)
+            with _filepath.open('w') as _file:
+                config = configparser.ConfigParser()
+                config.add_section('Interface')
+                config.set('Interface', '#device', device['name'])
+                config.set('Interface', 'privateKey', privatekey)
+                config.set('Interface', 'address',  ','.join([device['ipv4_address'], device['ipv6_address']]))
+                if self._wg_dns:
+                    config.set('Interface', 'dns', self._wg_dns)
+                else:
+                    config.set('Interface', 'dns', ','.join([wg['ipv4_gateway'], wg['ipv6_gateway']]))
+                config.add_section('Peer')
+                config.set('Peer', '#owned', str(relay['owned']))
+                config.set('Peer', '#provider', relay['provider'])
+                config.set('Peer', 'publickey', relay['public_key'])
+                config.set('Peer', 'allowedips', '0.0.0.0/0,::/0')
+                if self._wg_relay_ipv6:
+                    wg_relay_address = relay['ipv6_addr_in']
+                else:
+                    wg_relay_address = relay['ipv4_addr_in']
+                config.set('Peer', 'endpoint', f'{wg_relay_address}:{self._wg_relay_port}')
+                config.write(_file)
+
+
+def sanity(args):
+    if args.wg_dns:
+        for _ip in args.wg_dns.split(','):
+            try:
+                ipaddress.ip_address(_ip.strip())
+            except ValueError:
+                print('Dns option has to contain valid ip numbers separated by comma')
+                return False
+    if args.account_number:
+        try:
+            int(args.account_number)
+        except ValueError:
+            print('Account is not a number')
+            return False
+    if args.wg_relay_port:
+        try:
+            int(args.wg_relay_port)
+        except ValueError:
+            print('Port is not a number')
+            return False
+    return True
+
+
+def main():
+    parser = argparse.ArgumentParser(
+            description=f'{__file__}',
+            formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+    parser.add_argument(
+        '--account', dest='account_number', action='store', help='mullvad account number')
+    parser.add_argument(
+        '--settings-file', dest='settings_file', action='store',
+        default='~/.config/mullvad/wg0.conf', help='settings file to use')
+    parser.add_argument(
+        '--output-dir', dest='output_dir', action='store',
+        default='~/.config/mullvad/wg0', help='directory to write settings')
+    parser.add_argument(
+        '--wg-relay-port', dest='wg_relay_port', action='store',
+        default=51820, help='use custom port for relays in wireguard configs')
+    parser.add_argument(
+        '--dns', dest='wg_dns', action='store',
+        default='', help='use custom dns server in wireguard configs')
+    parser.add_argument(
+        '--hijack-dns', dest='wg_hijack_dns', help='activate hijack dns when creating device', action='store_true')
+    parser.add_argument(
+        '--ipv6', dest='wg_relay_ipv6', help='use ipv6 address for relays in wireguard configs', action='store_true')
+    parser.add_argument(
+        '--version', help='show version information', action='store_true')
+    args = parser.parse_args()
+
+    if not sanity(args):
+        sys.exit(1)
+
+    if args.version:
+        print('Version:', _version)
+        sys.exit()
+    if args.account_number:
+        mullvad = Mullvad(args)
+        mullvad.run()
+    else:
+        parser.print_help(sys.stderr)
+        print('error: No account specified')
+
+
+if __name__ == '__main__':
+    main()
git clone https://git.99rst.org/PROJECT