Source code for letsencrypt.plugins.common

"""Plugin common functions."""
import os
import pkg_resources
import re
import shutil
import tempfile

import OpenSSL
import zope.interface

from acme.jose import util as jose_util

from letsencrypt import constants
from letsencrypt import interfaces
from letsencrypt import le_util


[docs]def option_namespace(name): """ArgumentParser options namespace (prefix of all options).""" return name + "-"
[docs]def dest_namespace(name): """ArgumentParser dest namespace (prefix of all destinations).""" return name.replace("-", "_") + "_"
private_ips_regex = re.compile( r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|" r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)") hostname_regex = re.compile( r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$", re.IGNORECASE)
[docs]class Plugin(object): """Generic plugin.""" zope.interface.implements(interfaces.IPlugin) # classProvides is not inherited, subclasses must define it on their own #zope.interface.classProvides(interfaces.IPluginFactory) def __init__(self, config, name): self.config = config self.name = name @property def option_namespace(self): """ArgumentParser options namespace (prefix of all options).""" return option_namespace(self.name)
[docs] def option_name(self, name): """Option name (include plugin namespace).""" return self.option_namespace + name
@property def dest_namespace(self): """ArgumentParser dest namespace (prefix of all destinations).""" return dest_namespace(self.name)
[docs] def dest(self, var): """Find a destination for given variable ``var``.""" # this should do exactly the same what ArgumentParser(arg), # does to "arg" to compute "dest" return self.dest_namespace + var.replace("-", "_")
[docs] def conf(self, var): """Find a configuration value for variable ``var``.""" return getattr(self.config, self.dest(var))
@classmethod
[docs] def inject_parser_options(cls, parser, name): """Inject parser options. See `~.IPlugin.inject_parser_options` for docs. """ # dummy function, doesn't check if dest.startswith(self.dest_namespace) def add(arg_name_no_prefix, *args, **kwargs): # pylint: disable=missing-docstring return parser.add_argument( "--{0}{1}".format(option_namespace(name), arg_name_no_prefix), *args, **kwargs) return cls.add_parser_arguments(add)
@jose_util.abstractclassmethod
[docs] def add_parser_arguments(cls, add): """Add plugin arguments to the CLI argument parser. :param callable add: Function that proxies calls to `argparse.ArgumentParser.add_argument` prepending options with unique plugin name prefix. """
# other
[docs]class Addr(object): r"""Represents an virtual host address. :param str addr: addr part of vhost address :param str port: port number or \*, or "" """ def __init__(self, tup): self.tup = tup @classmethod
[docs] def fromstring(cls, str_addr): """Initialize Addr from string.""" tup = str_addr.partition(':') return cls((tup[0], tup[2]))
def __str__(self): if self.tup[1]: return "%s:%s" % self.tup return self.tup[0] def __eq__(self, other): if isinstance(other, self.__class__): return self.tup == other.tup return False def __hash__(self): return hash(self.tup)
[docs] def get_addr(self): """Return addr part of Addr object.""" return self.tup[0]
[docs] def get_port(self): """Return port.""" return self.tup[1]
[docs] def get_addr_obj(self, port): """Return new address object with same addr and new port.""" return self.__class__((self.tup[0], port))
[docs]class TLSSNI01(object): """Abstract base for TLS-SNI-01 challenge performers""" def __init__(self, configurator): self.configurator = configurator self.achalls = [] self.indices = [] self.challenge_conf = os.path.join( configurator.config.config_dir, "le_tls_sni_01_cert_challenge.conf") # self.completed = 0
[docs] def add_chall(self, achall, idx=None): """Add challenge to TLSSNI01 object to perform at once. :param .KeyAuthorizationAnnotatedChallenge achall: Annotated TLSSNI01 challenge. :param int idx: index to challenge in a larger array """ self.achalls.append(achall) if idx is not None: self.indices.append(idx)
[docs] def get_cert_path(self, achall): """Returns standardized name for challenge certificate. :param .KeyAuthorizationAnnotatedChallenge achall: Annotated tls-sni-01 challenge. :returns: certificate file name :rtype: str """ return os.path.join(self.configurator.config.work_dir, achall.chall.encode("token") + ".crt")
[docs] def get_key_path(self, achall): """Get standardized path to challenge key.""" return os.path.join(self.configurator.config.work_dir, achall.chall.encode("token") + '.pem')
[docs] def _setup_challenge_cert(self, achall, cert_key=None):
"""Generate and write out challenge certificate.""" cert_path = self.get_cert_path(achall) key_path = self.get_key_path(achall) # Register the path before you write out the file self.configurator.reverter.register_file_creation(True, key_path) self.configurator.reverter.register_file_creation(True, cert_path) response, (cert, key) = achall.response_and_validation( cert_key=cert_key) cert_pem = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, cert) key_pem = OpenSSL.crypto.dump_privatekey( OpenSSL.crypto.FILETYPE_PEM, key) # Write out challenge cert and key with open(cert_path, "wb") as cert_chall_fd: cert_chall_fd.write(cert_pem) with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file: key_file.write(key_pem) return response
# test utils used by letsencrypt_apache/letsencrypt_nginx (hence # "pragma: no cover") TODO: this might quickly lead to dead code (also # c.f. #383)
[docs]def setup_ssl_options(config_dir, src, dest): # pragma: no cover """Move the ssl_options into position and return the path.""" option_path = os.path.join(config_dir, dest) shutil.copyfile(src, option_path) return option_path
[docs]def dir_setup(test_dir, pkg): # pragma: no cover """Setup the directories necessary for the configurator.""" temp_dir = tempfile.mkdtemp("temp") config_dir = tempfile.mkdtemp("config") work_dir = tempfile.mkdtemp("work") os.chmod(temp_dir, constants.CONFIG_DIRS_MODE) os.chmod(config_dir, constants.CONFIG_DIRS_MODE) os.chmod(work_dir, constants.CONFIG_DIRS_MODE) test_configs = pkg_resources.resource_filename( pkg, os.path.join("testdata", test_dir)) shutil.copytree( test_configs, os.path.join(temp_dir, test_dir), symlinks=True) return temp_dir, config_dir, work_dir