account_esignature_certificate.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # -*- coding: utf-8 -*-
  2. import base64
  3. import logging
  4. import ssl
  5. from datetime import datetime
  6. from cryptography.hazmat.primitives import serialization
  7. _logger = logging.getLogger(__name__)
  8. try:
  9. from OpenSSL import crypto
  10. except ImportError:
  11. _logger.warning('OpenSSL library not found. If you plan to use l10n_mx_edi, please install the library from https://pypi.python.org/pypi/pyOpenSSL')
  12. from pytz import timezone
  13. from odoo import _, api, fields, models, tools
  14. from odoo.exceptions import ValidationError, UserError
  15. from odoo.tools.misc import DEFAULT_SERVER_DATETIME_FORMAT
  16. def convert_key_cer_to_pem(key, password):
  17. # TODO compute it from a python way
  18. private_key = serialization.load_der_private_key(base64.b64decode(key), password.encode())
  19. return private_key.private_bytes(
  20. encoding=serialization.Encoding.PEM,
  21. format=serialization.PrivateFormat.PKCS8,
  22. encryption_algorithm=serialization.NoEncryption()
  23. )
  24. def str_to_datetime(dt_str, tz=timezone('America/Mexico_City')):
  25. return tz.localize(fields.Datetime.from_string(dt_str))
  26. class AccountEsignatureCertificate(models.Model):
  27. _name = 'account.esignature.certificate'
  28. _description = 'Certificado de México'
  29. content = fields.Binary(string='Certificado', required=True)
  30. key = fields.Binary(string='Llave de certificado',required=True)
  31. password = fields.Char(string='Contraseña', required=True)
  32. holder = fields.Char(string='Nombre', required=False)
  33. holder_vat = fields.Char(string="RFC", required=False)
  34. serial_number = fields.Char(string='Número de serie', readonly=True, index=True)
  35. date_start = fields.Datetime(string='Fecha inicio', readonly=True)
  36. date_end = fields.Datetime(string='Fecha final', readonly=True)
  37. @tools.ormcache('content')
  38. def get_pem_cer(self, content):
  39. '''Get the current content in PEM format
  40. '''
  41. self.ensure_one()
  42. return ssl.DER_cert_to_PEM_cert(base64.decodebytes(content)).encode('UTF-8')
  43. @tools.ormcache('key', 'password')
  44. def get_pem_key(self, key, password):
  45. '''Get the current key in PEM format
  46. '''
  47. self.ensure_one()
  48. private_key = serialization.load_der_private_key(base64.b64decode(key), password.encode())
  49. return private_key.private_bytes(
  50. encoding=serialization.Encoding.PEM,
  51. format=serialization.PrivateFormat.PKCS8,
  52. encryption_algorithm=serialization.NoEncryption()
  53. )
  54. def get_data(self):
  55. '''Return the content (b64 encoded) and the certificate decrypted
  56. '''
  57. self.ensure_one()
  58. cer_pem = self.get_pem_cer(self.content)
  59. certificate = crypto.load_certificate(crypto.FILETYPE_PEM, cer_pem)
  60. for to_del in ['\n', ssl.PEM_HEADER, ssl.PEM_FOOTER]:
  61. cer_pem = cer_pem.replace(to_del.encode('UTF-8'), b'')
  62. return cer_pem, certificate
  63. def get_mx_current_datetime(self):
  64. '''Get the current datetime with the Mexican timezone.
  65. '''
  66. mexican_tz = timezone('America/Mexico_City')
  67. return datetime.now(mexican_tz)
  68. def get_valid_certificate(self):
  69. '''Search for a valid certificate that is available and not expired.
  70. '''
  71. mexican_dt = self.get_mx_current_datetime()
  72. for record in self:
  73. date_start = str_to_datetime(record.date_start)
  74. date_end = str_to_datetime(record.date_end)
  75. if date_start <= mexican_dt <= date_end:
  76. return record
  77. return None
  78. def get_encrypted_cadena(self, cadena):
  79. '''Encrypt the cadena using the private key.
  80. '''
  81. self.ensure_one()
  82. key_pem = self.get_pem_key(self.key, self.password)
  83. private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
  84. encrypt = 'sha256WithRSAEncryption'
  85. cadena_crypted = crypto.sign(private_key, cadena, encrypt)
  86. return base64.b64encode(cadena_crypted)
  87. @api.constrains('content', 'key', 'password')
  88. def _check_credentials(self):
  89. '''Check the validity of content/key/password and fill the fields
  90. with the certificate values.
  91. '''
  92. mexican_tz = timezone('America/Mexico_City')
  93. mexican_dt = self.get_mx_current_datetime()
  94. date_format = '%Y%m%d%H%M%SZ'
  95. for record in self:
  96. try:
  97. cer_pem, certificate = record.get_data()
  98. before = mexican_tz.localize(
  99. datetime.strptime(certificate.get_notBefore().decode("utf-8"), date_format))
  100. after = mexican_tz.localize(
  101. datetime.strptime(certificate.get_notAfter().decode("utf-8"), date_format))
  102. serial_number = certificate.get_serial_number()
  103. subject = certificate.get_subject()
  104. holder = subject.CN
  105. except Exception as e:
  106. raise ValidationError(_('El contenido del certificado no es valido.'))
  107. record.holder = holder
  108. record.serial_number = ('%x' % serial_number)[1::2]
  109. record.date_start = before.strftime(DEFAULT_SERVER_DATETIME_FORMAT)
  110. record.date_end = after.strftime(DEFAULT_SERVER_DATETIME_FORMAT)
  111. if mexican_dt > after:
  112. raise ValidationError(_('El certificado expiro desde %s') % record.date_end)
  113. try:
  114. key_pem = self.get_pem_key(self.key, self.password)
  115. crypto.load_privatekey(crypto.FILETYPE_PEM, key_pem)
  116. except Exception:
  117. raise ValidationError(_('La contraseña no corresponde, favor de validar'))
  118. @api.model
  119. def create(self, data):
  120. res = super(AccountEsignatureCertificate, self).create(data)
  121. self.clear_caches()
  122. return res
  123. def write(self, data):
  124. res = super(AccountEsignatureCertificate, self).write(data)
  125. self.clear_caches()
  126. return res
  127. def unlink(self):
  128. if self.env['account.move'].search([('l10n_mx_edi_cfdi_certificate_id', 'in', self.ids)]):
  129. raise UserError(_('No es posible eliminar un certificado valido y vigente.'))
  130. res = super(AccountEsignatureCertificate, self).unlink()
  131. self.clear_caches()
  132. return res