import base64
import logging
import os.path
from collections import OrderedDict
from datetime import date
from os.path import basename
from tempfile import TemporaryDirectory
from zipfile import ZipFile

import xmltodict

from odoo import api, fields, models, _, Command
from odoo.exceptions import ValidationError

_logger = logging.getLogger(__name__)


def _as_list(node):
    """Return *node* as a list: ``None`` -> ``[]``, a list -> itself, anything else -> ``[node]``.

    xmltodict yields a dict for a single child element and a list for repeated
    ones; this normalises both shapes so callers can always iterate.
    """
    if node is None:
        return []
    return node if isinstance(node, list) else [node]


class AccountCFDI(models.Model):
    """CFDI document parsed from an XML attachment and optionally linked to an account move."""

    _name = 'account.cfdi'
    _inherit = ['mail.thread', 'mail.activity.mixin']
    _description = 'Complemento CFDI'

    code = fields.Char(string="Código")
    name = fields.Char(string="Referencia")
    uuid = fields.Char(string="UUID")
    certificate = fields.Char(string="Certificado")
    certificate_number = fields.Char(string="Nro. de certificado")
    serie = fields.Char(string="Serie")
    folio = fields.Char(string="Folio")
    stamp = fields.Char(string="Sello")
    version = fields.Char(string="Versión")
    payment_condition = fields.Char(string="Condiciones de pago")
    currency = fields.Char(string="Moneda")
    payment_method = fields.Char(string="Forma de pago")
    location = fields.Char(string="Lugar de expedición")
    observations = fields.Text(string='Notas')
    attachment_id = fields.Many2one(comodel_name="ir.attachment", string="Archivo adjunto")
    pdf_id = fields.Many2one(comodel_name="ir.attachment", string="Representación impresa")
    company_id = fields.Many2one(comodel_name="res.company", string="Empresa", default=lambda self: self.env.company)
    emitter_id = fields.Many2one(comodel_name="res.partner", string="Emisor")
    receiver_id = fields.Many2one(comodel_name="res.partner", string="Receptor")
    move_id = fields.Many2one(comodel_name="account.move", string="Movimiento contable", copy=False)
    payable_account_id = fields.Many2one(comodel_name='account.account', string='Cuenta contable a pagar', tracking=True)
    account_id = fields.Many2one(comodel_name='account.account', string='Cuenta contable gasto', tracking=True)
    account_analytic_account_id = fields.Many2one(comodel_name='account.analytic.account', string='Cuenta analítica')
    fiscal_position_id = fields.Many2one(comodel_name='account.fiscal.position', string='Posición fiscal')
    journal_id = fields.Many2one(comodel_name='account.journal', string='Diario', tracking=True)
    tax_isr_id = fields.Many2one(comodel_name='account.tax', string='Retención ISR')
    tax_iva_id = fields.Many2one(comodel_name='account.tax', string='Retención IVA')
    analytic_distribution = fields.Json(string="Distribución analítica")
    analytic_precision = fields.Integer(string="Precisión analítica", store=False, default=lambda self: self.env['decimal.precision'].precision_get("Percentage Analytic"))
    concept_ids = fields.One2many("account.cfdi.line", "cfdi_id", string="Conceptos")
    tax_ids = fields.One2many('account.cfdi.tax', 'cfdi_id', string='Impuestos')
    date = fields.Date(string="Fecha")
    subtotal = fields.Float(string="Subtotal", copy=False)
    total = fields.Float(string="Total", copy=False)
    tax_total = fields.Float(string="Impuestos", compute="compute_tax_total", store=True)
    payment_type = fields.Selection(selection=[('PPD', 'PPD'), ('PUE', 'PUE')], string='Método de pago', readonly=True)
    cfdi_type = fields.Selection(string="Tipo de comprobante", selection=[
        ('I', 'Facturas de clientes'),
        ('SI', 'Facturas de proveedor'),
        ('E', 'Notas de crédito cliente'),
        ('SE', 'Notas de crédito proveedor'),
        ('P', 'REP de clientes'),
        ('SP', 'REP de proveedores'),
        ('N', 'Nóminas de empleados'),
        ('SN', 'Nómina propia'),
        ('T', 'Factura de traslado cliente'),
        ('ST', 'Factura de traslado proveedor')
    ], index=True, copy=False)
    state = fields.Selection(string="Estado", selection=[
        ('draft', 'Borrador'),
        ('done', 'Procesada'),
        ('cancel', 'Anulado')
    ], copy=False, default='draft', tracking=True)
    sat_state = fields.Selection(string="Estado SAT", selection=[
        ("valid", "Valido"),
        ("not_found", "No Encontrado"),
        ("undefined", "No sincronizado Aún"),
        ("none", "Estado no definido"),
        ("cancelled", "Cancelado")
    ], copy=False, tracking=True, default="valid")

    # Addenda data
    delivery_number = fields.Char(string="No. Entrega")
    invoice_qty = fields.Integer(string="Unidades facturadas")

    @api.depends("subtotal", "total")
    def compute_tax_total(self):
        """Set ``tax_total`` to ``total - subtotal`` on every record."""
        for rec in self:
            rec.tax_total = rec.total - rec.subtotal

    # ------------------------------------------------------------------
    # Creation methods
    # ------------------------------------------------------------------
    @api.model_create_multi
    def create(self, vals_list):
        """Create the CFDIs and back-link each one on its related move, if any.

        :param vals_list: list of value dicts (a single dict is also accepted
            by the decorator).
        :return: the created ``account.cfdi`` recordset.
        """
        # The context flags are forwarded as-is; they are consumed outside this file.
        records = super(
            AccountCFDI,
            self.with_context(skip_invoice_sync=True, check_move_validity=False),
        ).create(vals_list)
        for cfdi in records.filtered(lambda c: c.move_id):
            cfdi.move_id.update({"cfdi_id": cfdi.id})
        return records

    def name_get(self):
        """Return ``(id, "[serie-folio] - uuid - $total")`` pairs.

        The bracketed prefix is omitted when both serie and folio are empty.
        NOTE(review): recent Odoo versions build display names through
        ``_compute_display_name`` instead -- confirm the target version.
        """
        result = []
        for record in self:
            parts = [part for part in (record.serie, record.folio) if part]
            prefix = f"[{'-'.join(parts)}] - " if parts else ""
            # uuid may be empty (False); avoid concatenating a bool to a str.
            label = prefix + (record.uuid or '') + ' - $' + str(record.total)
            result.append((record.id, label))
        return result

    def create_cfdis(self, attachment_data):
        """Create one CFDI per valid, not-yet-imported XML in *attachment_data*.

        :param attachment_data: iterable of dicts with an ``"xml"`` entry
            (attachment values, including base64 ``"datas"``) and optionally a
            ``"pdf"`` entry.
        :return: recordset of the created CFDIs (empty when nothing qualified).
        """
        self = self.with_context(skip_invoice_sync=True, check_move_validity=False)
        cfdi_list = []
        seen_uuids = set()
        for data in attachment_data:
            data_xml = data.get("xml")
            xml_data = self.get_cfdi_data(data_xml.get("datas"))
            if not xml_data.get("Comprobante"):
                continue
            cfdi_type = self.get_cfdi_type(xml_data)
            uuid = self.validation_cfdi(xml_data, cfdi_type)
            # Skip repeated UUIDs: the same XML is sometimes delivered twice.
            if not uuid or uuid in seen_uuids:
                continue
            seen_uuids.add(uuid)
            emitter_partner_id, recipient_partner_id = self.get_cfdi_partners(xml_data)
            move_id = self.validate_cfdi_move(xml_data, uuid, cfdi_type, emitter_partner_id)
            cfdi_data = self.add_cfdi_data(xml_data, uuid, emitter_partner_id, recipient_partner_id,
                                           move_id, cfdi_type, data)
            cfdi_data = self.get_cfdi_lines(xml_data, cfdi_data, cfdi_type, emitter_partner_id,
                                            recipient_partner_id)
            cfdi_list.append(cfdi_data)
        if cfdi_list:
            return self.env["account.cfdi"].sudo().create(cfdi_list)
        return self.env["account.cfdi"]

    def get_cfdi_data(self, xml_file):
        """Decode a base64 XML payload and parse it with xmltodict.

        The ``cfdi:`` and ``tfd:`` prefixes are stripped so keys can be
        addressed without namespaces.

        :return: the parsed mapping, or an empty dict when decoding or parsing fails.
        """
        try:
            file_content = base64.b64decode(xml_file)
            # Some files declare schemaLocation under the wrong prefix.
            if b'xmlns:schemaLocation' in file_content and b'xsi:schemaLocation' not in file_content:
                file_content = file_content.replace(b'xmlns:schemaLocation', b'xsi:schemaLocation')
            file_content = file_content.replace(b'cfdi:', b'')
            file_content = file_content.replace(b'tfd:', b'')
            return xmltodict.parse(file_content)
        except Exception as e:
            # Best effort: an unreadable file is logged and skipped by the caller.
            _logger.info(e)
            return dict()

    def get_cfdi_type(self, xml_data):
        """Return the voucher type, prefixed with ``S`` for I/E/P documents not issued by the company.

        Defaults to ``'I'`` when ``@TipoDeComprobante`` is absent.
        """
        comprobante = xml_data['Comprobante']
        cfdi_type = comprobante.get('@TipoDeComprobante', 'I')
        if cfdi_type in ['I', 'E', 'P'] and comprobante['Emisor']['@Rfc'] != self.env.company.vat:
            cfdi_type = 'S' + cfdi_type
        return cfdi_type

    def validation_cfdi(self, xml_data, cfdi_type):
        """Return the CFDI UUID when the document may be imported, else ``False``.

        A document is rejected when it carries no UUID, when a CFDI with that
        UUID already exists, or when the company's VAT matches neither the
        receiver (types containing ``S``) nor the emitter (other types).
        """
        comprobante = xml_data['Comprobante']
        complemento = comprobante.get('Complemento')
        timbre = complemento.get('TimbreFiscalDigital') if isinstance(complemento, dict) else None
        uuid = timbre.get('@UUID') if isinstance(timbre, dict) else None
        if not uuid:
            return False

        if self.env['account.cfdi'].sudo().search([('uuid', '=', uuid)], limit=1):
            _logger.info(f"El CFDI con UUID {uuid}, ya existe en la base de datos, se omitirá en el proceso.")
            return False

        # Reject XML files that do not belong to the current company.
        if "S" in cfdi_type:
            partner_rfc = comprobante['Receptor']['@Rfc']
        else:
            partner_rfc = comprobante['Emisor']['@Rfc']
        if partner_rfc != self.env.company.vat:
            return False
        return uuid

    def validate_cfdi_move(self, xml_data, uuid, cfdi_type, emitter_partner_id):
        """Look for an existing posted move, not yet linked to a CFDI, that matches the document.

        Customer documents (I/E) are matched by ``l10n_mx_edi_cfdi_uuid``.
        Supplier documents (SI/SE) go through successive fallbacks: folio,
        serie+folio, date only, addenda delivery number, untaxed amount.

        :return: an ``account.move`` recordset (possibly empty).
        """
        Move = self.env["account.move"].sudo()
        move_id = self.env["account.move"]
        delivery_number, invoice_qty = self.get_addenda_data(xml_data)
        comprobante = xml_data['Comprobante']

        if cfdi_type in ["I", "E"]:
            return Move.search(
                [("move_type", "in", ["out_invoice", "out_refund"]),
                 ("l10n_mx_edi_cfdi_uuid", "=", uuid),
                 ("state", "=", "posted"),
                 ("cfdi_id", "=", False)], limit=1)

        if cfdi_type not in ["SI", "SE"]:
            return move_id

        # NOTE(review): the whole fallback chain is kept inside the supplier
        # branch; confirm that the last (amount based) lookup was not meant to
        # run for other voucher types as well.
        invoice_date = comprobante.get('@Fecha', "")
        folio = comprobante.get('@Folio', '')
        vendor_domain = [
            ('partner_id.vat', '=', emitter_partner_id.vat),
            ("state", "=", "posted"),
            ("cfdi_id", "=", False),
        ]
        bill_types = ('move_type', 'in', ['in_invoice', 'in_refund'])
        same_day = ("invoice_date", "=", invoice_date[:10])

        # 1) reference equals the folio
        move_id = Move.search(vendor_domain + [bill_types, ('ref', '=', folio), same_day], limit=1)
        # 2) reference equals serie + folio
        if not move_id and '@Serie' in comprobante:
            folio = comprobante['@Serie'] + folio
            move_id = Move.search(vendor_domain + [bill_types, ('ref', '=', folio), same_day], limit=1)
        # 3) same vendor and date
        if not move_id:
            move_id = Move.search(vendor_domain + [bill_types, same_day], limit=1)
        # 4) delivery number taken from the addenda
        if not move_id and delivery_number:
            move_id = Move.search(
                vendor_domain + [bill_types,
                                 ("x_delivery_number", "!=", False),
                                 ("x_delivery_number", "=", delivery_number)], limit=1)
        # 5) vendor bill with the same untaxed amount and date
        if not move_id:
            amount_untaxed = comprobante.get('@SubTotal', 0)
            move_id = Move.search(
                vendor_domain + [('move_type', 'in', ['in_invoice']),
                                 ("amount_untaxed", "=", amount_untaxed),
                                 same_day], limit=1)
        return move_id

    def add_cfdi_data(self, xml_data, uuid, emitter_partner_id, recipient_partner_id, move_id, cfdi_type,
                      attachment_data):
        """Build the ``account.cfdi`` values for one document.

        Side effect: creates the XML attachment (and the PDF one when
        ``attachment_data`` carries a ``"pdf"`` entry).

        :return: dict of values for ``create``.
        """
        journal_id, account_id = self.get_cfdi_journal_id(cfdi_type, emitter_partner_id, recipient_partner_id)
        partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
        payable_account_id = self.get_payable_cfdi_account_id(cfdi_type, partner_id)

        Attachment = self.env["ir.attachment"].sudo()
        attachment_id = Attachment.create(attachment_data.get("xml"))
        pdf_id = Attachment.create(attachment_data.get("pdf")) if attachment_data.get("pdf") else False
        delivery_number, invoice_qty = self.get_addenda_data(xml_data)

        comprobante = xml_data['Comprobante']

        def attr(key):
            # Missing XML attributes default to an empty string.
            return comprobante.get(key, '')

        data = {
            "attachment_id": attachment_id.id,
            "pdf_id": pdf_id.id if pdf_id else False,
            "code": uuid,
            "uuid": uuid,
            "certificate": attr('@Certificado'),
            "date": attr('@Fecha'),
            "folio": attr('@Folio'),
            "payment_method": attr('@FormaPago'),
            "location": attr('@LugarExpedicion'),
            "payment_type": attr('@MetodoPago'),
            "currency": attr('@Moneda'),
            "certificate_number": attr('@NoCertificado'),
            "stamp": attr('@Sello'),
            "serie": attr('@Serie'),
            "subtotal": attr('@SubTotal'),
            "cfdi_type": cfdi_type,
            "total": attr('@Total'),
            "version": attr('@Version'),
            "payment_condition": attr('@CondicionesDePago'),
            "emitter_id": emitter_partner_id.id,
            "receiver_id": recipient_partner_id.id,
            "move_id": move_id.id if move_id else False,
            # A CFDI that already matches a move is considered processed.
            "state": "draft" if not move_id else "done",
            "journal_id": journal_id.id if journal_id else False,
            "account_id": account_id.id if account_id else False,
            "fiscal_position_id": partner_id.property_account_position_id.id or False,
            "tax_iva_id": partner_id.x_tax_iva_id.id or False,
            "tax_isr_id": partner_id.x_tax_isr_id.id or False,
            "payable_account_id": payable_account_id.id if payable_account_id else False,
        }
        # serie + folio, or whichever of the two is present ('' when neither).
        data["name"] = data["serie"] + data["folio"]
        data["delivery_number"] = delivery_number
        data["invoice_qty"] = invoice_qty
        return data

    def get_cfdi_lines(self, xml_data, cfdi_data, cfdi_type, emitter_partner_id, recipient_partner_id):
        """Append the concept lines (``concept_ids`` create commands) to *cfdi_data*.

        Concepts with a negative ``@Importe`` are skipped. Each kept line is
        enriched by ``search_cfdi_product`` and ``get_cfdi_tax_lines``.

        :return: *cfdi_data*, updated in place.
        """
        conceptos = xml_data['Comprobante'].get('Conceptos') or {}
        UnspscCode = self.env['product.unspsc.code'].sudo()
        data_list = []
        sequence = 1
        for line in _as_list(conceptos.get('Concepto')):
            if float(line['@Importe']) < 0:
                continue

            # Unit of measure resolved through its UNSPSC code.
            unidad_id = False
            if '@ClaveUnidad' in line:
                uom_unspsc_id = UnspscCode.search([('code', '=', line['@ClaveUnidad'])], limit=1)
                if uom_unspsc_id:
                    uom_id = self.env['uom.uom'].sudo().search(
                        [('unspsc_code_id', '=', uom_unspsc_id.id)], limit=1)
                    unidad_id = uom_id.id or False

            # UNSPSC product/service code.
            product_category_id = False
            if '@ClaveProdServ' in line:
                unspsc_product_category_id = UnspscCode.search(
                    [('code', '=', line['@ClaveProdServ'])], limit=1)
                product_category_id = unspsc_product_category_id.id or False

            data_line = {
                'sequence': sequence,
                'code_cfdi': cfdi_data.get("code"),
                'date': cfdi_data.get("date"),
                'folio': cfdi_data.get("folio"),
                'payment_method': cfdi_data.get("payment_method"),
                'location': cfdi_data.get("location"),
                'payment_type': cfdi_data.get("payment_type"),
                'currency': cfdi_data.get("currency"),
                'certificate_number': cfdi_data.get("certificate_number"),
                'stamp': cfdi_data.get("stamp"),
                'serie': cfdi_data.get("serie"),
                'subtotal': cfdi_data.get("subtotal"),
                'cfdi_type': cfdi_data.get("cfdi_type"),
                'total': cfdi_data.get("total"),
                'version': cfdi_data.get("version"),
                'emitter_id': cfdi_data.get("emitter_id"),
                'receiver_id': cfdi_data.get("receiver_id"),
                'product_code': line.get('@ClaveProdServ', ''),
                'no_identification': line.get('@NoIdentificacion', ''),
                'quantity': float(line['@Cantidad']),
                'uom_code': line.get('@ClaveUnidad', ''),
                'uom': line.get('@Unidad', ''),
                'description': line['@Descripcion'],
                'discount': float(line['@Descuento']) if '@Descuento' in line else 0,
                'unit_price': float(line['@ValorUnitario']),
                'uom_id': unidad_id,
                'unspsc_product_category_id': product_category_id,
                'amount': float(line['@Importe']),
            }
            data_line = self.search_cfdi_product(line, cfdi_type, data_line, emitter_partner_id,
                                                 recipient_partner_id)
            data_line = self.get_cfdi_tax_lines(data_line, line, cfdi_type, recipient_partner_id)
            data_list.append(Command.create(data_line))
            sequence += 1

        if data_list:
            cfdi_data["concept_ids"] = data_list
        return cfdi_data

    def get_payment_tax(self, cfdi_type, xml_data, cfdi_data):
        """Collect the transferred taxes of each related document of a payment complement (P/SP).

        Relies on ``self.get_data_iterable``, which is not defined in this part
        of the file. Only related documents whose ``@ObjetoImpDR`` is ``'02'``
        are considered.

        :return: *cfdi_data*, with ``tax_paymnent_ids`` set when taxes were found.
        """
        if cfdi_type not in ['P', 'SP']:
            return cfdi_data

        payment_tax_list = []
        pagos = xml_data['Comprobante']['Complemento']['pago20:Pagos']['pago20:Pago']
        for pago in self.get_data_iterable(pagos) or []:
            payment_date = pago["@FechaPago"][:10]
            for docto in self.get_data_iterable(pago["pago20:DoctoRelacionado"]) or []:
                if docto.get("@ObjetoImpDR") != '02':
                    continue
                traslados = docto["pago20:ImpuestosDR"]["pago20:TrasladosDR"]["pago20:TrasladoDR"]
                for traslado in self.get_data_iterable(traslados) or []:
                    payment_tax_data = {
                        "name": docto["@IdDocumento"],
                        "serie": docto.get("@Serie"),
                        "folio": docto.get("@Folio"),
                        "currency": docto["@MonedaDR"],
                        "currency_rate": docto["@EquivalenciaDR"],
                        "paid_amount": docto["@ImpPagado"],
                        "previous_balance": docto["@ImpSaldoAnt"],
                        "current_balance": docto["@ImpSaldoInsoluto"],
                        "subject_tax": docto["@ObjetoImpDR"],
                        "payment_date": payment_date,
                        "tax_amount": traslado.get("@ImporteDR"),
                        "base_amount": traslado["@BaseDR"],
                        "type_tax": traslado["@ImpuestoDR"],
                        "base_tax": float(traslado["@TasaOCuotaDR"]) * 100 if traslado.get("@TasaOCuotaDR") else 0,
                        "exempt_tax": traslado.get("@TipoFactorDR") == 'Exento',
                    }
                    payment_tax_list.append(Command.create(payment_tax_data))

        if payment_tax_list:
            # NOTE(review): key spelling kept as-is; confirm it matches the field name.
            cfdi_data["tax_paymnent_ids"] = payment_tax_list
        return cfdi_data

    def get_addenda_data(self, xml_data):
        """Return ``(delivery_number, invoiced_quantity)`` from the ``NEW_ERA`` addenda.

        :return: ``(False, False)`` when the addenda or any expected node is missing.
        """
        try:
            new_era = xml_data["Comprobante"]["Addenda"]["customized"]["NEW_ERA"]
            return new_era["Cabecera"]["@DL_VBLEN"], new_era["DatosPie"]["@SUMQTYEA"]
        except (KeyError, TypeError):
            # Missing node, or a node that is not a mapping (None / text / list).
            return False, False

    def search_cfdi_product(self, line, cfdi_type, data_line, emitter_partner_id, recipient_partner_id):
        """Try to resolve the product (and account) for a concept line.

        Lookup order: supplier price list / internal reference by
        ``@NoIdentificacion``; exact description; previous CFDI lines or move
        lines with the same ``@ClaveProdServ``; previous CFDI lines with the
        same description; the partner's ``x_product_tmpl_id``.

        :return: *data_line*, with ``product_id``, ``product_tmpl_id`` and
            ``account_id`` set when a product was found.
        """
        partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
        is_vendor_doc = cfdi_type in ['SI', 'SE']
        is_customer_doc = cfdi_type in ['I', 'E']
        SupplierInfo = self.env['product.supplierinfo'].sudo()
        Template = self.env['product.template'].sudo()
        ConceptLine = self.env['account.cfdi.line'].sudo()

        product_tmpl_id = self.env['product.template']
        concept_id = self.env['account.cfdi.line']
        account_line_id = self.env['account.move.line']

        # 1) Product identifier declared on the concept.
        if '@NoIdentificacion' in line:
            if is_vendor_doc:
                # Supplier price list identifying the product by the vendor's code.
                product_supplier_id = SupplierInfo.search(
                    [('partner_id', '=', partner_id.id),
                     ('product_code', '=', line['@NoIdentificacion'])], limit=1)
                if product_supplier_id:
                    product_tmpl_id = product_supplier_id.product_tmpl_id
            if not product_tmpl_id and is_customer_doc:
                product_tmpl_id = Template.search(
                    [('default_code', '=', line['@NoIdentificacion'])], limit=1)

        # 2) Exact match on the concept description.
        if not product_tmpl_id and '@Descripcion' in line:
            if is_vendor_doc:
                product_supplier_id = SupplierInfo.search(
                    [('partner_id', '=', partner_id.id),
                     ('product_name', '=', line['@Descripcion'])], limit=1)
                if product_supplier_id:
                    product_tmpl_id = product_supplier_id.product_tmpl_id
            elif is_customer_doc:
                product_tmpl_id = Template.search(
                    ['|', ("name", "=", line['@Descripcion']),
                     ('default_code', '=', line['@Descripcion'])], limit=1)

        # 3) Earlier lines from the same emitter with the same product key.
        if not product_tmpl_id and '@ClaveProdServ' in line and partner_id:
            concept_id = ConceptLine.search(
                [("product_tmpl_id", "!=", False),
                 ("emitter_id.id", "=", emitter_partner_id.id),
                 ("product_code", "=", line['@ClaveProdServ']),
                 ("company_id.id", "=", self.env.company.id)], limit=1)
            if concept_id:
                product_tmpl_id = concept_id.product_tmpl_id
            else:
                account_line_id = self.env["account.move.line"].sudo().search(
                    [("product_id", "!=", False),
                     ("product_id.unspsc_code_id.code", "=", line['@ClaveProdServ']),
                     ("partner_id.id", "=", partner_id.id)], limit=1)
                if account_line_id:
                    product_tmpl_id = account_line_id.product_id.product_tmpl_id

        # 4) Earlier lines from the same emitter with the same description.
        if not product_tmpl_id and '@Descripcion' in line and partner_id:
            concept_id = ConceptLine.search(
                [("product_tmpl_id", "!=", False),
                 ("emitter_id.id", "=", emitter_partner_id.id),
                 ("description", "=", line['@Descripcion']),
                 ("company_id.id", "=", self.env.company.id)], limit=1)
            if concept_id:
                product_tmpl_id = concept_id.product_tmpl_id

        # 5) Default product configured on the partner.
        if not product_tmpl_id and partner_id and partner_id.x_product_tmpl_id:
            product_tmpl_id = partner_id.x_product_tmpl_id

        if product_tmpl_id:
            product_id = self.env['product.product'].sudo().search(
                [('product_tmpl_id', '=', product_tmpl_id.id)], limit=1)
            data_line["product_id"] = product_id.id if product_id else False
            data_line["product_tmpl_id"] = product_tmpl_id.id

            categ_id = product_tmpl_id.categ_id
            # Stays False for voucher types other than I/E/SI/SE.
            account_id = False
            if is_vendor_doc:
                account_id = (product_tmpl_id.property_account_expense_id
                              or (categ_id.property_account_expense_categ_id if categ_id else False))
            elif is_customer_doc:
                account_id = (product_tmpl_id.property_account_income_id
                              or (categ_id.property_account_income_categ_id if categ_id else False))

            # Priority: account of the matched CFDI line, then of the matched
            # move line, then the product/category account.
            if concept_id and concept_id.account_id:
                data_line["account_id"] = concept_id.account_id.id
            elif account_line_id:
                data_line["account_id"] = account_line_id.account_id.id
            else:
                data_line["account_id"] = account_id.id if account_id else False
        return data_line

    def _match_cfdi_tax(self, tax_code, amount, cfdi_type):
        """Return the first ``account.tax`` matching a CFDI tax line (possibly empty).

        The search filters on amount, company and tax use (sale for I/E,
        purchase for SI/SE) and, for codes 001/002/003, on the tax name
        containing isr/iva/ieps. Other voucher types are not searched.
        """
        tax_use = {'I': 'sale', 'E': 'sale', 'SI': 'purchase', 'SE': 'purchase'}.get(cfdi_type)
        if not tax_use:
            return self.env['account.tax']
        tax_domain = [('amount', '=', amount), ('company_id', '=', self.env.company.id)]
        keyword = {'002': 'iva', '003': 'ieps', '001': 'isr'}.get(tax_code)
        if keyword:
            tax_domain.append(("name", "ilike", keyword))
        tax_domain.append(('type_tax_use', '=', tax_use))
        return self.env['account.tax'].sudo().search(tax_domain, limit=1)

    def get_cfdi_tax_lines(self, data_line, line, cfdi_type, recipient_partner_id):
        """Add the transferred and withheld taxes of a concept to *data_line* (``tax_ids``).

        :return: *data_line*, updated in place.
        """
        impuestos = line.get('Impuestos') or {}
        tax_list = []
        sequence = 1

        # Transferred taxes (traslados).
        for traslado in _as_list((impuestos.get('Traslados') or {}).get('Traslado')):
            if str(traslado['@TipoFactor']).strip().upper() == 'EXENTO':
                # Exempt entries carry no rate or amount.
                tasa_o_cuota = 0
                importe = 0
            else:
                tasa_o_cuota = float(traslado['@TasaOCuota'])
                importe = float(traslado['@Importe'])
            tax_data = {
                'sequence': sequence,
                'base': float(traslado['@Base']),
                'code': traslado['@Impuesto'],
                'factor_type': traslado['@TipoFactor'],
                'rate': tasa_o_cuota,
                'amount': importe,
                'tax_type': 'traslado'
            }
            sequence += 1
            # Rounded so that float noise (0.07 * 100 == 7.000000000000001)
            # does not prevent the match.
            t_id = self._match_cfdi_tax(tax_data['code'], round(tasa_o_cuota * 100, 4), cfdi_type)
            if t_id:
                tax_data["tax_id"] = t_id.id
            tax_list.append(Command.create(tax_data))

        # Withheld taxes (retenciones): stored as negative percentages.
        for retencion in _as_list((impuestos.get('Retenciones') or {}).get('Retencion')):
            tasa_o_cuota = float(retencion['@TasaOCuota'])
            importe = float(retencion['@Importe'])
            tax_data = {
                'sequence': sequence,
                'base': float(retencion['@Base']),
                'code': retencion['@Impuesto'],
                'factor_type': retencion['@TipoFactor'],
                'rate': tasa_o_cuota,
                'amount': importe,
                'tax_type': 'retencion'
            }
            sequence += 1
            t_id = self._match_cfdi_tax(tax_data['code'], round(-(tasa_o_cuota * 100), 2), cfdi_type)
            if t_id:
                tax_data["tax_id"] = t_id.id
            elif cfdi_type in ['SI', 'SE']:
                # Fall back to the withholding configured on the partner.
                # NOTE(review): uses the same x_tax_* partner fields that
                # add_cfdi_data reads; confirm the recipient is the intended partner.
                if tax_data['code'] == '001' and recipient_partner_id.x_tax_isr_id:
                    tax_data["tax_id"] = recipient_partner_id.x_tax_isr_id.id
                elif tax_data['code'] == '002' and recipient_partner_id.x_tax_iva_id:
                    tax_data["tax_id"] = recipient_partner_id.x_tax_iva_id.id
            tax_list.append(Command.create(tax_data))

        if tax_list:
            data_line["tax_ids"] = tax_list
        return data_line

    def get_cfdi_journal_id(self, cfdi_type, emitter_partner_id, recipient_partner_id):
        """Return ``(journal, account)`` for the voucher type; either may be ``False``.

        The journal comes from ``x_cfdi_type`` on the journal or, failing
        that, from an earlier CFDI with the same emitter, receiver and type.
        The account is the journal's default account or the result of
        ``get_expense_cfdi_account_id``.
        """
        journal_id = self.env['account.journal'].sudo().search(
            [('x_cfdi_type', '=', cfdi_type), ("company_id.id", "=", self.env.company.id)], limit=1)
        if not journal_id:
            cfdi_id = self.env["account.cfdi"].sudo().search(
                [("emitter_id.id", "=", emitter_partner_id.id),
                 ("receiver_id.id", "=", recipient_partner_id.id),
                 ("cfdi_type", "=", cfdi_type),
                 ("journal_id", "!=", False)], limit=1)
            journal_id = cfdi_id.journal_id if cfdi_id else False

        account_id = journal_id.default_account_id if journal_id and journal_id.default_account_id else False
        if not account_id:
            partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
            account_id = self.get_expense_cfdi_account_id(cfdi_type, partner_id)
        return journal_id, account_id

    def get_expense_cfdi_account_id(self, cfdi_type, partner_id):
        """Return the expense account for *partner_id* (falsy when none is found).

        Uses the partner's ``x_account_expense_id`` or, failing that, the
        ``account_id`` of an earlier CFDI of the same type and partner.
        """
        expense_account_id = False
        if not partner_id:
            return expense_account_id

        expense_account_id = partner_id.x_account_expense_id
        if expense_account_id:
            return expense_account_id

        partner_field = {'I': "receiver_id.id", 'E': "receiver_id.id",
                         'SI': "emitter_id.id", 'SE': "emitter_id.id"}.get(cfdi_type)
        if partner_field:
            cfdi_id = self.env["account.cfdi"].sudo().search(
                [(partner_field, "=", partner_id.id),
                 ("cfdi_type", "=", cfdi_type),
                 ("account_id", "!=", False)], limit=1)
            # The domain filters on account_id, so that is the field to reuse.
            expense_account_id = cfdi_id.account_id if cfdi_id else False
        return expense_account_id

    def get_payable_cfdi_account_id(self, cfdi_type, partner_id):
        """Return the payable account for the voucher type (falsy when none is found).

        Lookup order: an account flagged with ``x_cfdi_type``, the partner's
        ``property_account_payable_id``, then an earlier CFDI of the same
        type and partner.
        """
        payable_account_id = self.env['account.account'].sudo().search(
            [('x_cfdi_type', '=', cfdi_type), ("company_ids.id", "=", self.env.company.id)], limit=1)
        if not payable_account_id and partner_id:
            payable_account_id = partner_id.property_account_payable_id

        if not payable_account_id:
            partner_field = {'I': "receiver_id.id", 'E': "receiver_id.id",
                             'SI': "emitter_id.id", 'SE': "emitter_id.id"}.get(cfdi_type)
            if partner_field:
                cfdi_id = self.env["account.cfdi"].sudo().search(
                    [(partner_field, "=", partner_id.id),
                     ("cfdi_type", "=", cfdi_type),
                     ("payable_account_id", "!=", False)], limit=1)
                payable_account_id = cfdi_id.payable_account_id if cfdi_id else False
        return payable_account_id

    def get_cfdi_partners(self, xml_data):
        """Return ``(emitter, receiver)`` partners matched by RFC, creating the missing ones."""
        Partner = self.env['res.partner'].sudo()
        comprobante = xml_data['Comprobante']
        emitter_id = Partner.search([('vat', '=', comprobante['Emisor']['@Rfc'])], limit=1)
        receiver_id = Partner.search([('vat', '=', comprobante['Receptor']['@Rfc'])], limit=1)
        if not emitter_id:
            emitter_id = self.create_cfdi_partner(xml_data, "Emisor")
        if not receiver_id:
            receiver_id = self.create_cfdi_partner(xml_data, "Receptor")
        return emitter_id, receiver_id

    def create_cfdi_partner(self, xml_data, partner_type):
        """Create a company partner from the ``Emisor`` or ``Receptor`` node.

        :param partner_type: ``"Emisor"`` or ``"Receptor"``.
        :return: the created ``res.partner`` record.
        """
        node = xml_data['Comprobante'][partner_type]
        regime_key = '@RegimenFiscal' if partner_type == "Emisor" else '@RegimenFiscalReceptor'
        data = {
            # The name attribute can be absent; fall back to the RFC.
            'name': node.get('@Nombre') or node['@Rfc'],
            'vat': node['@Rfc'],
            'l10n_mx_edi_fiscal_regime': node.get(regime_key, False),
            'country_id': self.env.company.country_id.id,
            'company_type': 'company'
        }
        return self.env["res.partner"].create(data)

    # ------------------------------------------------------------------
    # Bulk download methods
    # ------------------------------------------------------------------
    def download_massive_pdf_zip(self):
        """Download the printed representations of the selected CFDIs as one zip."""
        return self.download_massive_zip("PDF Masivos.zip", "pdf_id")

    def download_massive_xml_zip(self):
        """Download the XML attachments of the selected CFDIs as one zip."""
        return self.download_massive_zip("XML Masivos.zip", "attachment_id")

    def download_massive_zip(self, filename, field_name):
        """Zip the attachments found in *field_name* and return a download action.

        Any previous attachment named *filename* is deleted first.

        :param filename: name given to the resulting zip attachment.
        :param field_name: many2one field holding the attachment to include.
        :raises ValidationError: when an attachment's content cannot be decoded.
        """
        self = self.with_user(1)
        Attachment = self.env['ir.attachment'].sudo()
        previous_zip = Attachment.search([('name', '=', filename)], limit=1)
        if previous_zip:
            previous_zip.unlink()

        file_name_zip = 'DescargaMasiva(Fecha de descarga' + " - " + str(date.today()) + ")" + ".zip"

        # The context manager removes the temporary files once the zip is read.
        with TemporaryDirectory() as location_tempdir:
            zipfilepath = os.path.join(location_tempdir, file_name_zip)

            for attachment in self.mapped(field_name):
                try:
                    content = base64.decodebytes(attachment.datas)
                except (TypeError, ValueError) as e:
                    raise ValidationError('Error: ' + str(e)) from e
                # basename() keeps the file inside the temporary directory.
                target_path = os.path.join(location_tempdir, basename(attachment.name))
                with open(target_path, "wb") as object_handle:
                    object_handle.write(content)

            with ZipFile(zipfilepath, 'w') as zip_obj:
                for entry in os.listdir(location_tempdir):
                    file_path = os.path.join(location_tempdir, entry)
                    if file_path != zipfilepath:
                        zip_obj.write(file_path, basename(file_path))

            with open(zipfilepath, 'rb') as file_data:
                encoded = base64.b64encode(file_data.read())

        attachment = Attachment.create({
            'name': filename,
            'type': 'binary',
            'datas': encoded,
            'company_id': self.env.company.id
        })
        return self.download_zip(file_name_zip, attachment.id)

    def download_zip(self, filename, id_file):
        """Return an ``ir.actions.act_url`` action that downloads attachment *id_file*."""
        path = "/web/binary/download_document?"
        model = "ir.attachment"
        url = path + "model={}&id={}&filename={}".format(model, id_file, filename)
        return {
            'type': 'ir.actions.act_url',
            'url': url,
            'target': 'self',
        }

    # ------------------------------------------------------------------
    # Reconciliation methods
    # ------------------------------------------------------------------
    def action_done(self):
        """Create an account move for every selected CFDI (types SI, I, SE) that has none.

        Each created move is linked back to its CFDI, receives the XML
        attachment and is written as posted. Customer invoices also get an
        EDI document pointing at the XML.

        :return: a window action listing the created moves.
        """
        self = self.with_user(1)
        move_type_by_cfdi = {
            "SI": "in_invoice",
            "I": "out_invoice",
            "SE": "in_refund",
            "E": "out_refund",
        }
        Move = self.env["account.move"]
        invoice_list = []
        folio_names = []

        pending = self.filtered(
            lambda cfdi: not cfdi.move_id and cfdi.attachment_id and cfdi.cfdi_type in ["SI", "I", "SE"])
        for rec in pending:
            cfdi_type = rec.cfdi_type
            partner_id = rec.emitter_id if cfdi_type in ["SI", 'SE', 'SP'] else rec.receiver_id
            folio = '-'.join(part for part in (rec.serie, rec.folio) if part)
            invoice_data = {
                'move_type': move_type_by_cfdi.get(cfdi_type),
                'partner_id': partner_id.id,
                'date': rec.date,
                'invoice_date': rec.date,
                'invoice_date_due': rec.date,
                'fiscal_position_id': partner_id.property_account_position_id.id if partner_id.property_account_position_id else rec.fiscal_position_id.id,
                'ref': folio,
                'amount_total_signed': rec.total,
                'amount_total': rec.total,
                'journal_id': rec.journal_id.id,
                'company_id': rec.company_id.id,
                'cfdi_id': rec.id,
                'x_uuid': rec.uuid,
                "x_delivery_number": rec.delivery_number,
                "x_invoice_qty": rec.invoice_qty
            }
            # Customer invoices reuse the folio as move name when it is still free.
            if (cfdi_type == "I" and folio != '' and folio not in folio_names
                    and not Move.sudo().search([("name", "=", folio), ("state", "=", "posted")], limit=1)):
                invoice_data["name"] = folio
                folio_names.append(folio)

            line_list = []
            sequence = 1
            for line in rec.concept_ids:
                if float(line.amount) <= 0:
                    continue
                data_line = {
                    'sequence': sequence,
                    'name': line.description,
                    'quantity': float(line.quantity),
                    'product_uom_id': line.uom_id.id,
                    # The CFDI stores an absolute discount; the move line expects a percentage.
                    'discount': (float(line.discount) * 100) / float(line.amount),
                    'price_unit': float(line.unit_price),
                    'tax_ids': line.mapped("tax_ids.tax_id").ids,
                    'account_id': line.account_id.id if line.account_id else rec.account_id.id if rec.account_id else False,
                    'analytic_distribution': line.analytic_distribution if line.analytic_distribution else rec.analytic_distribution,
                    'partner_id': partner_id.id,
                }
                if line.product_tmpl_id:
                    product_id = self.env['product.product'].sudo().search(
                        [('product_tmpl_id', '=', line.product_tmpl_id.id)], limit=1)
                    data_line["product_id"] = product_id.id
                line_list.append(Command.create(data_line))
                sequence += 1
            invoice_data["invoice_line_ids"] = line_list
            invoice_list.append(invoice_data)

        invoice_ids = Move.with_context(check_move_validity=False).sudo().create(invoice_list)

        for invoice_id in invoice_ids:
            attachment_id = invoice_id.cfdi_id.attachment_id
            invoice_id.cfdi_id.write({
                "move_id": invoice_id.id,
                "state": "done"
            })
            attachment_id.write({
                'res_model': 'account.move',
                'res_id': invoice_id.id,
            })

            if invoice_id.move_type == "out_invoice" and attachment_id:
                id_edi_format = self.env['account.edi.format'].sudo().search(
                    [('name', '=', 'CFDI (4.0)')], limit=1)
                if not invoice_id.edi_document_ids:
                    if id_edi_format:
                        create_edi = self.env['account.edi.document'].sudo().create({
                            'edi_format_id': id_edi_format.id,
                            'attachment_id': attachment_id.id,
                            'state': 'sent',
                            'move_id': invoice_id.id,
                            'error': False
                        })
                        invoice_id.write({
                            'edi_document_ids': [(6, False, [create_edi.id])],
                            'l10n_mx_edi_cfdi_uuid': invoice_id.l10n_mx_edi_cfdi_uuid_cusom
                        })
                elif id_edi_format:
                    # Mark the invoice as stamped: reuse or add the EDI document.
                    edi_format_id = invoice_id.edi_document_ids.filtered(
                        lambda edi: edi.edi_format_id.id == id_edi_format.id)
                    if edi_format_id:
                        edi_format_id.write({
                            "attachment_id": attachment_id.id,
                            "error": False,
                            "state": "sent",
                        })
                    else:
                        self.env["account.edi.document"].sudo().create([{
                            'move_id': invoice_id.id,
                            'attachment_id': attachment_id.id,
                            'state': 'sent',
                            'error': False,
                            'edi_format_id': id_edi_format.id
                        }])
                    invoice_id.write({'l10n_mx_edi_cfdi_uuid': invoice_id.l10n_mx_edi_cfdi_uuid_cusom})

            # NOTE(review): applied to every created move; confirm it was not
            # meant for customer invoices only.
            invoice_id.write({
                "state": "posted"
            })

        return {
            "name": _("Facturas"),
            "view_mode": "list,form",
            "res_model": "account.move",
            "type": "ir.actions.act_window",
            "target": "current",
            "domain": [('id', 'in', invoice_ids.ids)]
        }

    def action_unlink_move(self):
        """Detach each CFDI from its move and set the CFDI back to draft.

        The XML attachment is re-pointed at the CFDI and the move loses its
        ``cfdi_id`` / ``x_uuid`` values.
        """
        for rec in self:
            # Keep a handle on the move: rec.move_id is empty after the write below.
            move = rec.move_id
            if not move:
                continue
            rec.attachment_id.write({
                'res_model': 'account.cfdi',
                'res_id': rec.id,
            })
            rec.write({
                "state": "draft",
                "move_id": False
            })
            move.write({
                'cfdi_id': False,
                'x_uuid': False
            })