# -*- coding: utf-8 -*- import base64 import logging import ssl from datetime import datetime from cryptography.hazmat.primitives import serialization _logger = logging.getLogger(__name__) try: from OpenSSL import crypto except ImportError: _logger.warning('OpenSSL library not found. If you plan to use l10n_mx_edi, please install the library from https://pypi.python.org/pypi/pyOpenSSL') from pytz import timezone from odoo import _, api, fields, models, tools from odoo.exceptions import ValidationError, UserError from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT def convert_key_cer_to_pem(key, password): # TODO compute it from a python way private_key = serialization.load_der_private_key(base64.b64decode(key), password.encode()) return private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) def str_to_datetime(dt_str, tz=timezone('America/Mexico_City')): return tz.localize(fields.Datetime.from_string(dt_str)) class AccountEsignatureCertificate(models.Model): _name = 'account.esignature.certificate' _description = 'Certificado de México' content = fields.Binary(string='Certificado', required=True) key = fields.Binary(string='Llave de certificado',required=True) password = fields.Char(string='Contraseña', required=True) holder = fields.Char(string='Nombre', required=False) holder_vat = fields.Char(string="RFC", required=False) serial_number = fields.Char(string='Número de serie', readonly=True, index=True) date_start = fields.Datetime(string='Fecha inicio', readonly=True) date_end = fields.Datetime(string='Fecha final', readonly=True) @tools.ormcache('content') def get_pem_cer(self, content): '''Get the current content in PEM format ''' self.ensure_one() return ssl.DER_cert_to_PEM_cert(base64.decodebytes(content)).encode('UTF-8') @tools.ormcache('key', 'password') def get_pem_key(self, key, password): '''Get the current key in PEM format ''' self.ensure_one() private_key = serialization.load_der_private_key(base64.b64decode(key), password.encode()) return private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) def get_data(self): '''Return the content (b64 encoded) and the certificate decrypted ''' self.ensure_one() cer_pem = self.get_pem_cer(self.content) certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cer_pem) for to_del in ['\n', ssl.PEM_HEADER, ssl.PEM_FOOTER]: cer_pem = cer_pem.replace(to_del.encode('UTF-8'), b'') return cer_pem, certificate def get_mx_current_datetime(self): '''Get the current datetime with the Mexican timezone. ''' mexican_tz = timezone('America/Mexico_City') return datetime.now(mexican_tz) def get_valid_certificate(self): '''Search for a valid certificate that is available and not expired. ''' mexican_dt = self.get_mx_current_datetime() for record in self: date_start = str_to_datetime(record.date_start) date_end = str_to_datetime(record.date_end) if date_start <= mexican_dt <= date_end: return record return None def get_encrypted_cadena(self, cadena): '''Encrypt the cadena using the private key. ''' self.ensure_one() key_pem = self.get_pem_key(self.key, self.password) private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) encrypt = 'sha256WithRSAEncryption' cadena_crypted = crypto.sign(private_key, cadena, encrypt) return base64.b64encode(cadena_crypted) @api.constrains('content', 'key', 'password') def _check_credentials(self): '''Check the validity of content/key/password and fill the fields with the certificate values. ''' mexican_tz = timezone('America/Mexico_City') mexican_dt = self.get_mx_current_datetime() date_format = '%Y%m%d%H%M%SZ' for record in self: try: cer_pem, certificate = record.get_data() before = mexican_tz.localize( datetime.strptime(certificate.get_notBefore().decode("utf-8"), date_format)) after = mexican_tz.localize( datetime.strptime(certificate.get_notAfter().decode("utf-8"), date_format)) serial_number = certificate.get_serial_number() subject = certificate.get_subject() holder = subject.CN except Exception as e: raise ValidationError(_('El contenido del certificado no es valido.')) record.holder = holder record.serial_number = ('%x' % serial_number)[1::2] record.date_start = before.strftime(DEFAULT_SERVER_DATETIME_FORMAT) record.date_end = after.strftime(DEFAULT_SERVER_DATETIME_FORMAT) if mexican_dt > after: raise ValidationError(_('El certificado expiro desde %s') % record.date_end) try: key_pem = self.get_pem_key(self.key, self.password) crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem) except Exception: raise ValidationError(_('La contraseña no corresponde, favor de validar')) @api.model def create(self, data): res = super(AccountEsignatureCertificate, self).create(data) self.clear_caches() return res def write(self, data): res = super(AccountEsignatureCertificate, self).write(data) self.clear_caches() return res def unlink(self): if self.env['account.move'].search([('l10n_mx_edi_cfdi_certificate_id', 'in', self.ids)]): raise UserError(_('No es posible eliminar un certificado valido y vigente.')) res = super(AccountEsignatureCertificate, self).unlink() self.clear_caches() return res