Source code for pki.crypto

# Authors:
#     Ade Lee <alee@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
#  along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright (C) 2013 Red Hat, Inc.
# All rights reserved.
#
"""
Module containing crypto classes.
"""
from __future__ import absolute_import
import abc
import os
import shutil
import subprocess
import tempfile

import nss.nss as nss
import six
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import (
    Cipher, algorithms, modes
)
from cryptography.hazmat.primitives import keywrap
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
import cryptography.x509

# encryption algorithms OIDs
DES_EDE3_CBC_OID = "{1 2 840 113549 3 7}"
AES_128_CBC_OID = "{2 16 840 1 101 3 4 1 2}"

# Wrap Algorithm names as defined by JSS.
WRAP_AES_CBC_PAD = "AES/CBC/PKCS5Padding"
WRAP_AES_KEY_WRAP = "AES KeyWrap"
WRAP_AES_KEY_WRAP_PAD = "AES KeyWrap/Padding"
WRAP_DES3_CBC_PAD = "DES3/CBC/Pad"


[docs]class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): """ Abstract class containing methods to do cryptographic operations. """ def __init__(self): """ Constructor """ pass
[docs] @abc.abstractmethod def initialize(self): """ Initialization code """
[docs] @abc.abstractmethod def get_supported_algorithm_keyset(self): """ returns highest supported algorithm keyset """
[docs] @abc.abstractmethod def set_algorithm_keyset(self, level): """ sets required keyset """
[docs] @abc.abstractmethod def generate_nonce_iv(self, mechanism): """ Create a random initialization vector """
[docs] @abc.abstractmethod def generate_symmetric_key(self, mechanism=None, size=0): """ Generate and return a symmetric key """
[docs] @abc.abstractmethod def generate_session_key(self): """ Generate a session key to be used for wrapping data to the DRM This must return a 3DES 168 bit key """
[docs] @abc.abstractmethod def symmetric_wrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): """ encrypt data using a symmetric key (wrapping key)"""
[docs] @abc.abstractmethod def symmetric_unwrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): """ decrypt data originally encrypted with symmetric key (wrapping key) We expect the data and nonce_iv values to be base64 encoded. The mechanism is the type of key used to do the wrapping. It defaults to a 56 bit DES3 key. """
[docs] @abc.abstractmethod def asymmetric_wrap(self, data, wrapping_cert, mechanism=None): """ encrypt a symmetric key with the public key of a transport cert. The mechanism is the type of symmetric key, which defaults to a 56 bit DES3 key. """
[docs] @abc.abstractmethod def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): """ Unwrap data that has been key wrapped using AES KeyWrap """
[docs] @abc.abstractmethod def get_cert(self, cert_nick): """ Get the certificate for the specified cert_nick. """
[docs]class NSSCryptoProvider(CryptoProvider): """ Class that defines NSS implementation of CryptoProvider. Requires an NSS database to have been set up and initialized. Note that all inputs and outputs are unencoded. """
[docs] @staticmethod def setup_database( db_dir, password=None, over_write=False, password_file=None): """ Create an NSS database """ if os.path.exists(db_dir): if not over_write: raise IOError("Directory already exists.") if os.path.isdir(db_dir): shutil.rmtree(db_dir) else: os.remove(db_dir) os.makedirs(db_dir) try: if password: (f, password_file) = tempfile.mkstemp() os.write(f, password) os.close(f) command = ['certutil', '-N', '-d', db_dir, '-f', password_file] subprocess.check_call(command) finally: if password and password_file: os.remove(password_file)
def __init__(self, certdb_dir, certdb_password=None, password_file=None): """ Initialize nss and nss related parameters This method expects a NSS database to have already been created at certdb_dir with password certdb_password. """ CryptoProvider.__init__(self) self.certdb_dir = certdb_dir if certdb_password: self.certdb_password = certdb_password elif password_file: with open(password_file, 'r') as f: self.certdb_password = f.readline().strip() self.nonce_iv = "e4:bb:3b:d3:c3:71:2e:58"
[docs] def initialize(self): """ Initialize the nss db. Must be done before any crypto operations """ nss.nss_init(self.certdb_dir)
[docs] def get_supported_algorithm_keyset(self): """ returns highest supported algorithm keyset """ return 0
[docs] def set_algorithm_keyset(self, level): """ sets required keyset """ if level > 0: raise Exception("Invalid keyset")
# basically, do what we have always done, no need to set anything # special here.
[docs] def import_cert(self, cert_nick, cert, trust=',,'): """ Import a certificate into the nss database """ # accept both CertData object or cert actual data if type(cert).__name__ == 'CertData': content = cert.encoded else: content = cert # certutil -A -d db_dir -n cert_nick -t trust -i cert_file with tempfile.NamedTemporaryFile() as cert_file: cert_file.write(content) cert_file.flush() command = ['certutil', '-A', '-d', self.certdb_dir, '-n', cert_nick, '-t', trust, '-i', cert_file.name] subprocess.check_call(command)
[docs] def generate_nonce_iv(self, mechanism=nss.CKM_DES3_CBC_PAD): """ Create a random initialization vector """ iv_length = nss.get_iv_length(mechanism) if iv_length > 0: iv_data = nss.generate_random(iv_length) return iv_data else: return None
[docs] @classmethod def setup_contexts(cls, mechanism, sym_key, nonce_iv): """ Set up contexts to do wrapping/unwrapping by symmetric keys. """ # Get a PK11 slot based on the cipher slot = nss.get_best_slot(mechanism) if sym_key is None: sym_key = slot.key_gen(mechanism, None, slot.get_best_key_length(mechanism)) # If initialization vector was supplied use it, otherwise set it to # None if nonce_iv: iv_si = nss.SecItem(nonce_iv) iv_param = nss.param_from_iv(mechanism, iv_si) else: iv_data = cls.generate_nonce_iv(mechanism) if iv_data is not None: iv_si = nss.SecItem(iv_data) iv_param = nss.param_from_iv(mechanism, iv_si) else: iv_param = None # Create an encoding context encoding_ctx = nss.create_context_by_sym_key(mechanism, nss.CKA_ENCRYPT, sym_key, iv_param) # Create a decoding context decoding_ctx = nss.create_context_by_sym_key(mechanism, nss.CKA_DECRYPT, sym_key, iv_param) return encoding_ctx, decoding_ctx
[docs] def generate_symmetric_key(self, mechanism=nss.CKM_DES3_CBC_PAD, size=0): """ Returns a symmetric key. Note that for fixed length keys, this length should be 0. If no length is provided, then the function will either use 0 (for fixed length keys) or the maximum available length for that algorithm and the token. """ slot = nss.get_best_slot(mechanism) if size == 0: size = slot.get_best_key_length(mechanism) return slot.key_gen(mechanism, None, size)
[docs] def generate_session_key(self): """ Returns a session key to be used when wrapping secrets for the DRM This will return a 168 bit 3DES key. """ return self.generate_symmetric_key(mechanism=nss.CKM_DES3_CBC_PAD)
[docs] def symmetric_wrap(self, data, wrapping_key, mechanism=nss.CKM_DES3_CBC_PAD, nonce_iv=None): """ :param data Data to be wrapped :param wrapping_key Symmetric key to wrap data :param mechanism Mechanism to user when wrapping :param nonce_iv Nonce to use when wrapping Wrap (encrypt) data using the supplied symmetric key """ if nonce_iv is None: nonce_iv = nss.read_hex(self.nonce_iv) encoding_ctx, _decoding_ctx = self.setup_contexts(mechanism, wrapping_key, nonce_iv) wrapped_data = encoding_ctx.cipher_op(data) +\ encoding_ctx.digest_final() return wrapped_data
[docs] def symmetric_unwrap(self, data, wrapping_key, mechanism=nss.CKM_DES3_CBC_PAD, nonce_iv=None): """ :param data Data to be unwrapped :param wrapping_key Symmetric key to unwrap data :param mechanism Mechanism to use when wrapping :param nonce_iv iv data Unwrap (decrypt) data using the supplied symmetric key """ if nonce_iv is None: nonce_iv = nss.read_hex(self.nonce_iv) _encoding_ctx, decoding_ctx = self.setup_contexts(mechanism, wrapping_key, nonce_iv) unwrapped_data = decoding_ctx.cipher_op(data) \ + decoding_ctx.digest_final() return unwrapped_data
[docs] def asymmetric_wrap(self, data, wrapping_cert, mechanism=nss.CKM_DES3_CBC_PAD): """ :param data Data to be wrapped :param wrapping_cert Public key to wrap data :param mechanism algorithm of symmetric key to be wrapped Wrap (encrypt) data using the supplied asymmetric key """ public_key = wrapping_cert.subject_public_key_info.public_key return nss.pub_wrap_sym_key(mechanism, public_key, data)
[docs] def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): """ :param mechanism Key wrapping mechanism :param data: Data to be unwrapped :param wrapping_key: Wrapping Key :param nonce_iv Nonce data :return: Unwrapped data Return unwrapped data for data that has been keywrapped. For NSS, we only support 3DES - so something that has been keywrapped can be decrypted. This is precisely what we used to do before. """ return self.symmetric_unwrap( data, wrapping_key, mechanism=nss.CKM_DES3_CBC_PAD, nonce_iv=nonce_iv )
[docs] def get_cert(self, cert_nick): """ :param cert_nick Nickname for the certificate to be returned Searches NSS database and returns SecItem object for this certificate. """ return nss.find_cert_from_nickname(cert_nick)
[docs]class CryptographyCryptoProvider(CryptoProvider): """ Class that defines python-cryptography implementation of CryptoProvider. Requires a PEM file containing the agent cert to be initialized. Note that all inputs and outputs are unencoded. """ def __init__(self, transport_cert_nick, transport_cert, backend=default_backend()): """ Initialize python-cryptography """ super(CryptographyCryptoProvider, self).__init__() self.certs = {} if not isinstance(transport_cert, cryptography.x509.Certificate): # it's a file name with open(transport_cert, 'r') as f: transport_pem = f.read() transport_cert = cryptography.x509.load_pem_x509_certificate( transport_pem, backend) self.certs[transport_cert_nick] = transport_cert # default to AES self.encrypt_alg = algorithms.AES self.encrypt_mode = modes.CBC self.encrypt_size = 128 self.backend = backend
[docs] def initialize(self): """ Any operations here that need to be performed before crypto operations. """ pass
[docs] def get_supported_algorithm_keyset(self): """ returns highest supported algorithm keyset """ return 1
[docs] def set_algorithm_keyset(self, level): """ sets required keyset """ if level > 1: raise ValueError("Invalid keyset") elif level == 1: self.encrypt_alg = algorithms.AES self.encrypt_mode = modes.CBC self.encrypt_size = 128 elif level == 0: # note that 3DES keys are actually 192 bits long, even # though only 168 bits are used internally. See # https://tools.ietf.org/html/rfc4949 # Using 168 here will cause python-cryptography key verification # checks to fail. self.encrypt_alg = algorithms.TripleDES self.encrypt_mode = modes.CBC self.encrypt_size = 192
[docs] def generate_nonce_iv(self, mechanism='AES'): """ Create a random initialization vector """ return os.urandom(self.encrypt_alg.block_size // 8)
[docs] def generate_symmetric_key(self, mechanism=None, size=0): """ Returns a symmetric key. """ if mechanism is None: size = self.encrypt_size // 8 return os.urandom(size)
[docs] def generate_session_key(self): """ Returns a session key to be used when wrapping secrets for the DRM. """ return self.generate_symmetric_key()
[docs] def symmetric_wrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): """ :param data Data to be wrapped :param wrapping_key Symmetric key to wrap data :param mechanism Mechanism to use for wrapping key :param nonce_iv Nonce for initialization vector Wrap (encrypt) data using the supplied symmetric key """ # TODO(alee) Not sure yet how to handle non-default mechanisms # For now, lets just ignore them if wrapping_key is None: raise ValueError("Wrapping key must be provided") if self.encrypt_mode.name == "CBC": padder = padding.PKCS7(self.encrypt_alg.block_size).padder() padded_data = padder.update(data) + padder.finalize() data = padded_data else: raise ValueError('Only CBC mode is currently supported') cipher = Cipher(self.encrypt_alg(wrapping_key), self.encrypt_mode(nonce_iv), backend=self.backend) encryptor = cipher.encryptor() ct = encryptor.update(data) + encryptor.finalize() return ct
[docs] def symmetric_unwrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): """ :param data Data to be unwrapped :param wrapping_key Symmetric key to unwrap data :param mechanism Mechanism to use when unwrapping :param nonce_iv iv data Unwrap (decrypt) data using the supplied symmetric key """ # TODO(alee) As above, no idea what to do with mechanism # ignoring for now. if wrapping_key is None: raise ValueError("Wrapping key must be provided") cipher = Cipher(self.encrypt_alg(wrapping_key), self.encrypt_mode(nonce_iv), backend=self.backend) decryptor = cipher.decryptor() unwrapped = decryptor.update(data) + decryptor.finalize() if self.encrypt_mode.name == 'CBC': unpadder = padding.PKCS7(self.encrypt_alg.block_size).unpadder() unpadded = unpadder.update(unwrapped) + unpadder.finalize() unwrapped = unpadded else: raise ValueError('Only CBC mode is currently supported') return unwrapped
[docs] def asymmetric_wrap(self, data, wrapping_cert, mechanism=None): """ :param data Data to be wrapped :param wrapping_cert Public key to wrap data :param mechanism algorithm of symmetric key to be wrapped Wrap (encrypt) data using the supplied asymmetric key """ public_key = wrapping_cert.public_key() return public_key.encrypt( data, PKCS1v15() )
[docs] def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): """ :param mechanism key wrapping mechanism :param data: data to unwrap :param wrapping_key: AES key used to wrap data :param nonce_iv Nonce data :return: unwrapped data Unwrap the encrypted data which has been wrapped using a KeyWrap mechanism. """ if mechanism == WRAP_AES_CBC_PAD or mechanism == WRAP_DES3_CBC_PAD: return self.symmetric_unwrap( data, wrapping_key, nonce_iv=nonce_iv) if mechanism == WRAP_AES_KEY_WRAP: return keywrap.aes_key_unwrap(wrapping_key, data, self.backend) raise ValueError("Unsupported key wrap algorithm: " + mechanism)
[docs] def get_cert(self, cert_nick): """ :param cert_nick Nickname for the certificate to be returned. """ return self.certs[cert_nick]