mirror of
				https://gitlab.sectorq.eu/jaydee/omv_backup.git
				synced 2025-10-31 10:31:11 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			816 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			816 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| # This file is dual licensed under the terms of the Apache License, Version
 | |
| # 2.0, and the BSD License. See the LICENSE file in the root of this repository
 | |
| # for complete details.
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import abc
 | |
| import datetime
 | |
| import os
 | |
| import typing
 | |
| import warnings
 | |
| 
 | |
| from cryptography import utils
 | |
| from cryptography.hazmat.bindings._rust import x509 as rust_x509
 | |
| from cryptography.hazmat.primitives import hashes
 | |
| from cryptography.hazmat.primitives.asymmetric import (
 | |
|     dsa,
 | |
|     ec,
 | |
|     ed448,
 | |
|     ed25519,
 | |
|     padding,
 | |
|     rsa,
 | |
|     x448,
 | |
|     x25519,
 | |
| )
 | |
| from cryptography.hazmat.primitives.asymmetric.types import (
 | |
|     CertificateIssuerPrivateKeyTypes,
 | |
|     CertificatePublicKeyTypes,
 | |
| )
 | |
| from cryptography.x509.extensions import (
 | |
|     Extension,
 | |
|     Extensions,
 | |
|     ExtensionType,
 | |
|     _make_sequence_methods,
 | |
| )
 | |
| from cryptography.x509.name import Name, _ASN1Type
 | |
| from cryptography.x509.oid import ObjectIdentifier
 | |
| 
 | |
| _EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1)
 | |
| 
 | |
| # This must be kept in sync with sign.rs's list of allowable types in
 | |
| # identify_hash_type
 | |
| _AllowedHashTypes = typing.Union[
 | |
|     hashes.SHA224,
 | |
|     hashes.SHA256,
 | |
|     hashes.SHA384,
 | |
|     hashes.SHA512,
 | |
|     hashes.SHA3_224,
 | |
|     hashes.SHA3_256,
 | |
|     hashes.SHA3_384,
 | |
|     hashes.SHA3_512,
 | |
| ]
 | |
| 
 | |
| 
 | |
| class AttributeNotFound(Exception):
 | |
|     def __init__(self, msg: str, oid: ObjectIdentifier) -> None:
 | |
|         super().__init__(msg)
 | |
|         self.oid = oid
 | |
| 
 | |
| 
 | |
| def _reject_duplicate_extension(
 | |
|     extension: Extension[ExtensionType],
 | |
|     extensions: list[Extension[ExtensionType]],
 | |
| ) -> None:
 | |
|     # This is quadratic in the number of extensions
 | |
|     for e in extensions:
 | |
|         if e.oid == extension.oid:
 | |
|             raise ValueError("This extension has already been set.")
 | |
| 
 | |
| 
 | |
| def _reject_duplicate_attribute(
 | |
|     oid: ObjectIdentifier,
 | |
|     attributes: list[tuple[ObjectIdentifier, bytes, int | None]],
 | |
| ) -> None:
 | |
|     # This is quadratic in the number of attributes
 | |
|     for attr_oid, _, _ in attributes:
 | |
|         if attr_oid == oid:
 | |
|             raise ValueError("This attribute has already been set.")
 | |
| 
 | |
| 
 | |
| def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime:
 | |
|     """Normalizes a datetime to a naive datetime in UTC.
 | |
| 
 | |
|     time -- datetime to normalize. Assumed to be in UTC if not timezone
 | |
|             aware.
 | |
|     """
 | |
|     if time.tzinfo is not None:
 | |
|         offset = time.utcoffset()
 | |
|         offset = offset if offset else datetime.timedelta()
 | |
|         return time.replace(tzinfo=None) - offset
 | |
|     else:
 | |
|         return time
 | |
| 
 | |
| 
 | |
| class Attribute:
 | |
|     def __init__(
 | |
|         self,
 | |
|         oid: ObjectIdentifier,
 | |
|         value: bytes,
 | |
|         _type: int = _ASN1Type.UTF8String.value,
 | |
|     ) -> None:
 | |
|         self._oid = oid
 | |
|         self._value = value
 | |
|         self._type = _type
 | |
| 
 | |
|     @property
 | |
|     def oid(self) -> ObjectIdentifier:
 | |
|         return self._oid
 | |
| 
 | |
|     @property
 | |
|     def value(self) -> bytes:
 | |
|         return self._value
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return f"<Attribute(oid={self.oid}, value={self.value!r})>"
 | |
| 
 | |
|     def __eq__(self, other: object) -> bool:
 | |
|         if not isinstance(other, Attribute):
 | |
|             return NotImplemented
 | |
| 
 | |
|         return (
 | |
|             self.oid == other.oid
 | |
|             and self.value == other.value
 | |
|             and self._type == other._type
 | |
|         )
 | |
| 
 | |
|     def __hash__(self) -> int:
 | |
|         return hash((self.oid, self.value, self._type))
 | |
| 
 | |
| 
 | |
| class Attributes:
 | |
|     def __init__(
 | |
|         self,
 | |
|         attributes: typing.Iterable[Attribute],
 | |
|     ) -> None:
 | |
|         self._attributes = list(attributes)
 | |
| 
 | |
|     __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes")
 | |
| 
 | |
|     def __repr__(self) -> str:
 | |
|         return f"<Attributes({self._attributes})>"
 | |
| 
 | |
|     def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute:
 | |
|         for attr in self:
 | |
|             if attr.oid == oid:
 | |
|                 return attr
 | |
| 
 | |
|         raise AttributeNotFound(f"No {oid} attribute was found", oid)
 | |
| 
 | |
| 
 | |
| class Version(utils.Enum):
 | |
|     v1 = 0
 | |
|     v3 = 2
 | |
| 
 | |
| 
 | |
| class InvalidVersion(Exception):
 | |
|     def __init__(self, msg: str, parsed_version: int) -> None:
 | |
|         super().__init__(msg)
 | |
|         self.parsed_version = parsed_version
 | |
| 
 | |
| 
 | |
| Certificate = rust_x509.Certificate
 | |
| 
 | |
| 
 | |
| class RevokedCertificate(metaclass=abc.ABCMeta):
 | |
|     @property
 | |
|     @abc.abstractmethod
 | |
|     def serial_number(self) -> int:
 | |
|         """
 | |
|         Returns the serial number of the revoked certificate.
 | |
|         """
 | |
| 
 | |
|     @property
 | |
|     @abc.abstractmethod
 | |
|     def revocation_date(self) -> datetime.datetime:
 | |
|         """
 | |
|         Returns the date of when this certificate was revoked.
 | |
|         """
 | |
| 
 | |
|     @property
 | |
|     @abc.abstractmethod
 | |
|     def revocation_date_utc(self) -> datetime.datetime:
 | |
|         """
 | |
|         Returns the date of when this certificate was revoked as a non-naive
 | |
|         UTC datetime.
 | |
|         """
 | |
| 
 | |
|     @property
 | |
|     @abc.abstractmethod
 | |
|     def extensions(self) -> Extensions:
 | |
|         """
 | |
|         Returns an Extensions object containing a list of Revoked extensions.
 | |
|         """
 | |
| 
 | |
| 
 | |
| # Runtime isinstance checks need this since the rust class is not a subclass.
 | |
| RevokedCertificate.register(rust_x509.RevokedCertificate)
 | |
| 
 | |
| 
 | |
| class _RawRevokedCertificate(RevokedCertificate):
 | |
|     def __init__(
 | |
|         self,
 | |
|         serial_number: int,
 | |
|         revocation_date: datetime.datetime,
 | |
|         extensions: Extensions,
 | |
|     ):
 | |
|         self._serial_number = serial_number
 | |
|         self._revocation_date = revocation_date
 | |
|         self._extensions = extensions
 | |
| 
 | |
|     @property
 | |
|     def serial_number(self) -> int:
 | |
|         return self._serial_number
 | |
| 
 | |
|     @property
 | |
|     def revocation_date(self) -> datetime.datetime:
 | |
|         warnings.warn(
 | |
|             "Properties that return a naïve datetime object have been "
 | |
|             "deprecated. Please switch to revocation_date_utc.",
 | |
|             utils.DeprecatedIn42,
 | |
|             stacklevel=2,
 | |
|         )
 | |
|         return self._revocation_date
 | |
| 
 | |
|     @property
 | |
|     def revocation_date_utc(self) -> datetime.datetime:
 | |
|         return self._revocation_date.replace(tzinfo=datetime.timezone.utc)
 | |
| 
 | |
|     @property
 | |
|     def extensions(self) -> Extensions:
 | |
|         return self._extensions
 | |
| 
 | |
| 
 | |
| CertificateRevocationList = rust_x509.CertificateRevocationList
 | |
| CertificateSigningRequest = rust_x509.CertificateSigningRequest
 | |
| 
 | |
| 
 | |
| load_pem_x509_certificate = rust_x509.load_pem_x509_certificate
 | |
| load_der_x509_certificate = rust_x509.load_der_x509_certificate
 | |
| 
 | |
| load_pem_x509_certificates = rust_x509.load_pem_x509_certificates
 | |
| 
 | |
| load_pem_x509_csr = rust_x509.load_pem_x509_csr
 | |
| load_der_x509_csr = rust_x509.load_der_x509_csr
 | |
| 
 | |
| load_pem_x509_crl = rust_x509.load_pem_x509_crl
 | |
| load_der_x509_crl = rust_x509.load_der_x509_crl
 | |
| 
 | |
| 
 | |
| class CertificateSigningRequestBuilder:
 | |
|     def __init__(
 | |
|         self,
 | |
|         subject_name: Name | None = None,
 | |
|         extensions: list[Extension[ExtensionType]] = [],
 | |
|         attributes: list[tuple[ObjectIdentifier, bytes, int | None]] = [],
 | |
|     ):
 | |
|         """
 | |
|         Creates an empty X.509 certificate request (v1).
 | |
|         """
 | |
|         self._subject_name = subject_name
 | |
|         self._extensions = extensions
 | |
|         self._attributes = attributes
 | |
| 
 | |
|     def subject_name(self, name: Name) -> CertificateSigningRequestBuilder:
 | |
|         """
 | |
|         Sets the certificate requestor's distinguished name.
 | |
|         """
 | |
|         if not isinstance(name, Name):
 | |
|             raise TypeError("Expecting x509.Name object.")
 | |
|         if self._subject_name is not None:
 | |
|             raise ValueError("The subject name may only be set once.")
 | |
|         return CertificateSigningRequestBuilder(
 | |
|             name, self._extensions, self._attributes
 | |
|         )
 | |
| 
 | |
|     def add_extension(
 | |
|         self, extval: ExtensionType, critical: bool
 | |
|     ) -> CertificateSigningRequestBuilder:
 | |
|         """
 | |
|         Adds an X.509 extension to the certificate request.
 | |
|         """
 | |
|         if not isinstance(extval, ExtensionType):
 | |
|             raise TypeError("extension must be an ExtensionType")
 | |
| 
 | |
|         extension = Extension(extval.oid, critical, extval)
 | |
|         _reject_duplicate_extension(extension, self._extensions)
 | |
| 
 | |
|         return CertificateSigningRequestBuilder(
 | |
|             self._subject_name,
 | |
|             [*self._extensions, extension],
 | |
|             self._attributes,
 | |
|         )
 | |
| 
 | |
|     def add_attribute(
 | |
|         self,
 | |
|         oid: ObjectIdentifier,
 | |
|         value: bytes,
 | |
|         *,
 | |
|         _tag: _ASN1Type | None = None,
 | |
|     ) -> CertificateSigningRequestBuilder:
 | |
|         """
 | |
|         Adds an X.509 attribute with an OID and associated value.
 | |
|         """
 | |
|         if not isinstance(oid, ObjectIdentifier):
 | |
|             raise TypeError("oid must be an ObjectIdentifier")
 | |
| 
 | |
|         if not isinstance(value, bytes):
 | |
|             raise TypeError("value must be bytes")
 | |
| 
 | |
|         if _tag is not None and not isinstance(_tag, _ASN1Type):
 | |
|             raise TypeError("tag must be _ASN1Type")
 | |
| 
 | |
|         _reject_duplicate_attribute(oid, self._attributes)
 | |
| 
 | |
|         if _tag is not None:
 | |
|             tag = _tag.value
 | |
|         else:
 | |
|             tag = None
 | |
| 
 | |
|         return CertificateSigningRequestBuilder(
 | |
|             self._subject_name,
 | |
|             self._extensions,
 | |
|             [*self._attributes, (oid, value, tag)],
 | |
|         )
 | |
| 
 | |
|     def sign(
 | |
|         self,
 | |
|         private_key: CertificateIssuerPrivateKeyTypes,
 | |
|         algorithm: _AllowedHashTypes | None,
 | |
|         backend: typing.Any = None,
 | |
|         *,
 | |
|         rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
 | |
|     ) -> CertificateSigningRequest:
 | |
|         """
 | |
|         Signs the request using the requestor's private key.
 | |
|         """
 | |
|         if self._subject_name is None:
 | |
|             raise ValueError("A CertificateSigningRequest must have a subject")
 | |
| 
 | |
|         if rsa_padding is not None:
 | |
|             if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
 | |
|                 raise TypeError("Padding must be PSS or PKCS1v15")
 | |
|             if not isinstance(private_key, rsa.RSAPrivateKey):
 | |
|                 raise TypeError("Padding is only supported for RSA keys")
 | |
| 
 | |
|         return rust_x509.create_x509_csr(
 | |
|             self, private_key, algorithm, rsa_padding
 | |
|         )
 | |
| 
 | |
| 
 | |
| class CertificateBuilder:
 | |
|     _extensions: list[Extension[ExtensionType]]
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         issuer_name: Name | None = None,
 | |
|         subject_name: Name | None = None,
 | |
|         public_key: CertificatePublicKeyTypes | None = None,
 | |
|         serial_number: int | None = None,
 | |
|         not_valid_before: datetime.datetime | None = None,
 | |
|         not_valid_after: datetime.datetime | None = None,
 | |
|         extensions: list[Extension[ExtensionType]] = [],
 | |
|     ) -> None:
 | |
|         self._version = Version.v3
 | |
|         self._issuer_name = issuer_name
 | |
|         self._subject_name = subject_name
 | |
|         self._public_key = public_key
 | |
|         self._serial_number = serial_number
 | |
|         self._not_valid_before = not_valid_before
 | |
|         self._not_valid_after = not_valid_after
 | |
|         self._extensions = extensions
 | |
| 
 | |
|     def issuer_name(self, name: Name) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the CA's distinguished name.
 | |
|         """
 | |
|         if not isinstance(name, Name):
 | |
|             raise TypeError("Expecting x509.Name object.")
 | |
|         if self._issuer_name is not None:
 | |
|             raise ValueError("The issuer name may only be set once.")
 | |
|         return CertificateBuilder(
 | |
|             name,
 | |
|             self._subject_name,
 | |
|             self._public_key,
 | |
|             self._serial_number,
 | |
|             self._not_valid_before,
 | |
|             self._not_valid_after,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def subject_name(self, name: Name) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the requestor's distinguished name.
 | |
|         """
 | |
|         if not isinstance(name, Name):
 | |
|             raise TypeError("Expecting x509.Name object.")
 | |
|         if self._subject_name is not None:
 | |
|             raise ValueError("The subject name may only be set once.")
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             name,
 | |
|             self._public_key,
 | |
|             self._serial_number,
 | |
|             self._not_valid_before,
 | |
|             self._not_valid_after,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def public_key(
 | |
|         self,
 | |
|         key: CertificatePublicKeyTypes,
 | |
|     ) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the requestor's public key (as found in the signing request).
 | |
|         """
 | |
|         if not isinstance(
 | |
|             key,
 | |
|             (
 | |
|                 dsa.DSAPublicKey,
 | |
|                 rsa.RSAPublicKey,
 | |
|                 ec.EllipticCurvePublicKey,
 | |
|                 ed25519.Ed25519PublicKey,
 | |
|                 ed448.Ed448PublicKey,
 | |
|                 x25519.X25519PublicKey,
 | |
|                 x448.X448PublicKey,
 | |
|             ),
 | |
|         ):
 | |
|             raise TypeError(
 | |
|                 "Expecting one of DSAPublicKey, RSAPublicKey,"
 | |
|                 " EllipticCurvePublicKey, Ed25519PublicKey,"
 | |
|                 " Ed448PublicKey, X25519PublicKey, or "
 | |
|                 "X448PublicKey."
 | |
|             )
 | |
|         if self._public_key is not None:
 | |
|             raise ValueError("The public key may only be set once.")
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             self._subject_name,
 | |
|             key,
 | |
|             self._serial_number,
 | |
|             self._not_valid_before,
 | |
|             self._not_valid_after,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def serial_number(self, number: int) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the certificate serial number.
 | |
|         """
 | |
|         if not isinstance(number, int):
 | |
|             raise TypeError("Serial number must be of integral type.")
 | |
|         if self._serial_number is not None:
 | |
|             raise ValueError("The serial number may only be set once.")
 | |
|         if number <= 0:
 | |
|             raise ValueError("The serial number should be positive.")
 | |
| 
 | |
|         # ASN.1 integers are always signed, so most significant bit must be
 | |
|         # zero.
 | |
|         if number.bit_length() >= 160:  # As defined in RFC 5280
 | |
|             raise ValueError(
 | |
|                 "The serial number should not be more than 159 bits."
 | |
|             )
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             self._subject_name,
 | |
|             self._public_key,
 | |
|             number,
 | |
|             self._not_valid_before,
 | |
|             self._not_valid_after,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def not_valid_before(self, time: datetime.datetime) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the certificate activation time.
 | |
|         """
 | |
|         if not isinstance(time, datetime.datetime):
 | |
|             raise TypeError("Expecting datetime object.")
 | |
|         if self._not_valid_before is not None:
 | |
|             raise ValueError("The not valid before may only be set once.")
 | |
|         time = _convert_to_naive_utc_time(time)
 | |
|         if time < _EARLIEST_UTC_TIME:
 | |
|             raise ValueError(
 | |
|                 "The not valid before date must be on or after"
 | |
|                 " 1950 January 1)."
 | |
|             )
 | |
|         if self._not_valid_after is not None and time > self._not_valid_after:
 | |
|             raise ValueError(
 | |
|                 "The not valid before date must be before the not valid after "
 | |
|                 "date."
 | |
|             )
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             self._subject_name,
 | |
|             self._public_key,
 | |
|             self._serial_number,
 | |
|             time,
 | |
|             self._not_valid_after,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def not_valid_after(self, time: datetime.datetime) -> CertificateBuilder:
 | |
|         """
 | |
|         Sets the certificate expiration time.
 | |
|         """
 | |
|         if not isinstance(time, datetime.datetime):
 | |
|             raise TypeError("Expecting datetime object.")
 | |
|         if self._not_valid_after is not None:
 | |
|             raise ValueError("The not valid after may only be set once.")
 | |
|         time = _convert_to_naive_utc_time(time)
 | |
|         if time < _EARLIEST_UTC_TIME:
 | |
|             raise ValueError(
 | |
|                 "The not valid after date must be on or after"
 | |
|                 " 1950 January 1."
 | |
|             )
 | |
|         if (
 | |
|             self._not_valid_before is not None
 | |
|             and time < self._not_valid_before
 | |
|         ):
 | |
|             raise ValueError(
 | |
|                 "The not valid after date must be after the not valid before "
 | |
|                 "date."
 | |
|             )
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             self._subject_name,
 | |
|             self._public_key,
 | |
|             self._serial_number,
 | |
|             self._not_valid_before,
 | |
|             time,
 | |
|             self._extensions,
 | |
|         )
 | |
| 
 | |
|     def add_extension(
 | |
|         self, extval: ExtensionType, critical: bool
 | |
|     ) -> CertificateBuilder:
 | |
|         """
 | |
|         Adds an X.509 extension to the certificate.
 | |
|         """
 | |
|         if not isinstance(extval, ExtensionType):
 | |
|             raise TypeError("extension must be an ExtensionType")
 | |
| 
 | |
|         extension = Extension(extval.oid, critical, extval)
 | |
|         _reject_duplicate_extension(extension, self._extensions)
 | |
| 
 | |
|         return CertificateBuilder(
 | |
|             self._issuer_name,
 | |
|             self._subject_name,
 | |
|             self._public_key,
 | |
|             self._serial_number,
 | |
|             self._not_valid_before,
 | |
|             self._not_valid_after,
 | |
|             [*self._extensions, extension],
 | |
|         )
 | |
| 
 | |
|     def sign(
 | |
|         self,
 | |
|         private_key: CertificateIssuerPrivateKeyTypes,
 | |
|         algorithm: _AllowedHashTypes | None,
 | |
|         backend: typing.Any = None,
 | |
|         *,
 | |
|         rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
 | |
|     ) -> Certificate:
 | |
|         """
 | |
|         Signs the certificate using the CA's private key.
 | |
|         """
 | |
|         if self._subject_name is None:
 | |
|             raise ValueError("A certificate must have a subject name")
 | |
| 
 | |
|         if self._issuer_name is None:
 | |
|             raise ValueError("A certificate must have an issuer name")
 | |
| 
 | |
|         if self._serial_number is None:
 | |
|             raise ValueError("A certificate must have a serial number")
 | |
| 
 | |
|         if self._not_valid_before is None:
 | |
|             raise ValueError("A certificate must have a not valid before time")
 | |
| 
 | |
|         if self._not_valid_after is None:
 | |
|             raise ValueError("A certificate must have a not valid after time")
 | |
| 
 | |
|         if self._public_key is None:
 | |
|             raise ValueError("A certificate must have a public key")
 | |
| 
 | |
|         if rsa_padding is not None:
 | |
|             if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
 | |
|                 raise TypeError("Padding must be PSS or PKCS1v15")
 | |
|             if not isinstance(private_key, rsa.RSAPrivateKey):
 | |
|                 raise TypeError("Padding is only supported for RSA keys")
 | |
| 
 | |
|         return rust_x509.create_x509_certificate(
 | |
|             self, private_key, algorithm, rsa_padding
 | |
|         )
 | |
| 
 | |
| 
 | |
| class CertificateRevocationListBuilder:
 | |
|     _extensions: list[Extension[ExtensionType]]
 | |
|     _revoked_certificates: list[RevokedCertificate]
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         issuer_name: Name | None = None,
 | |
|         last_update: datetime.datetime | None = None,
 | |
|         next_update: datetime.datetime | None = None,
 | |
|         extensions: list[Extension[ExtensionType]] = [],
 | |
|         revoked_certificates: list[RevokedCertificate] = [],
 | |
|     ):
 | |
|         self._issuer_name = issuer_name
 | |
|         self._last_update = last_update
 | |
|         self._next_update = next_update
 | |
|         self._extensions = extensions
 | |
|         self._revoked_certificates = revoked_certificates
 | |
| 
 | |
|     def issuer_name(
 | |
|         self, issuer_name: Name
 | |
|     ) -> CertificateRevocationListBuilder:
 | |
|         if not isinstance(issuer_name, Name):
 | |
|             raise TypeError("Expecting x509.Name object.")
 | |
|         if self._issuer_name is not None:
 | |
|             raise ValueError("The issuer name may only be set once.")
 | |
|         return CertificateRevocationListBuilder(
 | |
|             issuer_name,
 | |
|             self._last_update,
 | |
|             self._next_update,
 | |
|             self._extensions,
 | |
|             self._revoked_certificates,
 | |
|         )
 | |
| 
 | |
|     def last_update(
 | |
|         self, last_update: datetime.datetime
 | |
|     ) -> CertificateRevocationListBuilder:
 | |
|         if not isinstance(last_update, datetime.datetime):
 | |
|             raise TypeError("Expecting datetime object.")
 | |
|         if self._last_update is not None:
 | |
|             raise ValueError("Last update may only be set once.")
 | |
|         last_update = _convert_to_naive_utc_time(last_update)
 | |
|         if last_update < _EARLIEST_UTC_TIME:
 | |
|             raise ValueError(
 | |
|                 "The last update date must be on or after 1950 January 1."
 | |
|             )
 | |
|         if self._next_update is not None and last_update > self._next_update:
 | |
|             raise ValueError(
 | |
|                 "The last update date must be before the next update date."
 | |
|             )
 | |
|         return CertificateRevocationListBuilder(
 | |
|             self._issuer_name,
 | |
|             last_update,
 | |
|             self._next_update,
 | |
|             self._extensions,
 | |
|             self._revoked_certificates,
 | |
|         )
 | |
| 
 | |
|     def next_update(
 | |
|         self, next_update: datetime.datetime
 | |
|     ) -> CertificateRevocationListBuilder:
 | |
|         if not isinstance(next_update, datetime.datetime):
 | |
|             raise TypeError("Expecting datetime object.")
 | |
|         if self._next_update is not None:
 | |
|             raise ValueError("Last update may only be set once.")
 | |
|         next_update = _convert_to_naive_utc_time(next_update)
 | |
|         if next_update < _EARLIEST_UTC_TIME:
 | |
|             raise ValueError(
 | |
|                 "The last update date must be on or after 1950 January 1."
 | |
|             )
 | |
|         if self._last_update is not None and next_update < self._last_update:
 | |
|             raise ValueError(
 | |
|                 "The next update date must be after the last update date."
 | |
|             )
 | |
|         return CertificateRevocationListBuilder(
 | |
|             self._issuer_name,
 | |
|             self._last_update,
 | |
|             next_update,
 | |
|             self._extensions,
 | |
|             self._revoked_certificates,
 | |
|         )
 | |
| 
 | |
|     def add_extension(
 | |
|         self, extval: ExtensionType, critical: bool
 | |
|     ) -> CertificateRevocationListBuilder:
 | |
|         """
 | |
|         Adds an X.509 extension to the certificate revocation list.
 | |
|         """
 | |
|         if not isinstance(extval, ExtensionType):
 | |
|             raise TypeError("extension must be an ExtensionType")
 | |
| 
 | |
|         extension = Extension(extval.oid, critical, extval)
 | |
|         _reject_duplicate_extension(extension, self._extensions)
 | |
|         return CertificateRevocationListBuilder(
 | |
|             self._issuer_name,
 | |
|             self._last_update,
 | |
|             self._next_update,
 | |
|             [*self._extensions, extension],
 | |
|             self._revoked_certificates,
 | |
|         )
 | |
| 
 | |
|     def add_revoked_certificate(
 | |
|         self, revoked_certificate: RevokedCertificate
 | |
|     ) -> CertificateRevocationListBuilder:
 | |
|         """
 | |
|         Adds a revoked certificate to the CRL.
 | |
|         """
 | |
|         if not isinstance(revoked_certificate, RevokedCertificate):
 | |
|             raise TypeError("Must be an instance of RevokedCertificate")
 | |
| 
 | |
|         return CertificateRevocationListBuilder(
 | |
|             self._issuer_name,
 | |
|             self._last_update,
 | |
|             self._next_update,
 | |
|             self._extensions,
 | |
|             [*self._revoked_certificates, revoked_certificate],
 | |
|         )
 | |
| 
 | |
|     def sign(
 | |
|         self,
 | |
|         private_key: CertificateIssuerPrivateKeyTypes,
 | |
|         algorithm: _AllowedHashTypes | None,
 | |
|         backend: typing.Any = None,
 | |
|         *,
 | |
|         rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
 | |
|     ) -> CertificateRevocationList:
 | |
|         if self._issuer_name is None:
 | |
|             raise ValueError("A CRL must have an issuer name")
 | |
| 
 | |
|         if self._last_update is None:
 | |
|             raise ValueError("A CRL must have a last update time")
 | |
| 
 | |
|         if self._next_update is None:
 | |
|             raise ValueError("A CRL must have a next update time")
 | |
| 
 | |
|         if rsa_padding is not None:
 | |
|             if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
 | |
|                 raise TypeError("Padding must be PSS or PKCS1v15")
 | |
|             if not isinstance(private_key, rsa.RSAPrivateKey):
 | |
|                 raise TypeError("Padding is only supported for RSA keys")
 | |
| 
 | |
|         return rust_x509.create_x509_crl(
 | |
|             self, private_key, algorithm, rsa_padding
 | |
|         )
 | |
| 
 | |
| 
 | |
| class RevokedCertificateBuilder:
 | |
|     def __init__(
 | |
|         self,
 | |
|         serial_number: int | None = None,
 | |
|         revocation_date: datetime.datetime | None = None,
 | |
|         extensions: list[Extension[ExtensionType]] = [],
 | |
|     ):
 | |
|         self._serial_number = serial_number
 | |
|         self._revocation_date = revocation_date
 | |
|         self._extensions = extensions
 | |
| 
 | |
|     def serial_number(self, number: int) -> RevokedCertificateBuilder:
 | |
|         if not isinstance(number, int):
 | |
|             raise TypeError("Serial number must be of integral type.")
 | |
|         if self._serial_number is not None:
 | |
|             raise ValueError("The serial number may only be set once.")
 | |
|         if number <= 0:
 | |
|             raise ValueError("The serial number should be positive")
 | |
| 
 | |
|         # ASN.1 integers are always signed, so most significant bit must be
 | |
|         # zero.
 | |
|         if number.bit_length() >= 160:  # As defined in RFC 5280
 | |
|             raise ValueError(
 | |
|                 "The serial number should not be more than 159 bits."
 | |
|             )
 | |
|         return RevokedCertificateBuilder(
 | |
|             number, self._revocation_date, self._extensions
 | |
|         )
 | |
| 
 | |
|     def revocation_date(
 | |
|         self, time: datetime.datetime
 | |
|     ) -> RevokedCertificateBuilder:
 | |
|         if not isinstance(time, datetime.datetime):
 | |
|             raise TypeError("Expecting datetime object.")
 | |
|         if self._revocation_date is not None:
 | |
|             raise ValueError("The revocation date may only be set once.")
 | |
|         time = _convert_to_naive_utc_time(time)
 | |
|         if time < _EARLIEST_UTC_TIME:
 | |
|             raise ValueError(
 | |
|                 "The revocation date must be on or after 1950 January 1."
 | |
|             )
 | |
|         return RevokedCertificateBuilder(
 | |
|             self._serial_number, time, self._extensions
 | |
|         )
 | |
| 
 | |
|     def add_extension(
 | |
|         self, extval: ExtensionType, critical: bool
 | |
|     ) -> RevokedCertificateBuilder:
 | |
|         if not isinstance(extval, ExtensionType):
 | |
|             raise TypeError("extension must be an ExtensionType")
 | |
| 
 | |
|         extension = Extension(extval.oid, critical, extval)
 | |
|         _reject_duplicate_extension(extension, self._extensions)
 | |
|         return RevokedCertificateBuilder(
 | |
|             self._serial_number,
 | |
|             self._revocation_date,
 | |
|             [*self._extensions, extension],
 | |
|         )
 | |
| 
 | |
|     def build(self, backend: typing.Any = None) -> RevokedCertificate:
 | |
|         if self._serial_number is None:
 | |
|             raise ValueError("A revoked certificate must have a serial number")
 | |
|         if self._revocation_date is None:
 | |
|             raise ValueError(
 | |
|                 "A revoked certificate must have a revocation date"
 | |
|             )
 | |
|         return _RawRevokedCertificate(
 | |
|             self._serial_number,
 | |
|             self._revocation_date,
 | |
|             Extensions(self._extensions),
 | |
|         )
 | |
| 
 | |
| 
 | |
| def random_serial_number() -> int:
 | |
|     return int.from_bytes(os.urandom(20), "big") >> 1
 | 
