import glob
import json
import os
import os.path
import re
import shutil
from enum import Enum
from subprocess import call


class VpnGenError(Enum):
    """Result codes returned by the public ``VpnGen`` operations."""

    # Values are plain ints. (Trailing commas would silently turn them
    # into 1-tuples such as ``(0,)``.)
    Success = 0
    VpnAlreadyExists = 1
    VpnDoesNotExists = 2
    ClientAlreadyExists = 3
    ClientDoesNotExists = 4


class VpnGen:
    """Generate OpenVPN server/client configurations from a template tree.

    The template consists of ``<default_config_path>.conf`` (the server
    configuration template) and the directory ``<default_config_path>/``
    which is copied verbatim for every new VPN.  Placeholders in templates
    have the form ``${name}``.

    For a VPN called ``<vpn>`` the generated layout under
    ``ovpn_config_path`` is::

        <vpn>.conf                      rendered server configuration
        <vpn>/                          copy of the template directory
        <vpn>/vpngen.json               variables used to create the VPN
        <vpn>/easy-rsa/                 easy-rsa scripts, keys in keys/
        <vpn>/clients/client.conf       client configuration template
        <vpn>/clients/<client>-<vpn>/   generated client files
        <vpn>/clients/<client>-<vpn>.tar.bz2
    """

    default_config_base_dir = ""
    default_config_file = ""
    default_client_config_file = ""
    ovpn_config_path = ""

    # Matches ``${name}`` placeholders; group 1 is the variable name.
    _VARIABLE_PATTERN = re.compile(r'\$\{([^}]+)}')

    def __init__(self, default_config_path, ovpn_config_path):
        """Store the absolute template and output locations.

        :param default_config_path: template base path (without ``.conf``).
        :param ovpn_config_path: directory receiving the generated VPNs.
        """
        self.default_config_base_dir = os.path.abspath(default_config_path)
        self.default_config_file = "%s.conf" % self.default_config_base_dir
        self.default_client_config_file = "%s%sclients%sclient.conf" % (
            self.default_config_base_dir, os.sep, os.sep)
        self.ovpn_config_path = os.path.abspath(ovpn_config_path)

    def f7(self, seq):
        """Return the items of *seq* without duplicates, keeping first-seen order."""
        seen = set()
        unique = []
        for item in seq:
            if item not in seen:
                seen.add(item)
                unique.append(item)
        return unique

    def _find_template_vars(self, template_path):
        """Return every ``${...}`` placeholder name found in *template_path*."""
        with open(template_path, "r") as f:
            content = f.read()
        return self._VARIABLE_PATTERN.findall(content)

    def get_vpn_vars(self):
        """List the variables needed to create a VPN.

        These are the placeholders of the server template followed by the
        ``KEY_*`` identity fields, without duplicates.
        """
        variables = self._find_template_vars(self.default_config_file)
        variables += ["KEY_COUNTRY", "KEY_PROVINCE", "KEY_CITY", "KEY_ORG", "KEY_EMAIL"]
        return self.f7(variables)

    def get_client_vars(self, vpn_name):
        """List the placeholders of the client template of *vpn_name*.

        :return: list of unique variable names, or ``None`` when the VPN has
            no client template file.
        """
        default_client_config_path = self.get_client_default_config_path(vpn_name)
        if not os.path.exists(default_client_config_path):
            return None
        return self.f7(self._find_template_vars(default_client_config_path))

    # ------------------------------------------------------------------
    # Path helpers.  Directory paths are returned WITH a trailing separator
    # so that callers can append a file name directly.
    # ------------------------------------------------------------------

    def get_base_dir(self, vpn_name):
        """Return the directory of *vpn_name* (with trailing separator)."""
        return "%s%s%s%s" % (self.ovpn_config_path, os.sep, vpn_name, os.sep)

    def get_config_path(self, vpn_name):
        """Return the path of the rendered server configuration file."""
        return "%s%s%s.conf" % (self.ovpn_config_path, os.sep, vpn_name)

    def get_vpn_variables_path(self, vpn_name):
        """Return the path of the JSON file storing the VPN's variables."""
        return "%svpngen.json" % self.get_base_dir(vpn_name)

    def get_easy_rsa_dir(self, vpn_name):
        """Return the easy-rsa directory of the VPN (with trailing separator)."""
        return "%seasy-rsa%s" % (self.get_base_dir(vpn_name), os.sep)

    def get_easy_rsa_key_dir(self, vpn_name):
        """Return the easy-rsa ``keys`` directory (with trailing separator)."""
        return "%skeys%s" % (self.get_easy_rsa_dir(vpn_name), os.sep)

    def get_pkitool_path(self, vpn_name):
        """Return the path of the ``pkitool`` script of the VPN."""
        return "%spkitool" % self.get_easy_rsa_dir(vpn_name)

    def get_client_default_config_path(self, vpn_name):
        """Return the path of the client configuration template of the VPN."""
        # base_dir already ends with a separator, so only one more is needed.
        return "%sclients%sclient.conf" % (self.get_base_dir(vpn_name), os.sep)

    def get_client_dir(self, vpn_name, client_name):
        """Return the output directory of a client (with trailing separator)."""
        return "%sclients%s%s-%s%s" % (
            self.get_base_dir(vpn_name), os.sep, client_name, vpn_name, os.sep)

    def get_client_config_path(self, vpn_name, client_name):
        """Return the path of the rendered configuration file of a client."""
        client_dir = self.get_client_dir(vpn_name, client_name)
        return "%s%s-%s.conf" % (client_dir, client_name, vpn_name)

    def get_client_generated_files_paths(self, vpn_name, client_name):
        """Return the certificate and key paths generated for a client."""
        keys_dir = self.get_easy_rsa_key_dir(vpn_name)
        return [
            "%s%s.crt" % (keys_dir, client_name),
            "%s%s.key" % (keys_dir, client_name),
        ]

    def get_client_tarball_path(self, vpn_name, client_name):
        """Return the path of the ``.tar.bz2`` archive built for a client."""
        return "%sclients%s%s-%s.tar.bz2" % (
            self.get_base_dir(vpn_name), os.sep, client_name, vpn_name)

    def get_server_needed_files_paths(self, vpn_name):
        """Return the server-side files every client needs (CA cert, TA key)."""
        keys_dir = self.get_easy_rsa_key_dir(vpn_name)
        return [
            "%sca.crt" % keys_dir,
            "%sta.key" % keys_dir,
        ]

    def get_all_needed_files_paths(self, vpn_name, client_name):
        """Return all files to ship to a client: its own plus the server ones."""
        return (self.get_client_generated_files_paths(vpn_name, client_name)
                + self.get_server_needed_files_paths(vpn_name))

    def setup_vars(self, vpn_name, variables):
        """Export the environment variables read by the easy-rsa scripts.

        :param variables: mapping providing ``KEY_COUNTRY``, ``KEY_PROVINCE``,
            ``KEY_CITY``, ``KEY_ORG``, ``KEY_EMAIL``, ``KEY_SIZE``,
            ``CA_EXPIRE`` and ``KEY_EXPIRE`` as strings.
        :raises KeyError: if one of those entries is missing.

        The values are written to ``os.environ`` of the current process and
        are therefore inherited by the subprocesses started afterwards.
        """
        easyrsadir = self.get_easy_rsa_dir(vpn_name)
        os.environ.update({
            "KEY_COUNTRY": variables['KEY_COUNTRY'],
            "KEY_PROVINCE": variables['KEY_PROVINCE'],
            "KEY_CITY": variables['KEY_CITY'],
            "KEY_ORG": variables['KEY_ORG'],
            # OU, CN and NAME all default to the organisation name.
            "KEY_OU": variables['KEY_ORG'],
            "KEY_CN": variables['KEY_ORG'],
            "KEY_NAME": variables['KEY_ORG'],
            "KEY_EMAIL": variables['KEY_EMAIL'],
            "KEY_SIZE": variables['KEY_SIZE'],
            "CA_EXPIRE": variables['CA_EXPIRE'],
            "KEY_EXPIRE": variables['KEY_EXPIRE'],
            "EASY_RSA": easyrsadir,
            "OPENSSL": "openssl",
            "PKCS11TOOL": "pkcs11-tool",
            "GREP": "grep",
            "KEY_CONFIG": "%s%s" % (easyrsadir, "openssl.cnf"),
            "KEY_DIR": "%s%s" % (easyrsadir, "keys"),
            "PKCS11_MODULE_PATH": "dummy",
            "PKCS11_PIN": "dummy",
        })

    def create_vpn(self, vpn_name, variables):
        """Create a new VPN: render its configuration and build its PKI.

        :param vpn_name: name of the VPN; also substituted for ``${name}``.
        :param variables: mapping of template variables (string values); it
            must also contain the entries required by :meth:`setup_vars`.
            The mapping itself is not modified.
        :return: ``VpnGenError.VpnAlreadyExists`` if the VPN directory or
            configuration file already exists, else ``VpnGenError.Success``.

        The exit statuses of the external commands are not checked.
        """
        base_dir = self.get_base_dir(vpn_name)
        conf_file = self.get_config_path(vpn_name)
        conf_vpngen_file = self.get_vpn_variables_path(vpn_name)

        if os.path.exists(base_dir) or os.path.exists(conf_file):
            return VpnGenError.VpnAlreadyExists

        with open(self.default_config_file, "r") as f:
            default_config = f.read()

        # Work on a copy so the caller's mapping is left untouched.
        variables = dict(variables)
        variables['name'] = vpn_name
        for variable, value in variables.items():
            default_config = default_config.replace("${%s}" % variable, value)

        # Only the parent directory must exist here: copytree() below creates
        # base_dir itself and refuses to run if it is already present.
        os.makedirs(self.ovpn_config_path, exist_ok=True)
        with open(conf_file, "w") as f:
            f.write(default_config)
        shutil.copytree(self.default_config_base_dir, base_dir)

        easyrsadir = self.get_easy_rsa_dir(vpn_name)
        pkitool = self.get_pkitool_path(vpn_name)

        # os.curdir is just the string "."; the real working directory has to
        # be captured with getcwd() in order to be restored afterwards.
        previous_dir = os.getcwd()
        os.chdir(easyrsadir)
        try:
            self.setup_vars(vpn_name, variables)

            call([".%sclean-all" % os.sep])
            call([pkitool, "--initca", "-batch"])
            call([pkitool, "--server", "server", "-batch"])
            call([".%sbuild-dh" % os.sep])
            call(["openssl", "ca", "-gencrl",
                  "-keyfile", "keys%sca.key" % os.sep,
                  "-cert", "keys%sca.crt" % os.sep,
                  "-out", "keys%scrl.pem" % os.sep,
                  "-config", "openssl.cnf"])

            del os.environ["KEY_OU"]
            del os.environ["KEY_CN"]
            del os.environ["KEY_NAME"]

            call(["openvpn", "--genkey", "--secret", "keys%sta.key" % os.sep])

            with open(conf_vpngen_file, "w") as f:
                json.dump({'variables': variables}, f, indent=4, separators=(',', ': '))
        finally:
            os.chdir(previous_dir)

        return VpnGenError.Success

    def remove_vpn(self, vpn_name):
        """Delete the configuration file and directory of *vpn_name*.

        :return: ``VpnGenError.VpnDoesNotExists`` when neither the directory
            nor the configuration file exists, else ``VpnGenError.Success``.
        """
        base_dir = self.get_base_dir(vpn_name)
        conf_file = self.get_config_path(vpn_name)

        has_dir = os.path.exists(base_dir)
        has_conf = os.path.exists(conf_file)
        if not has_dir and not has_conf:
            return VpnGenError.VpnDoesNotExists

        # A partially created VPN may have only one of the two artefacts;
        # remove whatever is there instead of failing on the missing one.
        if has_conf:
            os.remove(conf_file)
        if has_dir:
            shutil.rmtree(base_dir)

        return VpnGenError.Success

    def create_client(self, vpn_name, client_name, variables):
        """Generate the key pair of a client and build its bundle.

        :param variables: mapping of client template variables; it must also
            contain the entries required by :meth:`setup_vars`.
        :return: ``VpnGenError.VpnDoesNotExists`` if the VPN directory is
            missing, ``VpnGenError.ClientAlreadyExists`` if the client
            directory already exists, else ``VpnGenError.Success``.

        The exit status of ``pkitool`` is not checked.
        """
        base_dir = self.get_base_dir(vpn_name)
        if not os.path.exists(base_dir):
            return VpnGenError.VpnDoesNotExists

        client_dir = self.get_client_dir(vpn_name, client_name)
        if os.path.exists(client_dir):
            return VpnGenError.ClientAlreadyExists

        easyrsadir = self.get_easy_rsa_dir(vpn_name)
        pkitool = self.get_pkitool_path(vpn_name)

        # See create_vpn(): getcwd() is needed to really restore the cwd.
        previous_dir = os.getcwd()
        os.chdir(easyrsadir)
        try:
            self.setup_vars(vpn_name, variables)
            # The certificate is issued in the client's name, not the org's.
            os.environ["KEY_CN"] = client_name
            os.environ["KEY_NAME"] = client_name
            call([pkitool, client_name])
        finally:
            os.chdir(previous_dir)

        self.build_client(vpn_name, client_name, variables)

        return VpnGenError.Success

    def remove_client(self, vpn_name, client_name):
        """Check that a client exists.

        NOTE: this is currently a placeholder. It validates that the VPN and
        client directories exist but does not delete anything.

        :return: ``VpnGenError.VpnDoesNotExists``,
            ``VpnGenError.ClientDoesNotExists`` or ``VpnGenError.Success``.
        """
        base_dir = self.get_base_dir(vpn_name)
        if not os.path.exists(base_dir):
            return VpnGenError.VpnDoesNotExists

        client_dir = self.get_client_dir(vpn_name, client_name)
        if not os.path.exists(client_dir):
            return VpnGenError.ClientDoesNotExists

        return VpnGenError.Success

    def rebuild_clients(self, vpn_name):
        """Placeholder: performs no work and always returns ``Success``."""
        return VpnGenError.Success

    def build_client(self, vpn_name, client_name, variables):
        """Render the client configuration and pack the client bundle.

        Creates the client directory, writes the rendered configuration into
        it, copies the client certificate/key and the server CA/TA files
        (each renamed to ``<basename>-<vpn><ext>``) and archives the directory
        content into the client tarball with ``tar cfj``.

        :param variables: mapping of template variables (string values);
            ``name`` and ``client`` are added automatically.  The mapping
            itself is not modified.
        """
        client_dir = self.get_client_dir(vpn_name, client_name)
        client_conf_file = self.get_client_config_path(vpn_name, client_name)
        client_default_config_path = self.get_client_default_config_path(vpn_name)

        with open(client_default_config_path, "r") as f:
            client_default_config = f.read()

        # Work on a copy so the caller's mapping is left untouched.
        variables = dict(variables)
        variables['name'] = vpn_name
        variables['client'] = client_name
        for variable, value in variables.items():
            client_default_config = client_default_config.replace("${%s}" % variable, value)

        os.makedirs(client_dir)
        with open(client_conf_file, "w") as f:
            f.write(client_default_config)

        for file_path in self.get_all_needed_files_paths(vpn_name, client_name):
            stem, extension = os.path.splitext(file_path)
            dest = "%s%s-%s%s" % (client_dir, os.path.basename(stem), vpn_name, extension)
            shutil.copy(file_path, dest)

        # client_dir already ends with a separator.  Names are passed relative
        # to client_dir because tar is run with "-C client_dir".
        files_names = [os.path.basename(path) for path in glob.glob("%s*" % client_dir)]

        call(["tar", "cfj", self.get_client_tarball_path(vpn_name, client_name),
              "-C", client_dir] + files_names)