This commit is contained in:
2025-04-06 03:14:47 +02:00
parent aaf9ab523b
commit b9c99befab
2263 changed files with 401112 additions and 20 deletions

View File

@ -0,0 +1,3 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

View File

@ -0,0 +1,19 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
# This exists to break an import cycle. It is normally accessible from the
# asymmetric padding module.
class AsymmetricPadding(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self) -> str:
"""
A string naming this padding (e.g. "PSS", "PKCS1").
"""

View File

@ -0,0 +1,58 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography import utils
# This exists to break an import cycle. It is normally accessible from the
# ciphers module.
class CipherAlgorithm(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self) -> str:
"""
A string naming this mode (e.g. "AES", "Camellia").
"""
@property
@abc.abstractmethod
def key_sizes(self) -> frozenset[int]:
"""
Valid key sizes for this algorithm in bits
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The size of the key being used as an integer in bits (e.g. 128, 256).
"""
class BlockCipherAlgorithm(CipherAlgorithm):
key: bytes
@property
@abc.abstractmethod
def block_size(self) -> int:
"""
The size of a block as an integer in bits (e.g. 64, 128).
"""
def _verify_key_size(algorithm: CipherAlgorithm, key: bytes) -> bytes:
# Verify that the key is instance of bytes
utils._check_byteslike("key", key)
# Verify that the key size matches the expected key size
if len(key) * 8 not in algorithm.key_sizes:
raise ValueError(
f"Invalid key size ({len(key) * 8}) for {algorithm.name}."
)
return key

View File

@ -0,0 +1,169 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography import utils
from cryptography.hazmat.primitives.hashes import HashAlgorithm
# This exists to break an import cycle. These classes are normally accessible
# from the serialization module.
class PBES(utils.Enum):
PBESv1SHA1And3KeyTripleDESCBC = "PBESv1 using SHA1 and 3-Key TripleDES"
PBESv2SHA256AndAES256CBC = "PBESv2 using SHA256 PBKDF2 and AES256 CBC"
class Encoding(utils.Enum):
PEM = "PEM"
DER = "DER"
OpenSSH = "OpenSSH"
Raw = "Raw"
X962 = "ANSI X9.62"
SMIME = "S/MIME"
class PrivateFormat(utils.Enum):
PKCS8 = "PKCS8"
TraditionalOpenSSL = "TraditionalOpenSSL"
Raw = "Raw"
OpenSSH = "OpenSSH"
PKCS12 = "PKCS12"
def encryption_builder(self) -> KeySerializationEncryptionBuilder:
if self not in (PrivateFormat.OpenSSH, PrivateFormat.PKCS12):
raise ValueError(
"encryption_builder only supported with PrivateFormat.OpenSSH"
" and PrivateFormat.PKCS12"
)
return KeySerializationEncryptionBuilder(self)
class PublicFormat(utils.Enum):
SubjectPublicKeyInfo = "X.509 subjectPublicKeyInfo with PKCS#1"
PKCS1 = "Raw PKCS#1"
OpenSSH = "OpenSSH"
Raw = "Raw"
CompressedPoint = "X9.62 Compressed Point"
UncompressedPoint = "X9.62 Uncompressed Point"
class ParameterFormat(utils.Enum):
PKCS3 = "PKCS3"
class KeySerializationEncryption(metaclass=abc.ABCMeta):
pass
class BestAvailableEncryption(KeySerializationEncryption):
def __init__(self, password: bytes):
if not isinstance(password, bytes) or len(password) == 0:
raise ValueError("Password must be 1 or more bytes.")
self.password = password
class NoEncryption(KeySerializationEncryption):
pass
class KeySerializationEncryptionBuilder:
def __init__(
self,
format: PrivateFormat,
*,
_kdf_rounds: int | None = None,
_hmac_hash: HashAlgorithm | None = None,
_key_cert_algorithm: PBES | None = None,
) -> None:
self._format = format
self._kdf_rounds = _kdf_rounds
self._hmac_hash = _hmac_hash
self._key_cert_algorithm = _key_cert_algorithm
def kdf_rounds(self, rounds: int) -> KeySerializationEncryptionBuilder:
if self._kdf_rounds is not None:
raise ValueError("kdf_rounds already set")
if not isinstance(rounds, int):
raise TypeError("kdf_rounds must be an integer")
if rounds < 1:
raise ValueError("kdf_rounds must be a positive integer")
return KeySerializationEncryptionBuilder(
self._format,
_kdf_rounds=rounds,
_hmac_hash=self._hmac_hash,
_key_cert_algorithm=self._key_cert_algorithm,
)
def hmac_hash(
self, algorithm: HashAlgorithm
) -> KeySerializationEncryptionBuilder:
if self._format is not PrivateFormat.PKCS12:
raise TypeError(
"hmac_hash only supported with PrivateFormat.PKCS12"
)
if self._hmac_hash is not None:
raise ValueError("hmac_hash already set")
return KeySerializationEncryptionBuilder(
self._format,
_kdf_rounds=self._kdf_rounds,
_hmac_hash=algorithm,
_key_cert_algorithm=self._key_cert_algorithm,
)
def key_cert_algorithm(
self, algorithm: PBES
) -> KeySerializationEncryptionBuilder:
if self._format is not PrivateFormat.PKCS12:
raise TypeError(
"key_cert_algorithm only supported with "
"PrivateFormat.PKCS12"
)
if self._key_cert_algorithm is not None:
raise ValueError("key_cert_algorithm already set")
return KeySerializationEncryptionBuilder(
self._format,
_kdf_rounds=self._kdf_rounds,
_hmac_hash=self._hmac_hash,
_key_cert_algorithm=algorithm,
)
def build(self, password: bytes) -> KeySerializationEncryption:
if not isinstance(password, bytes) or len(password) == 0:
raise ValueError("Password must be 1 or more bytes.")
return _KeySerializationEncryption(
self._format,
password,
kdf_rounds=self._kdf_rounds,
hmac_hash=self._hmac_hash,
key_cert_algorithm=self._key_cert_algorithm,
)
class _KeySerializationEncryption(KeySerializationEncryption):
def __init__(
self,
format: PrivateFormat,
password: bytes,
*,
kdf_rounds: int | None,
hmac_hash: HashAlgorithm | None,
key_cert_algorithm: PBES | None,
):
self._format = format
self.password = password
self._kdf_rounds = kdf_rounds
self._hmac_hash = hmac_hash
self._key_cert_algorithm = key_cert_algorithm

View File

@ -0,0 +1,3 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

View File

@ -0,0 +1,135 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization
generate_parameters = rust_openssl.dh.generate_parameters
DHPrivateNumbers = rust_openssl.dh.DHPrivateNumbers
DHPublicNumbers = rust_openssl.dh.DHPublicNumbers
DHParameterNumbers = rust_openssl.dh.DHParameterNumbers
class DHParameters(metaclass=abc.ABCMeta):
@abc.abstractmethod
def generate_private_key(self) -> DHPrivateKey:
"""
Generates and returns a DHPrivateKey.
"""
@abc.abstractmethod
def parameter_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.ParameterFormat,
) -> bytes:
"""
Returns the parameters serialized as bytes.
"""
@abc.abstractmethod
def parameter_numbers(self) -> DHParameterNumbers:
"""
Returns a DHParameterNumbers.
"""
DHParametersWithSerialization = DHParameters
DHParameters.register(rust_openssl.dh.DHParameters)
class DHPublicKey(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the prime modulus.
"""
@abc.abstractmethod
def parameters(self) -> DHParameters:
"""
The DHParameters object associated with this public key.
"""
@abc.abstractmethod
def public_numbers(self) -> DHPublicNumbers:
"""
Returns a DHPublicNumbers.
"""
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
DHPublicKeyWithSerialization = DHPublicKey
DHPublicKey.register(rust_openssl.dh.DHPublicKey)
class DHPrivateKey(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the prime modulus.
"""
@abc.abstractmethod
def public_key(self) -> DHPublicKey:
"""
The DHPublicKey associated with this private key.
"""
@abc.abstractmethod
def parameters(self) -> DHParameters:
"""
The DHParameters object associated with this private key.
"""
@abc.abstractmethod
def exchange(self, peer_public_key: DHPublicKey) -> bytes:
"""
Given peer's DHPublicKey, carry out the key exchange and
return shared key as bytes.
"""
@abc.abstractmethod
def private_numbers(self) -> DHPrivateNumbers:
"""
Returns a DHPrivateNumbers.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
DHPrivateKeyWithSerialization = DHPrivateKey
DHPrivateKey.register(rust_openssl.dh.DHPrivateKey)

View File

@ -0,0 +1,154 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import typing
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization, hashes
from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
class DSAParameters(metaclass=abc.ABCMeta):
@abc.abstractmethod
def generate_private_key(self) -> DSAPrivateKey:
"""
Generates and returns a DSAPrivateKey.
"""
@abc.abstractmethod
def parameter_numbers(self) -> DSAParameterNumbers:
"""
Returns a DSAParameterNumbers.
"""
DSAParametersWithNumbers = DSAParameters
DSAParameters.register(rust_openssl.dsa.DSAParameters)
class DSAPrivateKey(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the prime modulus.
"""
@abc.abstractmethod
def public_key(self) -> DSAPublicKey:
"""
The DSAPublicKey associated with this private key.
"""
@abc.abstractmethod
def parameters(self) -> DSAParameters:
"""
The DSAParameters object associated with this private key.
"""
@abc.abstractmethod
def sign(
self,
data: bytes,
algorithm: asym_utils.Prehashed | hashes.HashAlgorithm,
) -> bytes:
"""
Signs the data
"""
@abc.abstractmethod
def private_numbers(self) -> DSAPrivateNumbers:
"""
Returns a DSAPrivateNumbers.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
DSAPrivateKeyWithSerialization = DSAPrivateKey
DSAPrivateKey.register(rust_openssl.dsa.DSAPrivateKey)
class DSAPublicKey(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the prime modulus.
"""
@abc.abstractmethod
def parameters(self) -> DSAParameters:
"""
The DSAParameters object associated with this public key.
"""
@abc.abstractmethod
def public_numbers(self) -> DSAPublicNumbers:
"""
Returns a DSAPublicNumbers.
"""
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
@abc.abstractmethod
def verify(
self,
signature: bytes,
data: bytes,
algorithm: asym_utils.Prehashed | hashes.HashAlgorithm,
) -> None:
"""
Verifies the signature of the data.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
DSAPublicKeyWithSerialization = DSAPublicKey
DSAPublicKey.register(rust_openssl.dsa.DSAPublicKey)
DSAPrivateNumbers = rust_openssl.dsa.DSAPrivateNumbers
DSAPublicNumbers = rust_openssl.dsa.DSAPublicNumbers
DSAParameterNumbers = rust_openssl.dsa.DSAParameterNumbers
def generate_parameters(
key_size: int, backend: typing.Any = None
) -> DSAParameters:
if key_size not in (1024, 2048, 3072, 4096):
raise ValueError("Key size must be 1024, 2048, 3072, or 4096 bits.")
return rust_openssl.dsa.generate_parameters(key_size)
def generate_private_key(
key_size: int, backend: typing.Any = None
) -> DSAPrivateKey:
parameters = generate_parameters(key_size)
return parameters.generate_private_key()

View File

@ -0,0 +1,403 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import typing
from cryptography import utils
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat._oid import ObjectIdentifier
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization, hashes
from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
class EllipticCurveOID:
SECP192R1 = ObjectIdentifier("1.2.840.10045.3.1.1")
SECP224R1 = ObjectIdentifier("1.3.132.0.33")
SECP256K1 = ObjectIdentifier("1.3.132.0.10")
SECP256R1 = ObjectIdentifier("1.2.840.10045.3.1.7")
SECP384R1 = ObjectIdentifier("1.3.132.0.34")
SECP521R1 = ObjectIdentifier("1.3.132.0.35")
BRAINPOOLP256R1 = ObjectIdentifier("1.3.36.3.3.2.8.1.1.7")
BRAINPOOLP384R1 = ObjectIdentifier("1.3.36.3.3.2.8.1.1.11")
BRAINPOOLP512R1 = ObjectIdentifier("1.3.36.3.3.2.8.1.1.13")
SECT163K1 = ObjectIdentifier("1.3.132.0.1")
SECT163R2 = ObjectIdentifier("1.3.132.0.15")
SECT233K1 = ObjectIdentifier("1.3.132.0.26")
SECT233R1 = ObjectIdentifier("1.3.132.0.27")
SECT283K1 = ObjectIdentifier("1.3.132.0.16")
SECT283R1 = ObjectIdentifier("1.3.132.0.17")
SECT409K1 = ObjectIdentifier("1.3.132.0.36")
SECT409R1 = ObjectIdentifier("1.3.132.0.37")
SECT571K1 = ObjectIdentifier("1.3.132.0.38")
SECT571R1 = ObjectIdentifier("1.3.132.0.39")
class EllipticCurve(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self) -> str:
"""
The name of the curve. e.g. secp256r1.
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
Bit size of a secret scalar for the curve.
"""
class EllipticCurveSignatureAlgorithm(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def algorithm(
self,
) -> asym_utils.Prehashed | hashes.HashAlgorithm:
"""
The digest algorithm used with this signature.
"""
class EllipticCurvePrivateKey(metaclass=abc.ABCMeta):
@abc.abstractmethod
def exchange(
self, algorithm: ECDH, peer_public_key: EllipticCurvePublicKey
) -> bytes:
"""
Performs a key exchange operation using the provided algorithm with the
provided peer's public key.
"""
@abc.abstractmethod
def public_key(self) -> EllipticCurvePublicKey:
"""
The EllipticCurvePublicKey for this private key.
"""
@property
@abc.abstractmethod
def curve(self) -> EllipticCurve:
"""
The EllipticCurve that this key is on.
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
Bit size of a secret scalar for the curve.
"""
@abc.abstractmethod
def sign(
self,
data: bytes,
signature_algorithm: EllipticCurveSignatureAlgorithm,
) -> bytes:
"""
Signs the data
"""
@abc.abstractmethod
def private_numbers(self) -> EllipticCurvePrivateNumbers:
"""
Returns an EllipticCurvePrivateNumbers.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
EllipticCurvePrivateKeyWithSerialization = EllipticCurvePrivateKey
EllipticCurvePrivateKey.register(rust_openssl.ec.ECPrivateKey)
class EllipticCurvePublicKey(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def curve(self) -> EllipticCurve:
"""
The EllipticCurve that this key is on.
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
Bit size of a secret scalar for the curve.
"""
@abc.abstractmethod
def public_numbers(self) -> EllipticCurvePublicNumbers:
"""
Returns an EllipticCurvePublicNumbers.
"""
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
@abc.abstractmethod
def verify(
self,
signature: bytes,
data: bytes,
signature_algorithm: EllipticCurveSignatureAlgorithm,
) -> None:
"""
Verifies the signature of the data.
"""
@classmethod
def from_encoded_point(
cls, curve: EllipticCurve, data: bytes
) -> EllipticCurvePublicKey:
utils._check_bytes("data", data)
if len(data) == 0:
raise ValueError("data must not be an empty byte string")
if data[0] not in [0x02, 0x03, 0x04]:
raise ValueError("Unsupported elliptic curve point type")
return rust_openssl.ec.from_public_bytes(curve, data)
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
EllipticCurvePublicKeyWithSerialization = EllipticCurvePublicKey
EllipticCurvePublicKey.register(rust_openssl.ec.ECPublicKey)
EllipticCurvePrivateNumbers = rust_openssl.ec.EllipticCurvePrivateNumbers
EllipticCurvePublicNumbers = rust_openssl.ec.EllipticCurvePublicNumbers
class SECT571R1(EllipticCurve):
name = "sect571r1"
key_size = 570
class SECT409R1(EllipticCurve):
name = "sect409r1"
key_size = 409
class SECT283R1(EllipticCurve):
name = "sect283r1"
key_size = 283
class SECT233R1(EllipticCurve):
name = "sect233r1"
key_size = 233
class SECT163R2(EllipticCurve):
name = "sect163r2"
key_size = 163
class SECT571K1(EllipticCurve):
name = "sect571k1"
key_size = 571
class SECT409K1(EllipticCurve):
name = "sect409k1"
key_size = 409
class SECT283K1(EllipticCurve):
name = "sect283k1"
key_size = 283
class SECT233K1(EllipticCurve):
name = "sect233k1"
key_size = 233
class SECT163K1(EllipticCurve):
name = "sect163k1"
key_size = 163
class SECP521R1(EllipticCurve):
name = "secp521r1"
key_size = 521
class SECP384R1(EllipticCurve):
name = "secp384r1"
key_size = 384
class SECP256R1(EllipticCurve):
name = "secp256r1"
key_size = 256
class SECP256K1(EllipticCurve):
name = "secp256k1"
key_size = 256
class SECP224R1(EllipticCurve):
name = "secp224r1"
key_size = 224
class SECP192R1(EllipticCurve):
name = "secp192r1"
key_size = 192
class BrainpoolP256R1(EllipticCurve):
name = "brainpoolP256r1"
key_size = 256
class BrainpoolP384R1(EllipticCurve):
name = "brainpoolP384r1"
key_size = 384
class BrainpoolP512R1(EllipticCurve):
name = "brainpoolP512r1"
key_size = 512
_CURVE_TYPES: dict[str, EllipticCurve] = {
"prime192v1": SECP192R1(),
"prime256v1": SECP256R1(),
"secp192r1": SECP192R1(),
"secp224r1": SECP224R1(),
"secp256r1": SECP256R1(),
"secp384r1": SECP384R1(),
"secp521r1": SECP521R1(),
"secp256k1": SECP256K1(),
"sect163k1": SECT163K1(),
"sect233k1": SECT233K1(),
"sect283k1": SECT283K1(),
"sect409k1": SECT409K1(),
"sect571k1": SECT571K1(),
"sect163r2": SECT163R2(),
"sect233r1": SECT233R1(),
"sect283r1": SECT283R1(),
"sect409r1": SECT409R1(),
"sect571r1": SECT571R1(),
"brainpoolP256r1": BrainpoolP256R1(),
"brainpoolP384r1": BrainpoolP384R1(),
"brainpoolP512r1": BrainpoolP512R1(),
}
class ECDSA(EllipticCurveSignatureAlgorithm):
def __init__(
self,
algorithm: asym_utils.Prehashed | hashes.HashAlgorithm,
deterministic_signing: bool = False,
):
from cryptography.hazmat.backends.openssl.backend import backend
if (
deterministic_signing
and not backend.ecdsa_deterministic_supported()
):
raise UnsupportedAlgorithm(
"ECDSA with deterministic signature (RFC 6979) is not "
"supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
self._algorithm = algorithm
self._deterministic_signing = deterministic_signing
@property
def algorithm(
self,
) -> asym_utils.Prehashed | hashes.HashAlgorithm:
return self._algorithm
@property
def deterministic_signing(
self,
) -> bool:
return self._deterministic_signing
generate_private_key = rust_openssl.ec.generate_private_key
def derive_private_key(
private_value: int,
curve: EllipticCurve,
backend: typing.Any = None,
) -> EllipticCurvePrivateKey:
if not isinstance(private_value, int):
raise TypeError("private_value must be an integer type.")
if private_value <= 0:
raise ValueError("private_value must be a positive integer.")
return rust_openssl.ec.derive_private_key(private_value, curve)
class ECDH:
pass
_OID_TO_CURVE = {
EllipticCurveOID.SECP192R1: SECP192R1,
EllipticCurveOID.SECP224R1: SECP224R1,
EllipticCurveOID.SECP256K1: SECP256K1,
EllipticCurveOID.SECP256R1: SECP256R1,
EllipticCurveOID.SECP384R1: SECP384R1,
EllipticCurveOID.SECP521R1: SECP521R1,
EllipticCurveOID.BRAINPOOLP256R1: BrainpoolP256R1,
EllipticCurveOID.BRAINPOOLP384R1: BrainpoolP384R1,
EllipticCurveOID.BRAINPOOLP512R1: BrainpoolP512R1,
EllipticCurveOID.SECT163K1: SECT163K1,
EllipticCurveOID.SECT163R2: SECT163R2,
EllipticCurveOID.SECT233K1: SECT233K1,
EllipticCurveOID.SECT233R1: SECT233R1,
EllipticCurveOID.SECT283K1: SECT283K1,
EllipticCurveOID.SECT283R1: SECT283R1,
EllipticCurveOID.SECT409K1: SECT409K1,
EllipticCurveOID.SECT409R1: SECT409R1,
EllipticCurveOID.SECT571K1: SECT571K1,
EllipticCurveOID.SECT571R1: SECT571R1,
}
def get_curve_for_oid(oid: ObjectIdentifier) -> type[EllipticCurve]:
try:
return _OID_TO_CURVE[oid]
except KeyError:
raise LookupError(
"The provided object identifier has no matching elliptic "
"curve class"
)

View File

@ -0,0 +1,116 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization
class Ed25519PublicKey(metaclass=abc.ABCMeta):
@classmethod
def from_public_bytes(cls, data: bytes) -> Ed25519PublicKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed25519_supported():
raise UnsupportedAlgorithm(
"ed25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed25519.from_public_bytes(data)
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
The serialized bytes of the public key.
"""
@abc.abstractmethod
def public_bytes_raw(self) -> bytes:
"""
The raw bytes of the public key.
Equivalent to public_bytes(Raw, Raw).
"""
@abc.abstractmethod
def verify(self, signature: bytes, data: bytes) -> None:
"""
Verify the signature.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
Ed25519PublicKey.register(rust_openssl.ed25519.Ed25519PublicKey)
class Ed25519PrivateKey(metaclass=abc.ABCMeta):
@classmethod
def generate(cls) -> Ed25519PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed25519_supported():
raise UnsupportedAlgorithm(
"ed25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed25519.generate_key()
@classmethod
def from_private_bytes(cls, data: bytes) -> Ed25519PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed25519_supported():
raise UnsupportedAlgorithm(
"ed25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed25519.from_private_bytes(data)
@abc.abstractmethod
def public_key(self) -> Ed25519PublicKey:
"""
The Ed25519PublicKey derived from the private key.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
The serialized bytes of the private key.
"""
@abc.abstractmethod
def private_bytes_raw(self) -> bytes:
"""
The raw bytes of the private key.
Equivalent to private_bytes(Raw, Raw, NoEncryption()).
"""
@abc.abstractmethod
def sign(self, data: bytes) -> bytes:
"""
Signs the data.
"""
Ed25519PrivateKey.register(rust_openssl.ed25519.Ed25519PrivateKey)

View File

@ -0,0 +1,118 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization
class Ed448PublicKey(metaclass=abc.ABCMeta):
@classmethod
def from_public_bytes(cls, data: bytes) -> Ed448PublicKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed448_supported():
raise UnsupportedAlgorithm(
"ed448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed448.from_public_bytes(data)
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
The serialized bytes of the public key.
"""
@abc.abstractmethod
def public_bytes_raw(self) -> bytes:
"""
The raw bytes of the public key.
Equivalent to public_bytes(Raw, Raw).
"""
@abc.abstractmethod
def verify(self, signature: bytes, data: bytes) -> None:
"""
Verify the signature.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
if hasattr(rust_openssl, "ed448"):
Ed448PublicKey.register(rust_openssl.ed448.Ed448PublicKey)
class Ed448PrivateKey(metaclass=abc.ABCMeta):
@classmethod
def generate(cls) -> Ed448PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed448_supported():
raise UnsupportedAlgorithm(
"ed448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed448.generate_key()
@classmethod
def from_private_bytes(cls, data: bytes) -> Ed448PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.ed448_supported():
raise UnsupportedAlgorithm(
"ed448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
)
return rust_openssl.ed448.from_private_bytes(data)
@abc.abstractmethod
def public_key(self) -> Ed448PublicKey:
"""
The Ed448PublicKey derived from the private key.
"""
@abc.abstractmethod
def sign(self, data: bytes) -> bytes:
"""
Signs the data.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
The serialized bytes of the private key.
"""
@abc.abstractmethod
def private_bytes_raw(self) -> bytes:
"""
The raw bytes of the private key.
Equivalent to private_bytes(Raw, Raw, NoEncryption()).
"""
if hasattr(rust_openssl, "x448"):
Ed448PrivateKey.register(rust_openssl.ed448.Ed448PrivateKey)

View File

@ -0,0 +1,113 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._asymmetric import (
AsymmetricPadding as AsymmetricPadding,
)
from cryptography.hazmat.primitives.asymmetric import rsa
class PKCS1v15(AsymmetricPadding):
name = "EMSA-PKCS1-v1_5"
class _MaxLength:
"Sentinel value for `MAX_LENGTH`."
class _Auto:
"Sentinel value for `AUTO`."
class _DigestLength:
"Sentinel value for `DIGEST_LENGTH`."
class PSS(AsymmetricPadding):
MAX_LENGTH = _MaxLength()
AUTO = _Auto()
DIGEST_LENGTH = _DigestLength()
name = "EMSA-PSS"
_salt_length: int | _MaxLength | _Auto | _DigestLength
def __init__(
self,
mgf: MGF,
salt_length: int | _MaxLength | _Auto | _DigestLength,
) -> None:
self._mgf = mgf
if not isinstance(
salt_length, (int, _MaxLength, _Auto, _DigestLength)
):
raise TypeError(
"salt_length must be an integer, MAX_LENGTH, "
"DIGEST_LENGTH, or AUTO"
)
if isinstance(salt_length, int) and salt_length < 0:
raise ValueError("salt_length must be zero or greater.")
self._salt_length = salt_length
@property
def mgf(self) -> MGF:
return self._mgf
class OAEP(AsymmetricPadding):
name = "EME-OAEP"
def __init__(
self,
mgf: MGF,
algorithm: hashes.HashAlgorithm,
label: bytes | None,
):
if not isinstance(algorithm, hashes.HashAlgorithm):
raise TypeError("Expected instance of hashes.HashAlgorithm.")
self._mgf = mgf
self._algorithm = algorithm
self._label = label
@property
def algorithm(self) -> hashes.HashAlgorithm:
return self._algorithm
@property
def mgf(self) -> MGF:
return self._mgf
class MGF(metaclass=abc.ABCMeta):
_algorithm: hashes.HashAlgorithm
class MGF1(MGF):
MAX_LENGTH = _MaxLength()
def __init__(self, algorithm: hashes.HashAlgorithm):
if not isinstance(algorithm, hashes.HashAlgorithm):
raise TypeError("Expected instance of hashes.HashAlgorithm.")
self._algorithm = algorithm
def calculate_max_pss_salt_length(
key: rsa.RSAPrivateKey | rsa.RSAPublicKey,
hash_algorithm: hashes.HashAlgorithm,
) -> int:
if not isinstance(key, (rsa.RSAPrivateKey, rsa.RSAPublicKey)):
raise TypeError("key must be an RSA public or private key")
# bit length - 1 per RFC 3447
emlen = (key.key_size + 6) // 8
salt_length = emlen - hash_algorithm.digest_size - 2
assert salt_length >= 0
return salt_length

View File

@ -0,0 +1,263 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import random
import typing
from math import gcd
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization, hashes
from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
class RSAPrivateKey(metaclass=abc.ABCMeta):
@abc.abstractmethod
def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
"""
Decrypts the provided ciphertext.
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the public modulus.
"""
@abc.abstractmethod
def public_key(self) -> RSAPublicKey:
"""
The RSAPublicKey associated with this private key.
"""
@abc.abstractmethod
def sign(
self,
data: bytes,
padding: AsymmetricPadding,
algorithm: asym_utils.Prehashed | hashes.HashAlgorithm,
) -> bytes:
"""
Signs the data.
"""
@abc.abstractmethod
def private_numbers(self) -> RSAPrivateNumbers:
"""
Returns an RSAPrivateNumbers.
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
RSAPrivateKeyWithSerialization = RSAPrivateKey
RSAPrivateKey.register(rust_openssl.rsa.RSAPrivateKey)
class RSAPublicKey(metaclass=abc.ABCMeta):
@abc.abstractmethod
def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes:
"""
Encrypts the given plaintext.
"""
@property
@abc.abstractmethod
def key_size(self) -> int:
"""
The bit length of the public modulus.
"""
@abc.abstractmethod
def public_numbers(self) -> RSAPublicNumbers:
"""
Returns an RSAPublicNumbers
"""
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
Returns the key serialized as bytes.
"""
@abc.abstractmethod
def verify(
self,
signature: bytes,
data: bytes,
padding: AsymmetricPadding,
algorithm: asym_utils.Prehashed | hashes.HashAlgorithm,
) -> None:
"""
Verifies the signature of the data.
"""
@abc.abstractmethod
def recover_data_from_signature(
self,
signature: bytes,
padding: AsymmetricPadding,
algorithm: hashes.HashAlgorithm | None,
) -> bytes:
"""
Recovers the original data from the signature.
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
RSAPublicKeyWithSerialization = RSAPublicKey
RSAPublicKey.register(rust_openssl.rsa.RSAPublicKey)
RSAPrivateNumbers = rust_openssl.rsa.RSAPrivateNumbers
RSAPublicNumbers = rust_openssl.rsa.RSAPublicNumbers
def generate_private_key(
public_exponent: int,
key_size: int,
backend: typing.Any = None,
) -> RSAPrivateKey:
_verify_rsa_parameters(public_exponent, key_size)
return rust_openssl.rsa.generate_private_key(public_exponent, key_size)
def _verify_rsa_parameters(public_exponent: int, key_size: int) -> None:
if public_exponent not in (3, 65537):
raise ValueError(
"public_exponent must be either 3 (for legacy compatibility) or "
"65537. Almost everyone should choose 65537 here!"
)
if key_size < 1024:
raise ValueError("key_size must be at least 1024-bits.")
def _modinv(e: int, m: int) -> int:
"""
Modular Multiplicative Inverse. Returns x such that: (x*e) mod m == 1
"""
x1, x2 = 1, 0
a, b = e, m
while b > 0:
q, r = divmod(a, b)
xn = x1 - q * x2
a, b, x1, x2 = b, r, x2, xn
return x1 % m
def rsa_crt_iqmp(p: int, q: int) -> int:
"""
Compute the CRT (q ** -1) % p value from RSA primes p and q.
"""
return _modinv(q, p)
def rsa_crt_dmp1(private_exponent: int, p: int) -> int:
"""
Compute the CRT private_exponent % (p - 1) value from the RSA
private_exponent (d) and p.
"""
return private_exponent % (p - 1)
def rsa_crt_dmq1(private_exponent: int, q: int) -> int:
"""
Compute the CRT private_exponent % (q - 1) value from the RSA
private_exponent (d) and q.
"""
return private_exponent % (q - 1)
def rsa_recover_private_exponent(e: int, p: int, q: int) -> int:
"""
Compute the RSA private_exponent (d) given the public exponent (e)
and the RSA primes p and q.
This uses the Carmichael totient function to generate the
smallest possible working value of the private exponent.
"""
# This lambda_n is the Carmichael totient function.
# The original RSA paper uses the Euler totient function
# here: phi_n = (p - 1) * (q - 1)
# Either version of the private exponent will work, but the
# one generated by the older formulation may be larger
# than necessary. (lambda_n always divides phi_n)
#
# TODO: Replace with lcm(p - 1, q - 1) once the minimum
# supported Python version is >= 3.9.
lambda_n = (p - 1) * (q - 1) // gcd(p - 1, q - 1)
return _modinv(e, lambda_n)
# Controls the number of iterations rsa_recover_prime_factors will perform
# to obtain the prime factors.
_MAX_RECOVERY_ATTEMPTS = 500
def rsa_recover_prime_factors(n: int, e: int, d: int) -> tuple[int, int]:
"""
Compute factors p and q from the private exponent d. We assume that n has
no more than two factors. This function is adapted from code in PyCrypto.
"""
# reject invalid values early
if 17 != pow(17, e * d, n):
raise ValueError("n, d, e don't match")
# See 8.2.2(i) in Handbook of Applied Cryptography.
ktot = d * e - 1
# The quantity d*e-1 is a multiple of phi(n), even,
# and can be represented as t*2^s.
t = ktot
while t % 2 == 0:
t = t // 2
# Cycle through all multiplicative inverses in Zn.
# The algorithm is non-deterministic, but there is a 50% chance
# any candidate a leads to successful factoring.
# See "Digitalized Signatures and Public Key Functions as Intractable
# as Factorization", M. Rabin, 1979
spotted = False
tries = 0
while not spotted and tries < _MAX_RECOVERY_ATTEMPTS:
a = random.randint(2, n - 1)
tries += 1
k = t
# Cycle through all values a^{t*2^i}=a^k
while k < ktot:
cand = pow(a, k, n)
# Check if a^k is a non-trivial root of unity (mod n)
if cand != 1 and cand != (n - 1) and pow(cand, 2, n) == 1:
# We have found a number such that (cand-1)(cand+1)=0 (mod n).
# Either of the terms divides n.
p = gcd(cand + 1, n)
spotted = True
break
k *= 2
if not spotted:
raise ValueError("Unable to compute factors p and q from exponent d.")
# Found !
q, r = divmod(n, p)
assert r == 0
p, q = sorted((p, q), reverse=True)
return (p, q)

View File

@ -0,0 +1,111 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.hazmat.primitives.asymmetric import (
dh,
dsa,
ec,
ed448,
ed25519,
rsa,
x448,
x25519,
)
# Every asymmetric key type
PublicKeyTypes = typing.Union[
dh.DHPublicKey,
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
x25519.X25519PublicKey,
x448.X448PublicKey,
]
PUBLIC_KEY_TYPES = PublicKeyTypes
utils.deprecated(
PUBLIC_KEY_TYPES,
__name__,
"Use PublicKeyTypes instead",
utils.DeprecatedIn40,
name="PUBLIC_KEY_TYPES",
)
# Every asymmetric key type
PrivateKeyTypes = typing.Union[
dh.DHPrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
x25519.X25519PrivateKey,
x448.X448PrivateKey,
]
PRIVATE_KEY_TYPES = PrivateKeyTypes
utils.deprecated(
PRIVATE_KEY_TYPES,
__name__,
"Use PrivateKeyTypes instead",
utils.DeprecatedIn40,
name="PRIVATE_KEY_TYPES",
)
# Just the key types we allow to be used for x509 signing. This mirrors
# the certificate public key types
CertificateIssuerPrivateKeyTypes = typing.Union[
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
]
CERTIFICATE_PRIVATE_KEY_TYPES = CertificateIssuerPrivateKeyTypes
utils.deprecated(
CERTIFICATE_PRIVATE_KEY_TYPES,
__name__,
"Use CertificateIssuerPrivateKeyTypes instead",
utils.DeprecatedIn40,
name="CERTIFICATE_PRIVATE_KEY_TYPES",
)
# Just the key types we allow to be used for x509 signing. This mirrors
# the certificate private key types
CertificateIssuerPublicKeyTypes = typing.Union[
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
]
CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES = CertificateIssuerPublicKeyTypes
utils.deprecated(
CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
__name__,
"Use CertificateIssuerPublicKeyTypes instead",
utils.DeprecatedIn40,
name="CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES",
)
# This type removes DHPublicKey. x448/x25519 can be a public key
# but cannot be used in signing so they are allowed here.
CertificatePublicKeyTypes = typing.Union[
dsa.DSAPublicKey,
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
ed25519.Ed25519PublicKey,
ed448.Ed448PublicKey,
x25519.X25519PublicKey,
x448.X448PublicKey,
]
CERTIFICATE_PUBLIC_KEY_TYPES = CertificatePublicKeyTypes
utils.deprecated(
CERTIFICATE_PUBLIC_KEY_TYPES,
__name__,
"Use CertificatePublicKeyTypes instead",
utils.DeprecatedIn40,
name="CERTIFICATE_PUBLIC_KEY_TYPES",
)

View File

@ -0,0 +1,24 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import asn1
from cryptography.hazmat.primitives import hashes
decode_dss_signature = asn1.decode_dss_signature
encode_dss_signature = asn1.encode_dss_signature
class Prehashed:
def __init__(self, algorithm: hashes.HashAlgorithm):
if not isinstance(algorithm, hashes.HashAlgorithm):
raise TypeError("Expected instance of HashAlgorithm.")
self._algorithm = algorithm
self._digest_size = algorithm.digest_size
@property
def digest_size(self) -> int:
return self._digest_size

View File

@ -0,0 +1,109 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization
class X25519PublicKey(metaclass=abc.ABCMeta):
@classmethod
def from_public_bytes(cls, data: bytes) -> X25519PublicKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x25519_supported():
raise UnsupportedAlgorithm(
"X25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x25519.from_public_bytes(data)
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
The serialized bytes of the public key.
"""
@abc.abstractmethod
def public_bytes_raw(self) -> bytes:
"""
The raw bytes of the public key.
Equivalent to public_bytes(Raw, Raw).
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
X25519PublicKey.register(rust_openssl.x25519.X25519PublicKey)
class X25519PrivateKey(metaclass=abc.ABCMeta):
@classmethod
def generate(cls) -> X25519PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x25519_supported():
raise UnsupportedAlgorithm(
"X25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x25519.generate_key()
@classmethod
def from_private_bytes(cls, data: bytes) -> X25519PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x25519_supported():
raise UnsupportedAlgorithm(
"X25519 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x25519.from_private_bytes(data)
@abc.abstractmethod
def public_key(self) -> X25519PublicKey:
"""
Returns the public key associated with this private key
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
The serialized bytes of the private key.
"""
@abc.abstractmethod
def private_bytes_raw(self) -> bytes:
"""
The raw bytes of the private key.
Equivalent to private_bytes(Raw, Raw, NoEncryption()).
"""
@abc.abstractmethod
def exchange(self, peer_public_key: X25519PublicKey) -> bytes:
"""
Performs a key exchange operation using the provided peer's public key.
"""
X25519PrivateKey.register(rust_openssl.x25519.X25519PrivateKey)

View File

@ -0,0 +1,112 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import _serialization
class X448PublicKey(metaclass=abc.ABCMeta):
@classmethod
def from_public_bytes(cls, data: bytes) -> X448PublicKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x448_supported():
raise UnsupportedAlgorithm(
"X448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x448.from_public_bytes(data)
@abc.abstractmethod
def public_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PublicFormat,
) -> bytes:
"""
The serialized bytes of the public key.
"""
@abc.abstractmethod
def public_bytes_raw(self) -> bytes:
"""
The raw bytes of the public key.
Equivalent to public_bytes(Raw, Raw).
"""
@abc.abstractmethod
def __eq__(self, other: object) -> bool:
"""
Checks equality.
"""
if hasattr(rust_openssl, "x448"):
X448PublicKey.register(rust_openssl.x448.X448PublicKey)
class X448PrivateKey(metaclass=abc.ABCMeta):
@classmethod
def generate(cls) -> X448PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x448_supported():
raise UnsupportedAlgorithm(
"X448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x448.generate_key()
@classmethod
def from_private_bytes(cls, data: bytes) -> X448PrivateKey:
from cryptography.hazmat.backends.openssl.backend import backend
if not backend.x448_supported():
raise UnsupportedAlgorithm(
"X448 is not supported by this version of OpenSSL.",
_Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
)
return rust_openssl.x448.from_private_bytes(data)
@abc.abstractmethod
def public_key(self) -> X448PublicKey:
"""
Returns the public key associated with this private key
"""
@abc.abstractmethod
def private_bytes(
self,
encoding: _serialization.Encoding,
format: _serialization.PrivateFormat,
encryption_algorithm: _serialization.KeySerializationEncryption,
) -> bytes:
"""
The serialized bytes of the private key.
"""
@abc.abstractmethod
def private_bytes_raw(self) -> bytes:
"""
The raw bytes of the private key.
Equivalent to private_bytes(Raw, Raw, NoEncryption()).
"""
@abc.abstractmethod
def exchange(self, peer_public_key: X448PublicKey) -> bytes:
"""
Performs a key exchange operation using the provided peer's public key.
"""
if hasattr(rust_openssl, "x448"):
X448PrivateKey.register(rust_openssl.x448.X448PrivateKey)

View File

@ -0,0 +1,27 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.primitives._cipheralgorithm import (
BlockCipherAlgorithm,
CipherAlgorithm,
)
from cryptography.hazmat.primitives.ciphers.base import (
AEADCipherContext,
AEADDecryptionContext,
AEADEncryptionContext,
Cipher,
CipherContext,
)
__all__ = [
"AEADCipherContext",
"AEADDecryptionContext",
"AEADEncryptionContext",
"BlockCipherAlgorithm",
"Cipher",
"CipherAlgorithm",
"CipherContext",
]

View File

@ -0,0 +1,23 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
__all__ = [
"AESCCM",
"AESGCM",
"AESGCMSIV",
"AESOCB3",
"AESSIV",
"ChaCha20Poly1305",
]
AESGCM = rust_openssl.aead.AESGCM
ChaCha20Poly1305 = rust_openssl.aead.ChaCha20Poly1305
AESCCM = rust_openssl.aead.AESCCM
AESSIV = rust_openssl.aead.AESSIV
AESOCB3 = rust_openssl.aead.AESOCB3
AESGCMSIV = rust_openssl.aead.AESGCMSIV

View File

@ -0,0 +1,183 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography import utils
from cryptography.hazmat.decrepit.ciphers.algorithms import (
ARC4 as ARC4,
)
from cryptography.hazmat.decrepit.ciphers.algorithms import (
CAST5 as CAST5,
)
from cryptography.hazmat.decrepit.ciphers.algorithms import (
IDEA as IDEA,
)
from cryptography.hazmat.decrepit.ciphers.algorithms import (
SEED as SEED,
)
from cryptography.hazmat.decrepit.ciphers.algorithms import (
Blowfish as Blowfish,
)
from cryptography.hazmat.decrepit.ciphers.algorithms import (
TripleDES as TripleDES,
)
from cryptography.hazmat.primitives._cipheralgorithm import _verify_key_size
from cryptography.hazmat.primitives.ciphers import (
BlockCipherAlgorithm,
CipherAlgorithm,
)
class AES(BlockCipherAlgorithm):
name = "AES"
block_size = 128
# 512 added to support AES-256-XTS, which uses 512-bit keys
key_sizes = frozenset([128, 192, 256, 512])
def __init__(self, key: bytes):
self.key = _verify_key_size(self, key)
@property
def key_size(self) -> int:
return len(self.key) * 8
class AES128(BlockCipherAlgorithm):
name = "AES"
block_size = 128
key_sizes = frozenset([128])
key_size = 128
def __init__(self, key: bytes):
self.key = _verify_key_size(self, key)
class AES256(BlockCipherAlgorithm):
name = "AES"
block_size = 128
key_sizes = frozenset([256])
key_size = 256
def __init__(self, key: bytes):
self.key = _verify_key_size(self, key)
class Camellia(BlockCipherAlgorithm):
name = "camellia"
block_size = 128
key_sizes = frozenset([128, 192, 256])
def __init__(self, key: bytes):
self.key = _verify_key_size(self, key)
@property
def key_size(self) -> int:
return len(self.key) * 8
utils.deprecated(
ARC4,
__name__,
"ARC4 has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.ARC4 and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 48.0.0.",
utils.DeprecatedIn43,
name="ARC4",
)
utils.deprecated(
TripleDES,
__name__,
"TripleDES has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.TripleDES and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 48.0.0.",
utils.DeprecatedIn43,
name="TripleDES",
)
utils.deprecated(
Blowfish,
__name__,
"Blowfish has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.Blowfish and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 45.0.0.",
utils.DeprecatedIn37,
name="Blowfish",
)
utils.deprecated(
CAST5,
__name__,
"CAST5 has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.CAST5 and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 45.0.0.",
utils.DeprecatedIn37,
name="CAST5",
)
utils.deprecated(
IDEA,
__name__,
"IDEA has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.IDEA and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 45.0.0.",
utils.DeprecatedIn37,
name="IDEA",
)
utils.deprecated(
SEED,
__name__,
"SEED has been moved to "
"cryptography.hazmat.decrepit.ciphers.algorithms.SEED and "
"will be removed from "
"cryptography.hazmat.primitives.ciphers.algorithms in 45.0.0.",
utils.DeprecatedIn37,
name="SEED",
)
class ChaCha20(CipherAlgorithm):
name = "ChaCha20"
key_sizes = frozenset([256])
def __init__(self, key: bytes, nonce: bytes):
self.key = _verify_key_size(self, key)
utils._check_byteslike("nonce", nonce)
if len(nonce) != 16:
raise ValueError("nonce must be 128-bits (16 bytes)")
self._nonce = nonce
@property
def nonce(self) -> bytes:
return self._nonce
@property
def key_size(self) -> int:
return len(self.key) * 8
class SM4(BlockCipherAlgorithm):
name = "SM4"
block_size = 128
key_sizes = frozenset([128])
def __init__(self, key: bytes):
self.key = _verify_key_size(self, key)
@property
def key_size(self) -> int:
return len(self.key) * 8

View File

@ -0,0 +1,145 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import typing
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
from cryptography.hazmat.primitives.ciphers import modes
class CipherContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update(self, data: bytes) -> bytes:
"""
Processes the provided bytes through the cipher and returns the results
as bytes.
"""
@abc.abstractmethod
def update_into(self, data: bytes, buf: bytes) -> int:
"""
Processes the provided bytes and writes the resulting data into the
provided buffer. Returns the number of bytes written.
"""
@abc.abstractmethod
def finalize(self) -> bytes:
"""
Returns the results of processing the final block as bytes.
"""
@abc.abstractmethod
def reset_nonce(self, nonce: bytes) -> None:
"""
Resets the nonce for the cipher context to the provided value.
Raises an exception if it does not support reset or if the
provided nonce does not have a valid length.
"""
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
@abc.abstractmethod
def authenticate_additional_data(self, data: bytes) -> None:
"""
Authenticates the provided bytes.
"""
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
@abc.abstractmethod
def finalize_with_tag(self, tag: bytes) -> bytes:
"""
Returns the results of processing the final block as bytes and allows
delayed passing of the authentication tag.
"""
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def tag(self) -> bytes:
"""
Returns tag bytes. This is only available after encryption is
finalized.
"""
Mode = typing.TypeVar(
"Mode", bound=typing.Optional[modes.Mode], covariant=True
)
class Cipher(typing.Generic[Mode]):
def __init__(
self,
algorithm: CipherAlgorithm,
mode: Mode,
backend: typing.Any = None,
) -> None:
if not isinstance(algorithm, CipherAlgorithm):
raise TypeError("Expected interface of CipherAlgorithm.")
if mode is not None:
# mypy needs this assert to narrow the type from our generic
# type. Maybe it won't some time in the future.
assert isinstance(mode, modes.Mode)
mode.validate_for_algorithm(algorithm)
self.algorithm = algorithm
self.mode = mode
@typing.overload
def encryptor(
self: Cipher[modes.ModeWithAuthenticationTag],
) -> AEADEncryptionContext: ...
@typing.overload
def encryptor(
self: _CIPHER_TYPE,
) -> CipherContext: ...
def encryptor(self):
if isinstance(self.mode, modes.ModeWithAuthenticationTag):
if self.mode.tag is not None:
raise ValueError(
"Authentication tag must be None when encrypting."
)
return rust_openssl.ciphers.create_encryption_ctx(
self.algorithm, self.mode
)
@typing.overload
def decryptor(
self: Cipher[modes.ModeWithAuthenticationTag],
) -> AEADDecryptionContext: ...
@typing.overload
def decryptor(
self: _CIPHER_TYPE,
) -> CipherContext: ...
def decryptor(self):
return rust_openssl.ciphers.create_decryption_ctx(
self.algorithm, self.mode
)
_CIPHER_TYPE = Cipher[
typing.Union[
modes.ModeWithNonce,
modes.ModeWithTweak,
None,
modes.ECB,
modes.ModeWithInitializationVector,
]
]
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)

View File

@ -0,0 +1,268 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography import utils
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.primitives._cipheralgorithm import (
BlockCipherAlgorithm,
CipherAlgorithm,
)
from cryptography.hazmat.primitives.ciphers import algorithms
class Mode(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self) -> str:
"""
A string naming this mode (e.g. "ECB", "CBC").
"""
@abc.abstractmethod
def validate_for_algorithm(self, algorithm: CipherAlgorithm) -> None:
"""
Checks that all the necessary invariants of this (mode, algorithm)
combination are met.
"""
class ModeWithInitializationVector(Mode, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def initialization_vector(self) -> bytes:
"""
The value of the initialization vector for this mode as bytes.
"""
class ModeWithTweak(Mode, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def tweak(self) -> bytes:
"""
The value of the tweak for this mode as bytes.
"""
class ModeWithNonce(Mode, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def nonce(self) -> bytes:
"""
The value of the nonce for this mode as bytes.
"""
class ModeWithAuthenticationTag(Mode, metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def tag(self) -> bytes | None:
"""
The value of the tag supplied to the constructor of this mode.
"""
def _check_aes_key_length(self: Mode, algorithm: CipherAlgorithm) -> None:
if algorithm.key_size > 256 and algorithm.name == "AES":
raise ValueError(
"Only 128, 192, and 256 bit keys are allowed for this AES mode"
)
def _check_iv_length(
self: ModeWithInitializationVector, algorithm: BlockCipherAlgorithm
) -> None:
iv_len = len(self.initialization_vector)
if iv_len * 8 != algorithm.block_size:
raise ValueError(f"Invalid IV size ({iv_len}) for {self.name}.")
def _check_nonce_length(
nonce: bytes, name: str, algorithm: CipherAlgorithm
) -> None:
if not isinstance(algorithm, BlockCipherAlgorithm):
raise UnsupportedAlgorithm(
f"{name} requires a block cipher algorithm",
_Reasons.UNSUPPORTED_CIPHER,
)
if len(nonce) * 8 != algorithm.block_size:
raise ValueError(f"Invalid nonce size ({len(nonce)}) for {name}.")
def _check_iv_and_key_length(
self: ModeWithInitializationVector, algorithm: CipherAlgorithm
) -> None:
if not isinstance(algorithm, BlockCipherAlgorithm):
raise UnsupportedAlgorithm(
f"{self} requires a block cipher algorithm",
_Reasons.UNSUPPORTED_CIPHER,
)
_check_aes_key_length(self, algorithm)
_check_iv_length(self, algorithm)
class CBC(ModeWithInitializationVector):
name = "CBC"
def __init__(self, initialization_vector: bytes):
utils._check_byteslike("initialization_vector", initialization_vector)
self._initialization_vector = initialization_vector
@property
def initialization_vector(self) -> bytes:
return self._initialization_vector
validate_for_algorithm = _check_iv_and_key_length
class XTS(ModeWithTweak):
name = "XTS"
def __init__(self, tweak: bytes):
utils._check_byteslike("tweak", tweak)
if len(tweak) != 16:
raise ValueError("tweak must be 128-bits (16 bytes)")
self._tweak = tweak
@property
def tweak(self) -> bytes:
return self._tweak
def validate_for_algorithm(self, algorithm: CipherAlgorithm) -> None:
if isinstance(algorithm, (algorithms.AES128, algorithms.AES256)):
raise TypeError(
"The AES128 and AES256 classes do not support XTS, please use "
"the standard AES class instead."
)
if algorithm.key_size not in (256, 512):
raise ValueError(
"The XTS specification requires a 256-bit key for AES-128-XTS"
" and 512-bit key for AES-256-XTS"
)
class ECB(Mode):
name = "ECB"
validate_for_algorithm = _check_aes_key_length
class OFB(ModeWithInitializationVector):
name = "OFB"
def __init__(self, initialization_vector: bytes):
utils._check_byteslike("initialization_vector", initialization_vector)
self._initialization_vector = initialization_vector
@property
def initialization_vector(self) -> bytes:
return self._initialization_vector
validate_for_algorithm = _check_iv_and_key_length
class CFB(ModeWithInitializationVector):
name = "CFB"
def __init__(self, initialization_vector: bytes):
utils._check_byteslike("initialization_vector", initialization_vector)
self._initialization_vector = initialization_vector
@property
def initialization_vector(self) -> bytes:
return self._initialization_vector
validate_for_algorithm = _check_iv_and_key_length
class CFB8(ModeWithInitializationVector):
name = "CFB8"
def __init__(self, initialization_vector: bytes):
utils._check_byteslike("initialization_vector", initialization_vector)
self._initialization_vector = initialization_vector
@property
def initialization_vector(self) -> bytes:
return self._initialization_vector
validate_for_algorithm = _check_iv_and_key_length
class CTR(ModeWithNonce):
name = "CTR"
def __init__(self, nonce: bytes):
utils._check_byteslike("nonce", nonce)
self._nonce = nonce
@property
def nonce(self) -> bytes:
return self._nonce
def validate_for_algorithm(self, algorithm: CipherAlgorithm) -> None:
_check_aes_key_length(self, algorithm)
_check_nonce_length(self.nonce, self.name, algorithm)
class GCM(ModeWithInitializationVector, ModeWithAuthenticationTag):
name = "GCM"
_MAX_ENCRYPTED_BYTES = (2**39 - 256) // 8
_MAX_AAD_BYTES = (2**64) // 8
def __init__(
self,
initialization_vector: bytes,
tag: bytes | None = None,
min_tag_length: int = 16,
):
# OpenSSL 3.0.0 constrains GCM IVs to [64, 1024] bits inclusive
# This is a sane limit anyway so we'll enforce it here.
utils._check_byteslike("initialization_vector", initialization_vector)
if len(initialization_vector) < 8 or len(initialization_vector) > 128:
raise ValueError(
"initialization_vector must be between 8 and 128 bytes (64 "
"and 1024 bits)."
)
self._initialization_vector = initialization_vector
if tag is not None:
utils._check_bytes("tag", tag)
if min_tag_length < 4:
raise ValueError("min_tag_length must be >= 4")
if len(tag) < min_tag_length:
raise ValueError(
f"Authentication tag must be {min_tag_length} bytes or "
"longer."
)
self._tag = tag
self._min_tag_length = min_tag_length
@property
def tag(self) -> bytes | None:
return self._tag
@property
def initialization_vector(self) -> bytes:
return self._initialization_vector
def validate_for_algorithm(self, algorithm: CipherAlgorithm) -> None:
_check_aes_key_length(self, algorithm)
if not isinstance(algorithm, BlockCipherAlgorithm):
raise UnsupportedAlgorithm(
"GCM requires a block cipher algorithm",
_Reasons.UNSUPPORTED_CIPHER,
)
block_size_bytes = algorithm.block_size // 8
if self._tag is not None and len(self._tag) > block_size_bytes:
raise ValueError(
f"Authentication tag cannot be more than {block_size_bytes} "
"bytes."
)

View File

@ -0,0 +1,10 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
__all__ = ["CMAC"]
CMAC = rust_openssl.cmac.CMAC

View File

@ -0,0 +1,14 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import hmac
def bytes_eq(a: bytes, b: bytes) -> bool:
if not isinstance(a, bytes) or not isinstance(b, bytes):
raise TypeError("a and b must be bytes.")
return hmac.compare_digest(a, b)

View File

@ -0,0 +1,242 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
__all__ = [
"MD5",
"SHA1",
"SHA3_224",
"SHA3_256",
"SHA3_384",
"SHA3_512",
"SHA224",
"SHA256",
"SHA384",
"SHA512",
"SHA512_224",
"SHA512_256",
"SHAKE128",
"SHAKE256",
"SM3",
"BLAKE2b",
"BLAKE2s",
"ExtendableOutputFunction",
"Hash",
"HashAlgorithm",
"HashContext",
]
class HashAlgorithm(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def name(self) -> str:
"""
A string naming this algorithm (e.g. "sha256", "md5").
"""
@property
@abc.abstractmethod
def digest_size(self) -> int:
"""
The size of the resulting digest in bytes.
"""
@property
@abc.abstractmethod
def block_size(self) -> int | None:
"""
The internal block size of the hash function, or None if the hash
function does not use blocks internally (e.g. SHA3).
"""
class HashContext(metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
def algorithm(self) -> HashAlgorithm:
"""
A HashAlgorithm that will be used by this context.
"""
@abc.abstractmethod
def update(self, data: bytes) -> None:
"""
Processes the provided bytes through the hash.
"""
@abc.abstractmethod
def finalize(self) -> bytes:
"""
Finalizes the hash context and returns the hash digest as bytes.
"""
@abc.abstractmethod
def copy(self) -> HashContext:
"""
Return a HashContext that is a copy of the current context.
"""
Hash = rust_openssl.hashes.Hash
HashContext.register(Hash)
class ExtendableOutputFunction(metaclass=abc.ABCMeta):
"""
An interface for extendable output functions.
"""
class SHA1(HashAlgorithm):
name = "sha1"
digest_size = 20
block_size = 64
class SHA512_224(HashAlgorithm): # noqa: N801
name = "sha512-224"
digest_size = 28
block_size = 128
class SHA512_256(HashAlgorithm): # noqa: N801
name = "sha512-256"
digest_size = 32
block_size = 128
class SHA224(HashAlgorithm):
name = "sha224"
digest_size = 28
block_size = 64
class SHA256(HashAlgorithm):
name = "sha256"
digest_size = 32
block_size = 64
class SHA384(HashAlgorithm):
name = "sha384"
digest_size = 48
block_size = 128
class SHA512(HashAlgorithm):
name = "sha512"
digest_size = 64
block_size = 128
class SHA3_224(HashAlgorithm): # noqa: N801
name = "sha3-224"
digest_size = 28
block_size = None
class SHA3_256(HashAlgorithm): # noqa: N801
name = "sha3-256"
digest_size = 32
block_size = None
class SHA3_384(HashAlgorithm): # noqa: N801
name = "sha3-384"
digest_size = 48
block_size = None
class SHA3_512(HashAlgorithm): # noqa: N801
name = "sha3-512"
digest_size = 64
block_size = None
class SHAKE128(HashAlgorithm, ExtendableOutputFunction):
name = "shake128"
block_size = None
def __init__(self, digest_size: int):
if not isinstance(digest_size, int):
raise TypeError("digest_size must be an integer")
if digest_size < 1:
raise ValueError("digest_size must be a positive integer")
self._digest_size = digest_size
@property
def digest_size(self) -> int:
return self._digest_size
class SHAKE256(HashAlgorithm, ExtendableOutputFunction):
name = "shake256"
block_size = None
def __init__(self, digest_size: int):
if not isinstance(digest_size, int):
raise TypeError("digest_size must be an integer")
if digest_size < 1:
raise ValueError("digest_size must be a positive integer")
self._digest_size = digest_size
@property
def digest_size(self) -> int:
return self._digest_size
class MD5(HashAlgorithm):
name = "md5"
digest_size = 16
block_size = 64
class BLAKE2b(HashAlgorithm):
name = "blake2b"
_max_digest_size = 64
_min_digest_size = 1
block_size = 128
def __init__(self, digest_size: int):
if digest_size != 64:
raise ValueError("Digest size must be 64")
self._digest_size = digest_size
@property
def digest_size(self) -> int:
return self._digest_size
class BLAKE2s(HashAlgorithm):
name = "blake2s"
block_size = 64
_max_digest_size = 32
_min_digest_size = 1
def __init__(self, digest_size: int):
if digest_size != 32:
raise ValueError("Digest size must be 32")
self._digest_size = digest_size
@property
def digest_size(self) -> int:
return self._digest_size
class SM3(HashAlgorithm):
name = "sm3"
digest_size = 32
block_size = 64

View File

@ -0,0 +1,13 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import hashes
__all__ = ["HMAC"]
HMAC = rust_openssl.hmac.HMAC
hashes.HashContext.register(HMAC)

View File

@ -0,0 +1,23 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
class KeyDerivationFunction(metaclass=abc.ABCMeta):
@abc.abstractmethod
def derive(self, key_material: bytes) -> bytes:
"""
Deterministically generates and returns a new key based on the existing
key material.
"""
@abc.abstractmethod
def verify(self, key_material: bytes, expected_key: bytes) -> None:
"""
Checks whether the key generated by the key material matches the
expected derived key. Raises an exception if they do not match.
"""

View File

@ -0,0 +1,13 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
Argon2id = rust_openssl.kdf.Argon2id
KeyDerivationFunction.register(Argon2id)
__all__ = ["Argon2id"]

View File

@ -0,0 +1,124 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.exceptions import AlreadyFinalized, InvalidKey
from cryptography.hazmat.primitives import constant_time, hashes, hmac
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
def _int_to_u32be(n: int) -> bytes:
return n.to_bytes(length=4, byteorder="big")
def _common_args_checks(
algorithm: hashes.HashAlgorithm,
length: int,
otherinfo: bytes | None,
) -> None:
max_length = algorithm.digest_size * (2**32 - 1)
if length > max_length:
raise ValueError(f"Cannot derive keys larger than {max_length} bits.")
if otherinfo is not None:
utils._check_bytes("otherinfo", otherinfo)
def _concatkdf_derive(
key_material: bytes,
length: int,
auxfn: typing.Callable[[], hashes.HashContext],
otherinfo: bytes,
) -> bytes:
utils._check_byteslike("key_material", key_material)
output = [b""]
outlen = 0
counter = 1
while length > outlen:
h = auxfn()
h.update(_int_to_u32be(counter))
h.update(key_material)
h.update(otherinfo)
output.append(h.finalize())
outlen += len(output[-1])
counter += 1
return b"".join(output)[:length]
class ConcatKDFHash(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
otherinfo: bytes | None,
backend: typing.Any = None,
):
_common_args_checks(algorithm, length, otherinfo)
self._algorithm = algorithm
self._length = length
self._otherinfo: bytes = otherinfo if otherinfo is not None else b""
self._used = False
def _hash(self) -> hashes.Hash:
return hashes.Hash(self._algorithm)
def derive(self, key_material: bytes) -> bytes:
if self._used:
raise AlreadyFinalized
self._used = True
return _concatkdf_derive(
key_material, self._length, self._hash, self._otherinfo
)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey
class ConcatKDFHMAC(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
salt: bytes | None,
otherinfo: bytes | None,
backend: typing.Any = None,
):
_common_args_checks(algorithm, length, otherinfo)
self._algorithm = algorithm
self._length = length
self._otherinfo: bytes = otherinfo if otherinfo is not None else b""
if algorithm.block_size is None:
raise TypeError(f"{algorithm.name} is unsupported for ConcatKDF")
if salt is None:
salt = b"\x00" * algorithm.block_size
else:
utils._check_bytes("salt", salt)
self._salt = salt
self._used = False
def _hmac(self) -> hmac.HMAC:
return hmac.HMAC(self._salt, self._algorithm)
def derive(self, key_material: bytes) -> bytes:
if self._used:
raise AlreadyFinalized
self._used = True
return _concatkdf_derive(
key_material, self._length, self._hmac, self._otherinfo
)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey

View File

@ -0,0 +1,101 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.exceptions import AlreadyFinalized, InvalidKey
from cryptography.hazmat.primitives import constant_time, hashes, hmac
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
class HKDF(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
salt: bytes | None,
info: bytes | None,
backend: typing.Any = None,
):
self._algorithm = algorithm
if salt is None:
salt = b"\x00" * self._algorithm.digest_size
else:
utils._check_bytes("salt", salt)
self._salt = salt
self._hkdf_expand = HKDFExpand(self._algorithm, length, info)
def _extract(self, key_material: bytes) -> bytes:
h = hmac.HMAC(self._salt, self._algorithm)
h.update(key_material)
return h.finalize()
def derive(self, key_material: bytes) -> bytes:
utils._check_byteslike("key_material", key_material)
return self._hkdf_expand.derive(self._extract(key_material))
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey
class HKDFExpand(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
info: bytes | None,
backend: typing.Any = None,
):
self._algorithm = algorithm
max_length = 255 * algorithm.digest_size
if length > max_length:
raise ValueError(
f"Cannot derive keys larger than {max_length} octets."
)
self._length = length
if info is None:
info = b""
else:
utils._check_bytes("info", info)
self._info = info
self._used = False
def _expand(self, key_material: bytes) -> bytes:
output = [b""]
counter = 1
while self._algorithm.digest_size * (len(output) - 1) < self._length:
h = hmac.HMAC(key_material, self._algorithm)
h.update(output[-1])
h.update(self._info)
h.update(bytes([counter]))
output.append(h.finalize())
counter += 1
return b"".join(output)[: self._length]
def derive(self, key_material: bytes) -> bytes:
utils._check_byteslike("key_material", key_material)
if self._used:
raise AlreadyFinalized
self._used = True
return self._expand(key_material)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey

View File

@ -0,0 +1,302 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.exceptions import (
AlreadyFinalized,
InvalidKey,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.primitives import (
ciphers,
cmac,
constant_time,
hashes,
hmac,
)
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
class Mode(utils.Enum):
CounterMode = "ctr"
class CounterLocation(utils.Enum):
BeforeFixed = "before_fixed"
AfterFixed = "after_fixed"
MiddleFixed = "middle_fixed"
class _KBKDFDeriver:
def __init__(
self,
prf: typing.Callable,
mode: Mode,
length: int,
rlen: int,
llen: int | None,
location: CounterLocation,
break_location: int | None,
label: bytes | None,
context: bytes | None,
fixed: bytes | None,
):
assert callable(prf)
if not isinstance(mode, Mode):
raise TypeError("mode must be of type Mode")
if not isinstance(location, CounterLocation):
raise TypeError("location must be of type CounterLocation")
if break_location is None and location is CounterLocation.MiddleFixed:
raise ValueError("Please specify a break_location")
if (
break_location is not None
and location != CounterLocation.MiddleFixed
):
raise ValueError(
"break_location is ignored when location is not"
" CounterLocation.MiddleFixed"
)
if break_location is not None and not isinstance(break_location, int):
raise TypeError("break_location must be an integer")
if break_location is not None and break_location < 0:
raise ValueError("break_location must be a positive integer")
if (label or context) and fixed:
raise ValueError(
"When supplying fixed data, label and context are ignored."
)
if rlen is None or not self._valid_byte_length(rlen):
raise ValueError("rlen must be between 1 and 4")
if llen is None and fixed is None:
raise ValueError("Please specify an llen")
if llen is not None and not isinstance(llen, int):
raise TypeError("llen must be an integer")
if llen == 0:
raise ValueError("llen must be non-zero")
if label is None:
label = b""
if context is None:
context = b""
utils._check_bytes("label", label)
utils._check_bytes("context", context)
self._prf = prf
self._mode = mode
self._length = length
self._rlen = rlen
self._llen = llen
self._location = location
self._break_location = break_location
self._label = label
self._context = context
self._used = False
self._fixed_data = fixed
@staticmethod
def _valid_byte_length(value: int) -> bool:
if not isinstance(value, int):
raise TypeError("value must be of type int")
value_bin = utils.int_to_bytes(1, value)
if not 1 <= len(value_bin) <= 4:
return False
return True
def derive(self, key_material: bytes, prf_output_size: int) -> bytes:
if self._used:
raise AlreadyFinalized
utils._check_byteslike("key_material", key_material)
self._used = True
# inverse floor division (equivalent to ceiling)
rounds = -(-self._length // prf_output_size)
output = [b""]
# For counter mode, the number of iterations shall not be
# larger than 2^r-1, where r <= 32 is the binary length of the counter
# This ensures that the counter values used as an input to the
# PRF will not repeat during a particular call to the KDF function.
r_bin = utils.int_to_bytes(1, self._rlen)
if rounds > pow(2, len(r_bin) * 8) - 1:
raise ValueError("There are too many iterations.")
fixed = self._generate_fixed_input()
if self._location == CounterLocation.BeforeFixed:
data_before_ctr = b""
data_after_ctr = fixed
elif self._location == CounterLocation.AfterFixed:
data_before_ctr = fixed
data_after_ctr = b""
else:
if isinstance(
self._break_location, int
) and self._break_location > len(fixed):
raise ValueError("break_location offset > len(fixed)")
data_before_ctr = fixed[: self._break_location]
data_after_ctr = fixed[self._break_location :]
for i in range(1, rounds + 1):
h = self._prf(key_material)
counter = utils.int_to_bytes(i, self._rlen)
input_data = data_before_ctr + counter + data_after_ctr
h.update(input_data)
output.append(h.finalize())
return b"".join(output)[: self._length]
def _generate_fixed_input(self) -> bytes:
if self._fixed_data and isinstance(self._fixed_data, bytes):
return self._fixed_data
l_val = utils.int_to_bytes(self._length * 8, self._llen)
return b"".join([self._label, b"\x00", self._context, l_val])
class KBKDFHMAC(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
mode: Mode,
length: int,
rlen: int,
llen: int | None,
location: CounterLocation,
label: bytes | None,
context: bytes | None,
fixed: bytes | None,
backend: typing.Any = None,
*,
break_location: int | None = None,
):
if not isinstance(algorithm, hashes.HashAlgorithm):
raise UnsupportedAlgorithm(
"Algorithm supplied is not a supported hash algorithm.",
_Reasons.UNSUPPORTED_HASH,
)
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)
if not ossl.hmac_supported(algorithm):
raise UnsupportedAlgorithm(
"Algorithm supplied is not a supported hmac algorithm.",
_Reasons.UNSUPPORTED_HASH,
)
self._algorithm = algorithm
self._deriver = _KBKDFDeriver(
self._prf,
mode,
length,
rlen,
llen,
location,
break_location,
label,
context,
fixed,
)
def _prf(self, key_material: bytes) -> hmac.HMAC:
return hmac.HMAC(key_material, self._algorithm)
def derive(self, key_material: bytes) -> bytes:
return self._deriver.derive(key_material, self._algorithm.digest_size)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey
class KBKDFCMAC(KeyDerivationFunction):
def __init__(
self,
algorithm,
mode: Mode,
length: int,
rlen: int,
llen: int | None,
location: CounterLocation,
label: bytes | None,
context: bytes | None,
fixed: bytes | None,
backend: typing.Any = None,
*,
break_location: int | None = None,
):
if not issubclass(
algorithm, ciphers.BlockCipherAlgorithm
) or not issubclass(algorithm, ciphers.CipherAlgorithm):
raise UnsupportedAlgorithm(
"Algorithm supplied is not a supported cipher algorithm.",
_Reasons.UNSUPPORTED_CIPHER,
)
self._algorithm = algorithm
self._cipher: ciphers.BlockCipherAlgorithm | None = None
self._deriver = _KBKDFDeriver(
self._prf,
mode,
length,
rlen,
llen,
location,
break_location,
label,
context,
fixed,
)
def _prf(self, _: bytes) -> cmac.CMAC:
assert self._cipher is not None
return cmac.CMAC(self._cipher)
def derive(self, key_material: bytes) -> bytes:
self._cipher = self._algorithm(key_material)
assert self._cipher is not None
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)
if not ossl.cmac_algorithm_supported(self._cipher):
raise UnsupportedAlgorithm(
"Algorithm supplied is not a supported cipher algorithm.",
_Reasons.UNSUPPORTED_CIPHER,
)
return self._deriver.derive(key_material, self._cipher.block_size // 8)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey

View File

@ -0,0 +1,62 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.exceptions import (
AlreadyFinalized,
InvalidKey,
UnsupportedAlgorithm,
_Reasons,
)
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives import constant_time, hashes
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
class PBKDF2HMAC(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
salt: bytes,
iterations: int,
backend: typing.Any = None,
):
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)
if not ossl.pbkdf2_hmac_supported(algorithm):
raise UnsupportedAlgorithm(
f"{algorithm.name} is not supported for PBKDF2.",
_Reasons.UNSUPPORTED_HASH,
)
self._used = False
self._algorithm = algorithm
self._length = length
utils._check_bytes("salt", salt)
self._salt = salt
self._iterations = iterations
def derive(self, key_material: bytes) -> bytes:
if self._used:
raise AlreadyFinalized("PBKDF2 instances can only be used once.")
self._used = True
return rust_openssl.kdf.derive_pbkdf2_hmac(
key_material,
self._algorithm,
self._salt,
self._iterations,
self._length,
)
def verify(self, key_material: bytes, expected_key: bytes) -> None:
derived_key = self.derive(key_material)
if not constant_time.bytes_eq(derived_key, expected_key):
raise InvalidKey("Keys do not match.")

View File

@ -0,0 +1,19 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import sys
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
# This is used by the scrypt tests to skip tests that require more memory
# than the MEM_LIMIT
_MEM_LIMIT = sys.maxsize // 2
Scrypt = rust_openssl.kdf.Scrypt
KeyDerivationFunction.register(Scrypt)
__all__ = ["Scrypt"]

View File

@ -0,0 +1,61 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import utils
from cryptography.exceptions import AlreadyFinalized, InvalidKey
from cryptography.hazmat.primitives import constant_time, hashes
from cryptography.hazmat.primitives.kdf import KeyDerivationFunction
def _int_to_u32be(n: int) -> bytes:
return n.to_bytes(length=4, byteorder="big")
class X963KDF(KeyDerivationFunction):
def __init__(
self,
algorithm: hashes.HashAlgorithm,
length: int,
sharedinfo: bytes | None,
backend: typing.Any = None,
):
max_len = algorithm.digest_size * (2**32 - 1)
if length > max_len:
raise ValueError(f"Cannot derive keys larger than {max_len} bits.")
if sharedinfo is not None:
utils._check_bytes("sharedinfo", sharedinfo)
self._algorithm = algorithm
self._length = length
self._sharedinfo = sharedinfo
self._used = False
def derive(self, key_material: bytes) -> bytes:
if self._used:
raise AlreadyFinalized
self._used = True
utils._check_byteslike("key_material", key_material)
output = [b""]
outlen = 0
counter = 1
while self._length > outlen:
h = hashes.Hash(self._algorithm)
h.update(key_material)
h.update(_int_to_u32be(counter))
if self._sharedinfo is not None:
h.update(self._sharedinfo)
output.append(h.finalize())
outlen += len(output[-1])
counter += 1
return b"".join(output)[: self._length]
def verify(self, key_material: bytes, expected_key: bytes) -> None:
if not constant_time.bytes_eq(self.derive(key_material), expected_key):
raise InvalidKey

View File

@ -0,0 +1,177 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import ECB
from cryptography.hazmat.primitives.constant_time import bytes_eq
def _wrap_core(
wrapping_key: bytes,
a: bytes,
r: list[bytes],
) -> bytes:
# RFC 3394 Key Wrap - 2.2.1 (index method)
encryptor = Cipher(AES(wrapping_key), ECB()).encryptor()
n = len(r)
for j in range(6):
for i in range(n):
# every encryption operation is a discrete 16 byte chunk (because
# AES has a 128-bit block size) and since we're using ECB it is
# safe to reuse the encryptor for the entire operation
b = encryptor.update(a + r[i])
a = (
int.from_bytes(b[:8], byteorder="big") ^ ((n * j) + i + 1)
).to_bytes(length=8, byteorder="big")
r[i] = b[-8:]
assert encryptor.finalize() == b""
return a + b"".join(r)
def aes_key_wrap(
wrapping_key: bytes,
key_to_wrap: bytes,
backend: typing.Any = None,
) -> bytes:
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
if len(key_to_wrap) < 16:
raise ValueError("The key to wrap must be at least 16 bytes")
if len(key_to_wrap) % 8 != 0:
raise ValueError("The key to wrap must be a multiple of 8 bytes")
a = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [key_to_wrap[i : i + 8] for i in range(0, len(key_to_wrap), 8)]
return _wrap_core(wrapping_key, a, r)
def _unwrap_core(
wrapping_key: bytes,
a: bytes,
r: list[bytes],
) -> tuple[bytes, list[bytes]]:
# Implement RFC 3394 Key Unwrap - 2.2.2 (index method)
decryptor = Cipher(AES(wrapping_key), ECB()).decryptor()
n = len(r)
for j in reversed(range(6)):
for i in reversed(range(n)):
atr = (
int.from_bytes(a, byteorder="big") ^ ((n * j) + i + 1)
).to_bytes(length=8, byteorder="big") + r[i]
# every decryption operation is a discrete 16 byte chunk so
# it is safe to reuse the decryptor for the entire operation
b = decryptor.update(atr)
a = b[:8]
r[i] = b[-8:]
assert decryptor.finalize() == b""
return a, r
def aes_key_wrap_with_padding(
wrapping_key: bytes,
key_to_wrap: bytes,
backend: typing.Any = None,
) -> bytes:
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
aiv = b"\xa6\x59\x59\xa6" + len(key_to_wrap).to_bytes(
length=4, byteorder="big"
)
# pad the key to wrap if necessary
pad = (8 - (len(key_to_wrap) % 8)) % 8
key_to_wrap = key_to_wrap + b"\x00" * pad
if len(key_to_wrap) == 8:
# RFC 5649 - 4.1 - exactly 8 octets after padding
encryptor = Cipher(AES(wrapping_key), ECB()).encryptor()
b = encryptor.update(aiv + key_to_wrap)
assert encryptor.finalize() == b""
return b
else:
r = [key_to_wrap[i : i + 8] for i in range(0, len(key_to_wrap), 8)]
return _wrap_core(wrapping_key, aiv, r)
def aes_key_unwrap_with_padding(
wrapping_key: bytes,
wrapped_key: bytes,
backend: typing.Any = None,
) -> bytes:
if len(wrapped_key) < 16:
raise InvalidUnwrap("Must be at least 16 bytes")
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
if len(wrapped_key) == 16:
# RFC 5649 - 4.2 - exactly two 64-bit blocks
decryptor = Cipher(AES(wrapping_key), ECB()).decryptor()
out = decryptor.update(wrapped_key)
assert decryptor.finalize() == b""
a = out[:8]
data = out[8:]
n = 1
else:
r = [wrapped_key[i : i + 8] for i in range(0, len(wrapped_key), 8)]
encrypted_aiv = r.pop(0)
n = len(r)
a, r = _unwrap_core(wrapping_key, encrypted_aiv, r)
data = b"".join(r)
# 1) Check that MSB(32,A) = A65959A6.
# 2) Check that 8*(n-1) < LSB(32,A) <= 8*n. If so, let
# MLI = LSB(32,A).
# 3) Let b = (8*n)-MLI, and then check that the rightmost b octets of
# the output data are zero.
mli = int.from_bytes(a[4:], byteorder="big")
b = (8 * n) - mli
if (
not bytes_eq(a[:4], b"\xa6\x59\x59\xa6")
or not 8 * (n - 1) < mli <= 8 * n
or (b != 0 and not bytes_eq(data[-b:], b"\x00" * b))
):
raise InvalidUnwrap()
if b == 0:
return data
else:
return data[:-b]
def aes_key_unwrap(
wrapping_key: bytes,
wrapped_key: bytes,
backend: typing.Any = None,
) -> bytes:
if len(wrapped_key) < 24:
raise InvalidUnwrap("Must be at least 24 bytes")
if len(wrapped_key) % 8 != 0:
raise InvalidUnwrap("The wrapped key must be a multiple of 8 bytes")
if len(wrapping_key) not in [16, 24, 32]:
raise ValueError("The wrapping key must be a valid AES key length")
aiv = b"\xa6\xa6\xa6\xa6\xa6\xa6\xa6\xa6"
r = [wrapped_key[i : i + 8] for i in range(0, len(wrapped_key), 8)]
a = r.pop(0)
a, r = _unwrap_core(wrapping_key, a, r)
if not bytes_eq(a, aiv):
raise InvalidUnwrap()
return b"".join(r)
class InvalidUnwrap(Exception):
pass

View File

@ -0,0 +1,183 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import typing
from cryptography import utils
from cryptography.exceptions import AlreadyFinalized
from cryptography.hazmat.bindings._rust import (
PKCS7PaddingContext,
PKCS7UnpaddingContext,
check_ansix923_padding,
)
class PaddingContext(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update(self, data: bytes) -> bytes:
"""
Pads the provided bytes and returns any available data as bytes.
"""
@abc.abstractmethod
def finalize(self) -> bytes:
"""
Finalize the padding, returns bytes.
"""
def _byte_padding_check(block_size: int) -> None:
if not (0 <= block_size <= 2040):
raise ValueError("block_size must be in range(0, 2041).")
if block_size % 8 != 0:
raise ValueError("block_size must be a multiple of 8.")
def _byte_padding_update(
buffer_: bytes | None, data: bytes, block_size: int
) -> tuple[bytes, bytes]:
if buffer_ is None:
raise AlreadyFinalized("Context was already finalized.")
utils._check_byteslike("data", data)
buffer_ += bytes(data)
finished_blocks = len(buffer_) // (block_size // 8)
result = buffer_[: finished_blocks * (block_size // 8)]
buffer_ = buffer_[finished_blocks * (block_size // 8) :]
return buffer_, result
def _byte_padding_pad(
buffer_: bytes | None,
block_size: int,
paddingfn: typing.Callable[[int], bytes],
) -> bytes:
if buffer_ is None:
raise AlreadyFinalized("Context was already finalized.")
pad_size = block_size // 8 - len(buffer_)
return buffer_ + paddingfn(pad_size)
def _byte_unpadding_update(
buffer_: bytes | None, data: bytes, block_size: int
) -> tuple[bytes, bytes]:
if buffer_ is None:
raise AlreadyFinalized("Context was already finalized.")
utils._check_byteslike("data", data)
buffer_ += bytes(data)
finished_blocks = max(len(buffer_) // (block_size // 8) - 1, 0)
result = buffer_[: finished_blocks * (block_size // 8)]
buffer_ = buffer_[finished_blocks * (block_size // 8) :]
return buffer_, result
def _byte_unpadding_check(
buffer_: bytes | None,
block_size: int,
checkfn: typing.Callable[[bytes], int],
) -> bytes:
if buffer_ is None:
raise AlreadyFinalized("Context was already finalized.")
if len(buffer_) != block_size // 8:
raise ValueError("Invalid padding bytes.")
valid = checkfn(buffer_)
if not valid:
raise ValueError("Invalid padding bytes.")
pad_size = buffer_[-1]
return buffer_[:-pad_size]
class PKCS7:
def __init__(self, block_size: int):
_byte_padding_check(block_size)
self.block_size = block_size
def padder(self) -> PaddingContext:
return PKCS7PaddingContext(self.block_size)
def unpadder(self) -> PaddingContext:
return PKCS7UnpaddingContext(self.block_size)
PaddingContext.register(PKCS7PaddingContext)
PaddingContext.register(PKCS7UnpaddingContext)
class ANSIX923:
def __init__(self, block_size: int):
_byte_padding_check(block_size)
self.block_size = block_size
def padder(self) -> PaddingContext:
return _ANSIX923PaddingContext(self.block_size)
def unpadder(self) -> PaddingContext:
return _ANSIX923UnpaddingContext(self.block_size)
class _ANSIX923PaddingContext(PaddingContext):
_buffer: bytes | None
def __init__(self, block_size: int):
self.block_size = block_size
# TODO: more copies than necessary, we should use zero-buffer (#193)
self._buffer = b""
def update(self, data: bytes) -> bytes:
self._buffer, result = _byte_padding_update(
self._buffer, data, self.block_size
)
return result
def _padding(self, size: int) -> bytes:
return bytes([0]) * (size - 1) + bytes([size])
def finalize(self) -> bytes:
result = _byte_padding_pad(
self._buffer, self.block_size, self._padding
)
self._buffer = None
return result
class _ANSIX923UnpaddingContext(PaddingContext):
_buffer: bytes | None
def __init__(self, block_size: int):
self.block_size = block_size
# TODO: more copies than necessary, we should use zero-buffer (#193)
self._buffer = b""
def update(self, data: bytes) -> bytes:
self._buffer, result = _byte_unpadding_update(
self._buffer, data, self.block_size
)
return result
def finalize(self) -> bytes:
result = _byte_unpadding_check(
self._buffer,
self.block_size,
check_ansix923_padding,
)
self._buffer = None
return result

View File

@ -0,0 +1,11 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
__all__ = ["Poly1305"]
Poly1305 = rust_openssl.poly1305.Poly1305

View File

@ -0,0 +1,63 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
from cryptography.hazmat.primitives._serialization import (
BestAvailableEncryption,
Encoding,
KeySerializationEncryption,
NoEncryption,
ParameterFormat,
PrivateFormat,
PublicFormat,
_KeySerializationEncryption,
)
from cryptography.hazmat.primitives.serialization.base import (
load_der_parameters,
load_der_private_key,
load_der_public_key,
load_pem_parameters,
load_pem_private_key,
load_pem_public_key,
)
from cryptography.hazmat.primitives.serialization.ssh import (
SSHCertificate,
SSHCertificateBuilder,
SSHCertificateType,
SSHCertPrivateKeyTypes,
SSHCertPublicKeyTypes,
SSHPrivateKeyTypes,
SSHPublicKeyTypes,
load_ssh_private_key,
load_ssh_public_identity,
load_ssh_public_key,
)
__all__ = [
"BestAvailableEncryption",
"Encoding",
"KeySerializationEncryption",
"NoEncryption",
"ParameterFormat",
"PrivateFormat",
"PublicFormat",
"SSHCertPrivateKeyTypes",
"SSHCertPublicKeyTypes",
"SSHCertificate",
"SSHCertificateBuilder",
"SSHCertificateType",
"SSHPrivateKeyTypes",
"SSHPublicKeyTypes",
"_KeySerializationEncryption",
"load_der_parameters",
"load_der_private_key",
"load_der_public_key",
"load_pem_parameters",
"load_pem_private_key",
"load_pem_public_key",
"load_ssh_private_key",
"load_ssh_public_identity",
"load_ssh_public_key",
]

View File

@ -0,0 +1,14 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
load_pem_private_key = rust_openssl.keys.load_pem_private_key
load_der_private_key = rust_openssl.keys.load_der_private_key
load_pem_public_key = rust_openssl.keys.load_pem_public_key
load_der_public_key = rust_openssl.keys.load_der_public_key
load_pem_parameters = rust_openssl.dh.from_pem_parameters
load_der_parameters = rust_openssl.dh.from_der_parameters

View File

@ -0,0 +1,156 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography import x509
from cryptography.hazmat.bindings._rust import pkcs12 as rust_pkcs12
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives._serialization import PBES as PBES
from cryptography.hazmat.primitives.asymmetric import (
dsa,
ec,
ed448,
ed25519,
rsa,
)
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
__all__ = [
"PBES",
"PKCS12Certificate",
"PKCS12KeyAndCertificates",
"PKCS12PrivateKeyTypes",
"load_key_and_certificates",
"load_pkcs12",
"serialize_key_and_certificates",
]
PKCS12PrivateKeyTypes = typing.Union[
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
]
PKCS12Certificate = rust_pkcs12.PKCS12Certificate
class PKCS12KeyAndCertificates:
def __init__(
self,
key: PrivateKeyTypes | None,
cert: PKCS12Certificate | None,
additional_certs: list[PKCS12Certificate],
):
if key is not None and not isinstance(
key,
(
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
),
):
raise TypeError(
"Key must be RSA, DSA, EllipticCurve, ED25519, or ED448"
" private key, or None."
)
if cert is not None and not isinstance(cert, PKCS12Certificate):
raise TypeError("cert must be a PKCS12Certificate object or None")
if not all(
isinstance(add_cert, PKCS12Certificate)
for add_cert in additional_certs
):
raise TypeError(
"all values in additional_certs must be PKCS12Certificate"
" objects"
)
self._key = key
self._cert = cert
self._additional_certs = additional_certs
@property
def key(self) -> PrivateKeyTypes | None:
return self._key
@property
def cert(self) -> PKCS12Certificate | None:
return self._cert
@property
def additional_certs(self) -> list[PKCS12Certificate]:
return self._additional_certs
def __eq__(self, other: object) -> bool:
if not isinstance(other, PKCS12KeyAndCertificates):
return NotImplemented
return (
self.key == other.key
and self.cert == other.cert
and self.additional_certs == other.additional_certs
)
def __hash__(self) -> int:
return hash((self.key, self.cert, tuple(self.additional_certs)))
def __repr__(self) -> str:
fmt = (
"<PKCS12KeyAndCertificates(key={}, cert={}, additional_certs={})>"
)
return fmt.format(self.key, self.cert, self.additional_certs)
load_key_and_certificates = rust_pkcs12.load_key_and_certificates
load_pkcs12 = rust_pkcs12.load_pkcs12
_PKCS12CATypes = typing.Union[
x509.Certificate,
PKCS12Certificate,
]
def serialize_key_and_certificates(
name: bytes | None,
key: PKCS12PrivateKeyTypes | None,
cert: x509.Certificate | None,
cas: typing.Iterable[_PKCS12CATypes] | None,
encryption_algorithm: serialization.KeySerializationEncryption,
) -> bytes:
if key is not None and not isinstance(
key,
(
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
),
):
raise TypeError(
"Key must be RSA, DSA, EllipticCurve, ED25519, or ED448"
" private key, or None."
)
if not isinstance(
encryption_algorithm, serialization.KeySerializationEncryption
):
raise TypeError(
"Key encryption algorithm must be a "
"KeySerializationEncryption instance"
)
if key is None and cert is None and not cas:
raise ValueError("You must supply at least one of key, cert, or cas")
return rust_pkcs12.serialize_key_and_certificates(
name, key, cert, cas, encryption_algorithm
)

View File

@ -0,0 +1,369 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import email.base64mime
import email.generator
import email.message
import email.policy
import io
import typing
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import pkcs7 as rust_pkcs7
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.utils import _check_byteslike
load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates
load_der_pkcs7_certificates = rust_pkcs7.load_der_pkcs7_certificates
serialize_certificates = rust_pkcs7.serialize_certificates
PKCS7HashTypes = typing.Union[
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
]
PKCS7PrivateKeyTypes = typing.Union[
rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey
]
class PKCS7Options(utils.Enum):
Text = "Add text/plain MIME type"
Binary = "Don't translate input data into canonical MIME format"
DetachedSignature = "Don't embed data in the PKCS7 structure"
NoCapabilities = "Don't embed SMIME capabilities"
NoAttributes = "Don't embed authenticatedAttributes"
NoCerts = "Don't embed signer certificate"
class PKCS7SignatureBuilder:
def __init__(
self,
data: bytes | None = None,
signers: list[
tuple[
x509.Certificate,
PKCS7PrivateKeyTypes,
PKCS7HashTypes,
padding.PSS | padding.PKCS1v15 | None,
]
] = [],
additional_certs: list[x509.Certificate] = [],
):
self._data = data
self._signers = signers
self._additional_certs = additional_certs
def set_data(self, data: bytes) -> PKCS7SignatureBuilder:
_check_byteslike("data", data)
if self._data is not None:
raise ValueError("data may only be set once")
return PKCS7SignatureBuilder(data, self._signers)
def add_signer(
self,
certificate: x509.Certificate,
private_key: PKCS7PrivateKeyTypes,
hash_algorithm: PKCS7HashTypes,
*,
rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
) -> PKCS7SignatureBuilder:
if not isinstance(
hash_algorithm,
(
hashes.SHA224,
hashes.SHA256,
hashes.SHA384,
hashes.SHA512,
),
):
raise TypeError(
"hash_algorithm must be one of hashes.SHA224, "
"SHA256, SHA384, or SHA512"
)
if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")
if not isinstance(
private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey)
):
raise TypeError("Only RSA & EC keys are supported at this time.")
if rsa_padding is not None:
if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
raise TypeError("Padding must be PSS or PKCS1v15")
if not isinstance(private_key, rsa.RSAPrivateKey):
raise TypeError("Padding is only supported for RSA keys")
return PKCS7SignatureBuilder(
self._data,
[
*self._signers,
(certificate, private_key, hash_algorithm, rsa_padding),
],
)
def add_certificate(
self, certificate: x509.Certificate
) -> PKCS7SignatureBuilder:
if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")
return PKCS7SignatureBuilder(
self._data, self._signers, [*self._additional_certs, certificate]
)
def sign(
self,
encoding: serialization.Encoding,
options: typing.Iterable[PKCS7Options],
backend: typing.Any = None,
) -> bytes:
if len(self._signers) == 0:
raise ValueError("Must have at least one signer")
if self._data is None:
raise ValueError("You must add data to sign")
options = list(options)
if not all(isinstance(x, PKCS7Options) for x in options):
raise ValueError("options must be from the PKCS7Options enum")
if encoding not in (
serialization.Encoding.PEM,
serialization.Encoding.DER,
serialization.Encoding.SMIME,
):
raise ValueError(
"Must be PEM, DER, or SMIME from the Encoding enum"
)
# Text is a meaningless option unless it is accompanied by
# DetachedSignature
if (
PKCS7Options.Text in options
and PKCS7Options.DetachedSignature not in options
):
raise ValueError(
"When passing the Text option you must also pass "
"DetachedSignature"
)
if PKCS7Options.Text in options and encoding in (
serialization.Encoding.DER,
serialization.Encoding.PEM,
):
raise ValueError(
"The Text option is only available for SMIME serialization"
)
# No attributes implies no capabilities so we'll error if you try to
# pass both.
if (
PKCS7Options.NoAttributes in options
and PKCS7Options.NoCapabilities in options
):
raise ValueError(
"NoAttributes is a superset of NoCapabilities. Do not pass "
"both values."
)
return rust_pkcs7.sign_and_serialize(self, encoding, options)
class PKCS7EnvelopeBuilder:
def __init__(
self,
*,
_data: bytes | None = None,
_recipients: list[x509.Certificate] | None = None,
):
from cryptography.hazmat.backends.openssl.backend import (
backend as ossl,
)
if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()):
raise UnsupportedAlgorithm(
"RSA with PKCS1 v1.5 padding is not supported by this version"
" of OpenSSL.",
_Reasons.UNSUPPORTED_PADDING,
)
self._data = _data
self._recipients = _recipients if _recipients is not None else []
def set_data(self, data: bytes) -> PKCS7EnvelopeBuilder:
_check_byteslike("data", data)
if self._data is not None:
raise ValueError("data may only be set once")
return PKCS7EnvelopeBuilder(_data=data, _recipients=self._recipients)
def add_recipient(
self,
certificate: x509.Certificate,
) -> PKCS7EnvelopeBuilder:
if not isinstance(certificate, x509.Certificate):
raise TypeError("certificate must be a x509.Certificate")
if not isinstance(certificate.public_key(), rsa.RSAPublicKey):
raise TypeError("Only RSA keys are supported at this time.")
return PKCS7EnvelopeBuilder(
_data=self._data,
_recipients=[
*self._recipients,
certificate,
],
)
def encrypt(
self,
encoding: serialization.Encoding,
options: typing.Iterable[PKCS7Options],
) -> bytes:
if len(self._recipients) == 0:
raise ValueError("Must have at least one recipient")
if self._data is None:
raise ValueError("You must add data to encrypt")
options = list(options)
if not all(isinstance(x, PKCS7Options) for x in options):
raise ValueError("options must be from the PKCS7Options enum")
if encoding not in (
serialization.Encoding.PEM,
serialization.Encoding.DER,
serialization.Encoding.SMIME,
):
raise ValueError(
"Must be PEM, DER, or SMIME from the Encoding enum"
)
# Only allow options that make sense for encryption
if any(
opt not in [PKCS7Options.Text, PKCS7Options.Binary]
for opt in options
):
raise ValueError(
"Only the following options are supported for encryption: "
"Text, Binary"
)
elif PKCS7Options.Text in options and PKCS7Options.Binary in options:
# OpenSSL accepts both options at the same time, but ignores Text.
# We fail defensively to avoid unexpected outputs.
raise ValueError(
"Cannot use Binary and Text options at the same time"
)
return rust_pkcs7.encrypt_and_serialize(self, encoding, options)
pkcs7_decrypt_der = rust_pkcs7.decrypt_der
pkcs7_decrypt_pem = rust_pkcs7.decrypt_pem
pkcs7_decrypt_smime = rust_pkcs7.decrypt_smime
def _smime_signed_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
# This function works pretty hard to replicate what OpenSSL does
# precisely. For good and for ill.
m = email.message.Message()
m.add_header("MIME-Version", "1.0")
m.add_header(
"Content-Type",
"multipart/signed",
protocol="application/x-pkcs7-signature",
micalg=micalg,
)
m.preamble = "This is an S/MIME signed message\n"
msg_part = OpenSSLMimePart()
msg_part.set_payload(data)
if text_mode:
msg_part.add_header("Content-Type", "text/plain")
m.attach(msg_part)
sig_part = email.message.MIMEPart()
sig_part.add_header(
"Content-Type", "application/x-pkcs7-signature", name="smime.p7s"
)
sig_part.add_header("Content-Transfer-Encoding", "base64")
sig_part.add_header(
"Content-Disposition", "attachment", filename="smime.p7s"
)
sig_part.set_payload(
email.base64mime.body_encode(signature, maxlinelen=65)
)
del sig_part["MIME-Version"]
m.attach(sig_part)
fp = io.BytesIO()
g = email.generator.BytesGenerator(
fp,
maxheaderlen=0,
mangle_from_=False,
policy=m.policy.clone(linesep="\r\n"),
)
g.flatten(m)
return fp.getvalue()
def _smime_enveloped_encode(data: bytes) -> bytes:
m = email.message.Message()
m.add_header("MIME-Version", "1.0")
m.add_header("Content-Disposition", "attachment", filename="smime.p7m")
m.add_header(
"Content-Type",
"application/pkcs7-mime",
smime_type="enveloped-data",
name="smime.p7m",
)
m.add_header("Content-Transfer-Encoding", "base64")
m.set_payload(email.base64mime.body_encode(data, maxlinelen=65))
return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))
def _smime_enveloped_decode(data: bytes) -> bytes:
m = email.message_from_bytes(data)
if m.get_content_type() not in {
"application/x-pkcs7-mime",
"application/pkcs7-mime",
}:
raise ValueError("Not an S/MIME enveloped message")
return bytes(m.get_payload(decode=True))
def _smime_remove_text_headers(data: bytes) -> bytes:
m = email.message_from_bytes(data)
# Using get() instead of get_content_type() since it has None as default,
# where the latter has "text/plain". Both methods are case-insensitive.
content_type = m.get("content-type")
if content_type is None:
raise ValueError(
"Decrypted MIME data has no 'Content-Type' header. "
"Please remove the 'Text' option to parse it manually."
)
if "text/plain" not in content_type:
raise ValueError(
f"Decrypted MIME data content type is '{content_type}', not "
"'text/plain'. Remove the 'Text' option to parse it manually."
)
return bytes(m.get_payload(decode=True))
class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.
def _write_headers(self, generator) -> None:
if list(self.raw_items()):
generator._write_headers(self)

View File

@ -0,0 +1,9 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
class InvalidToken(Exception):
pass

View File

@ -0,0 +1,100 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import base64
import typing
from urllib.parse import quote, urlencode
from cryptography.hazmat.primitives import constant_time, hmac
from cryptography.hazmat.primitives.hashes import SHA1, SHA256, SHA512
from cryptography.hazmat.primitives.twofactor import InvalidToken
HOTPHashTypes = typing.Union[SHA1, SHA256, SHA512]
def _generate_uri(
hotp: HOTP,
type_name: str,
account_name: str,
issuer: str | None,
extra_parameters: list[tuple[str, int]],
) -> str:
parameters = [
("digits", hotp._length),
("secret", base64.b32encode(hotp._key)),
("algorithm", hotp._algorithm.name.upper()),
]
if issuer is not None:
parameters.append(("issuer", issuer))
parameters.extend(extra_parameters)
label = (
f"{quote(issuer)}:{quote(account_name)}"
if issuer
else quote(account_name)
)
return f"otpauth://{type_name}/{label}?{urlencode(parameters)}"
class HOTP:
def __init__(
self,
key: bytes,
length: int,
algorithm: HOTPHashTypes,
backend: typing.Any = None,
enforce_key_length: bool = True,
) -> None:
if len(key) < 16 and enforce_key_length is True:
raise ValueError("Key length has to be at least 128 bits.")
if not isinstance(length, int):
raise TypeError("Length parameter must be an integer type.")
if length < 6 or length > 8:
raise ValueError("Length of HOTP has to be between 6 and 8.")
if not isinstance(algorithm, (SHA1, SHA256, SHA512)):
raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")
self._key = key
self._length = length
self._algorithm = algorithm
def generate(self, counter: int) -> bytes:
if not isinstance(counter, int):
raise TypeError("Counter parameter must be an integer type.")
truncated_value = self._dynamic_truncate(counter)
hotp = truncated_value % (10**self._length)
return "{0:0{1}}".format(hotp, self._length).encode()
def verify(self, hotp: bytes, counter: int) -> None:
if not constant_time.bytes_eq(self.generate(counter), hotp):
raise InvalidToken("Supplied HOTP value does not match.")
def _dynamic_truncate(self, counter: int) -> int:
ctx = hmac.HMAC(self._key, self._algorithm)
try:
ctx.update(counter.to_bytes(length=8, byteorder="big"))
except OverflowError:
raise ValueError(f"Counter must be between 0 and {2 ** 64 - 1}.")
hmac_value = ctx.finalize()
offset = hmac_value[len(hmac_value) - 1] & 0b1111
p = hmac_value[offset : offset + 4]
return int.from_bytes(p, byteorder="big") & 0x7FFFFFFF
def get_provisioning_uri(
self, account_name: str, counter: int, issuer: str | None
) -> str:
return _generate_uri(
self, "hotp", account_name, issuer, [("counter", int(counter))]
)

View File

@ -0,0 +1,55 @@
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import typing
from cryptography.hazmat.primitives import constant_time
from cryptography.hazmat.primitives.twofactor import InvalidToken
from cryptography.hazmat.primitives.twofactor.hotp import (
HOTP,
HOTPHashTypes,
_generate_uri,
)
class TOTP:
def __init__(
self,
key: bytes,
length: int,
algorithm: HOTPHashTypes,
time_step: int,
backend: typing.Any = None,
enforce_key_length: bool = True,
):
self._time_step = time_step
self._hotp = HOTP(
key, length, algorithm, enforce_key_length=enforce_key_length
)
def generate(self, time: int | float) -> bytes:
if not isinstance(time, (int, float)):
raise TypeError(
"Time parameter must be an integer type or float type."
)
counter = int(time / self._time_step)
return self._hotp.generate(counter)
def verify(self, totp: bytes, time: int) -> None:
if not constant_time.bytes_eq(self.generate(time), totp):
raise InvalidToken("Supplied TOTP value does not match.")
def get_provisioning_uri(
self, account_name: str, issuer: str | None
) -> str:
return _generate_uri(
self._hotp,
"totp",
account_name,
issuer,
[("period", int(self._time_step))],
)