You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PdnsApiAuthenticator.py 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import json
  2. import logging
  3. import time
  4. from certbot import errors
  5. from certbot_pdns.pdnsapi import PdnsApi
  6. logger = logging.getLogger(__name__)
  7. class PdnsApiAuthenticator:
  8. api = None
  9. zones = None
  10. axfr_time = None
  11. def find_best_matching_zone(self, domain):
  12. if domain is None or domain == "":
  13. return None
  14. for zone in self.zones:
  15. if zone['name'] == domain + ".":
  16. return zone
  17. return self.find_best_matching_zone(domain[domain.index(".") + 1:]) if "." in domain else None
  18. def find_soa(self, zone):
  19. for rrset in zone["rrsets"]:
  20. if rrset["type"] == "SOA":
  21. return rrset, rrset["records"][0]
  22. return None
  23. def flush_zone(self, zone_name):
  24. res = self.api.flush_zone_cache(zone_name)
  25. if res is None or "result" not in res or res["result"] != "Flushed cache.":
  26. raise errors.PluginError("Bad return from PDNS API when flushing cache: %s" % res)
  27. def notify_zone(self, zone_name):
  28. res = self.api.notify_zone(zone_name)
  29. if res is None or "result" not in res or res["result"] != "Notification queued":
  30. raise errors.PluginError("Bad return from PDNS API when notifying: %s" % res)
  31. def update_soa(self, zone_name):
  32. zone = self.api.get_zone(zone_name)
  33. if zone is None or "error" in zone:
  34. raise errors.PluginError("Bad return from PDNS API when getting zone %s: %s" % (zone_name, zone))
  35. rrset, soa = self.find_soa(zone)
  36. split = soa["content"].split(" ")
  37. split[2] = str(int(split[2]) + 1)
  38. soa["content"] = ' '.join(split)
  39. res = self.api.replace_record(zone_name, zone_name, rrset["type"], rrset["ttl"], soa["content"],
  40. soa["disabled"], False)
  41. if res is not None:
  42. raise errors.PluginError("Bad return from PDNS API when updating SOA: %s" % res)
  43. def prepare(self, conf_path):
  44. self.api = PdnsApi()
  45. with open(conf_path) as f:
  46. config = json.load(f)
  47. self.api.set_api_key(config["api-key"])
  48. self.api.set_base_url(config["base-url"])
  49. self.api.set_api_pass(config["api-pass"])
  50. self.api.set_http_auth_user(config["http-auth-user"])
  51. self.api.set_http_auth_pass(config["http-auth-pas"])
  52. self.axfr_time = config["axfr-time"]
  53. self.zones = self.api.list_zones()
  54. # print(self.zones)
  55. # raw_input('Press <ENTER> to continue')
  56. if self.zones is None or "error" in self.zones:
  57. raise errors.PluginError("Could not list zones %s" % self.zones)
  58. def perform_single(self, achall, response, validation):
  59. domain = achall.domain
  60. token = validation.encode()
  61. zone = self.find_best_matching_zone(domain)
  62. if zone is None:
  63. raise errors.PluginError("Could not find zone for %s" % domain)
  64. logger.debug("Found zone %s for domain %s" % (zone["name"], domain))
  65. res = self.api.replace_record(zone["name"], "_acme-challenge." + domain + ".", "TXT", 1, "\"" + token.decode('utf-8') + "\"", False, False)
  66. if res is not None:
  67. raise errors.PluginError("Bad return from PDNS API when adding record: %s" % res)
  68. return response
  69. def perform_notify(self, zone):
  70. logger.info("Notifying zone %s..." % zone["name"])
  71. self.update_soa(zone["name"])
  72. self.flush_zone(zone["name"])
  73. self.notify_zone(zone["name"])
  74. def wait_for_propagation(self, achalls):
  75. # TODO search zones authoritative servers and check for TXT record on each of them
  76. # raw_input('Press <ENTER> to continue')
  77. logger.info("Waiting %i seconds..." % self.axfr_time)
  78. time.sleep(self.axfr_time)
  79. def cleanup(self, achall):
  80. domain = achall.domain
  81. zone = self.find_best_matching_zone(domain)
  82. if zone is None:
  83. return
  84. res = self.api.delete_record(zone["name"], "_acme-challenge." + domain + ".", "TXT", 1, None, False, False)
  85. if res is not None:
  86. raise errors.PluginError("Bad return from PDNS API when deleting record: %s" % res)
  87. self.update_soa(zone["name"])
  88. self.flush_zone(zone["name"])
  89. self.notify_zone(zone["name"])