123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- import glob
- import json
- import os
- import os.path
- import re
- import shutil
- from enum import Enum
- from subprocess import call
-
-
- class VpnGenError(Enum):
- Success = 0,
- VpnAlreadyExists = 1,
- VpnDoesNotExists = 2,
- ClientAlreadyExists = 3,
- ClientDoesNotExists = 4
-
-
- class VpnGen:
- default_config_base_dir = ""
- default_config_file = ""
- default_client_config_file = ""
- ovpn_config_path = ""
-
- def __init__(self, default_config_path, ovpn_config_path):
- self.default_config_base_dir = os.path.abspath(default_config_path)
- self.default_config_file = "%s.conf" % self.default_config_base_dir
- self.default_client_config_file = "%s%sclients%sclient.conf" % (self.default_config_base_dir, os.sep, os.sep)
- self.ovpn_config_path = os.path.abspath(ovpn_config_path)
-
- def f7(self, seq):
- seen = set()
- seen_add = seen.add
- return [x for x in seq if not (x in seen or seen_add(x))]
-
- def get_vpn_vars(self):
- with open(self.default_config_file, "r") as f:
- default_config = f.read()
-
- variables = re.findall('\$\{([^}]+)}', default_config)
- variables += ["KEY_COUNTRY", "KEY_PROVINCE", "KEY_CITY", "KEY_ORG", "KEY_EMAIL"]
- variables = self.f7(variables)
-
- return variables
-
- def get_client_vars(self, vpn_name):
- default_client_config_path = self.get_client_default_config_path(vpn_name)
- if not os.path.exists(default_client_config_path):
- return None
-
- with open(default_client_config_path, "r") as f:
- default_config = f.read()
- variables = re.findall('\$\{([^}]+)}', default_config)
- variables = self.f7(variables)
-
- vpn_variables = self.get_vpn_vars()
- real_variables = []
-
- for var in variables:
- if var not in vpn_variables and var != "client":
- real_variables.append(var)
-
- return real_variables
-
- def get_base_dir(self, vpn_name):
- return "%s%s%s%s" % (self.ovpn_config_path, os.sep, vpn_name, os.sep)
-
- def get_config_path(self, vpn_name):
- return "%s%s%s.conf" % (self.ovpn_config_path, os.sep, vpn_name)
-
- def get_vpn_variables_path(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- return "%svpngen.json" % base_dir
-
- def get_easy_rsa_dir(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- return "%seasy-rsa%s" % (base_dir, os.sep)
-
- def get_easy_rsa_key_dir(self, vpn_name):
- easyrsadir = self.get_easy_rsa_dir(vpn_name)
- return "%skeys%s" % (easyrsadir, os.sep)
-
- def get_pkitool_path(self, vpn_name):
- easyrsadir = self.get_easy_rsa_dir(vpn_name)
- return "%spkitool" % easyrsadir
-
- def get_client_default_config_path(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- return "%s%sclients%sclient.conf" % (base_dir, os.sep, os.sep)
-
- def get_client_dir(self, vpn_name, client_name):
- base_dir = self.get_base_dir(vpn_name)
- return "%sclients%s%s-%s%s" % (base_dir, os.sep, client_name, vpn_name, os.sep)
-
- def get_client_config_path(self, vpn_name, client_name):
- client_dir = self.get_client_dir(vpn_name, client_name)
- return "%s%s-%s.conf" % (client_dir, client_name, vpn_name)
-
- def get_client_variables_path(self, vpn_name, client_name):
- client_dir = self.get_client_dir(vpn_name, client_name)
- return "%svpngen.json" % client_dir
-
- def get_client_generated_files_paths(self, vpn_name, client_name):
- keys_dir = self.get_easy_rsa_key_dir(vpn_name,)
- return [
- "%s%s.crt" % (keys_dir, client_name),
- "%s%s.key" % (keys_dir, client_name)
- ]
-
- def get_client_tarball_path(self, vpn_name, client_name):
- base_dir = self.get_base_dir(vpn_name)
- return "%sclients%s%s-%s.tar.bz2" % (base_dir, os.sep, client_name, vpn_name)
-
- def get_server_needed_files_paths(self, vpn_name):
- keys_dir = self.get_easy_rsa_key_dir(vpn_name)
- return [
- "%sca.crt" % keys_dir,
- "%sta.key" % keys_dir
- ]
-
- def get_client_misc_files_paths(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- return glob.glob("%smisc-files%s*" % (base_dir, os.sep))
-
- def get_all_needed_files_paths(self, vpn_name, client_name):
- return self.get_client_generated_files_paths(vpn_name, client_name) +\
- self.get_server_needed_files_paths(vpn_name) +\
- self.get_client_misc_files_paths(vpn_name)
-
- def get_server_variables(self, vpn_name):
- with open(self.get_vpn_variables_path(vpn_name), "r") as f:
- return json.load(f)['variables']
-
- def get_client_variables(self, vpn_name, client_name):
- with open(self.get_client_variables_path(vpn_name, client_name), "r") as f:
- return json.load(f)['variables']
-
- def get_client_list(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- files_paths = glob.glob("%sclients%s*" % (base_dir, os.sep))
- files_names = list(map(lambda file_path: os.path.basename(file_path), files_paths))
-
- clients = []
- for file_name in files_names:
- if file_name != 'client.conf' and not file_name.endswith(".tar.bz2"):
- clients.append(file_name[0:len(file_name) - len(vpn_name) - 1])
- return clients
-
- def setup_vars(self, vpn_name, variables):
- easyrsadir = self.get_easy_rsa_dir(vpn_name)
-
- os.environ["KEY_COUNTRY"] = variables['KEY_COUNTRY']
- os.environ["KEY_PROVINCE"] = variables['KEY_PROVINCE']
- os.environ["KEY_CITY"] = variables['KEY_CITY']
- os.environ["KEY_ORG"] = variables['KEY_ORG']
- os.environ["KEY_OU"] = variables['KEY_ORG']
- os.environ["KEY_CN"] = variables['KEY_ORG']
- os.environ["KEY_NAME"] = variables['KEY_ORG']
- os.environ["KEY_EMAIL"] = variables['KEY_EMAIL']
- os.environ["KEY_SIZE"] = variables['KEY_SIZE']
- os.environ["CA_EXPIRE"] = variables['CA_EXPIRE']
- os.environ["KEY_EXPIRE"] = variables['KEY_EXPIRE']
-
- os.environ["EASY_RSA"] = easyrsadir
- os.environ["OPENSSL"] = "openssl"
- os.environ["PKCS11TOOL"] = "pkcs11-tool"
- os.environ["GREP"] = "grep"
- os.environ["KEY_CONFIG"] = "%s%s" % (easyrsadir, "openssl.cnf")
- os.environ["KEY_DIR"] = "%s%s" % (easyrsadir, "keys")
- os.environ["PKCS11_MODULE_PATH"] = "dummy"
- os.environ["PKCS11_PIN"] = "dummy"
-
- def create_vpn(self, vpn_name, variables):
- base_dir = self.get_base_dir(vpn_name)
- conf_file = self.get_config_path(vpn_name)
- conf_vpngen_file = self.get_vpn_variables_path(vpn_name)
- if os.path.exists(base_dir) or os.path.exists(conf_file):
- return VpnGenError.VpnAlreadyExists
-
- with open(self.default_config_file, "r") as f:
- default_config = f.read()
-
- variables['name'] = vpn_name
- for variable in variables:
- default_config = default_config.replace("${%s}" % variable, variables[variable])
-
- os.makedirs(base_dir)
- with open(conf_file, "w") as f:
- f.write(default_config)
-
- os.rmdir(base_dir)
- shutil.copytree(self.default_config_base_dir, base_dir)
-
- curdir = os.curdir
- easyrsadir = self.get_easy_rsa_dir(vpn_name)
- pkitool = self.get_pkitool_path(vpn_name)
- os.chdir(easyrsadir)
-
- self.setup_vars(vpn_name, variables)
-
- call([".%sclean-all" % os.sep])
- call([pkitool, "--initca", "-batch"])
- call([pkitool, "--server", "server", "-batch"])
- call([".%sbuild-dh" % os.sep])
- call(["openssl", "ca", "-gencrl",
- "-keyfile", "keys%sca.key" % os.sep,
- "-cert", "keys%sca.crt" % os.sep,
- "-out", "keys%scrl.pem" % os.sep,
- "-config", "openssl.cnf"])
-
- del os.environ["KEY_OU"]
- del os.environ["KEY_CN"]
- del os.environ["KEY_NAME"]
-
- call(["openvpn", "--genkey", "--secret", "keys%sta.key" % os.sep])
-
- with open(conf_vpngen_file, "w") as f:
- json.dump({'variables': variables}, f, indent=4, separators=(',', ': '))
-
- os.chdir(curdir)
-
- return VpnGenError.Success
-
- def remove_vpn(self, vpn_name):
- base_dir = self.get_base_dir(vpn_name)
- conf_file = self.get_config_path(vpn_name)
- if not os.path.exists(base_dir) and not os.path.exists(conf_file):
- return VpnGenError.VpnDoesNotExists
- os.remove(conf_file)
- shutil.rmtree(base_dir)
- return VpnGenError.Success
-
- def create_client(self, vpn_name, client_name, variables):
- base_dir = self.get_base_dir(vpn_name)
- if not os.path.exists(base_dir):
- return VpnGenError.VpnDoesNotExists
- client_dir = self.get_client_dir(vpn_name, client_name)
- if os.path.exists(client_dir):
- return VpnGenError.ClientAlreadyExists
-
- curdir = os.curdir
- easyrsadir = self.get_easy_rsa_dir(vpn_name)
- pkitool = self.get_pkitool_path(vpn_name)
- os.chdir(easyrsadir)
-
- self.setup_vars(vpn_name, variables)
- os.environ["KEY_CN"] = client_name
- os.environ["KEY_NAME"] = client_name
-
- call([pkitool, client_name])
-
- os.chdir(curdir)
-
- os.makedirs(client_dir)
-
- return self.rebuild_client(vpn_name, client_name, variables)
-
- def remove_client(self, vpn_name, client_name):
- base_dir = self.get_base_dir(vpn_name)
- if not os.path.exists(base_dir):
- return VpnGenError.VpnDoesNotExists
- client_dir = self.get_client_dir(vpn_name, client_name)
- if not os.path.exists(client_dir):
- return VpnGenError.ClientDoesNotExists
- return VpnGenError.Success
-
- def rebuild_client(self, vpn_name, client_name, variables):
- client_dir = self.get_client_dir(vpn_name, client_name)
- client_conf_file = self.get_client_config_path(vpn_name, client_name)
- client_default_config_path = self.get_client_default_config_path(vpn_name)
- with open(client_default_config_path, "r") as f:
- client_default_config = f.read()
-
- for variable in variables:
- client_default_config = client_default_config.replace("${%s}" % variable, variables[variable])
-
- files_names = glob.glob("%s%s*" % (client_dir, os.sep))
- for file_name in files_names:
- os.remove(file_name)
-
- with open(client_conf_file, "w") as f:
- f.write(client_default_config)
-
- files_paths = self.get_all_needed_files_paths(vpn_name, client_name)
- for file_path in files_paths:
- split = os.path.splitext(file_path)
- dest = "%s%s-%s%s" % (client_dir, os.path.basename(split[0]), vpn_name, split[1])
- shutil.copy(file_path, dest)
-
- split = os.path.splitext(client_conf_file)
- client_ovpn_file = "%s.ovpn" % split[0]
- shutil.copy(client_conf_file, client_ovpn_file)
-
- files_names = glob.glob("%s%s*" % (client_dir, os.sep))
- files_names = list(map(lambda file_path: os.path.basename(file_path), files_names))
-
- call(["tar", "cfj", self.get_client_tarball_path(vpn_name, client_name),
- "-C", client_dir] + files_names)
-
- client_variables = {}
- for variable in self.get_client_vars(vpn_name):
- client_variables[variable] = variables[variable]
-
- with open(self.get_client_variables_path(vpn_name, client_name), "w") as f:
- json.dump({'variables': client_variables}, f, indent=4, separators=(',', ': '))
-
- return VpnGenError.Success
|