| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831 |
- from odoo import api, fields, models, _, Command
- from odoo.exceptions import ValidationError
- from collections import OrderedDict
- from datetime import date
- from os.path import basename
- from zipfile import ZipFile
- from tempfile import TemporaryDirectory
- import xmltodict
- import base64
- import logging
- import os.path
- _logger = logging.getLogger(__name__)
- class AccountCFDI(models.Model):
- _name = 'account.cfdi'
- _inherit = ['mail.thread', 'mail.activity.mixin']
- _description = 'Complemento CFDI'
- code = fields.Char(string="Código")
- name = fields.Char(string="Referencia")
- uuid = fields.Char(string="UUID")
- certificate = fields.Char(string="Certificado")
- certificate_number = fields.Char(string="Nro. de certificado")
- serie = fields.Char(string="Serie")
- folio = fields.Char(string="Folio")
- stamp = fields.Char(string="Sello")
- version = fields.Char(string="Versión")
- payment_condition = fields.Char(string="Condiciones de pago")
- currency = fields.Char(string="Moneda")
- payment_method = fields.Char(string="Forma de pago")
- location = fields.Char(string="Lugar de expedición")
- observations = fields.Text(string='Notas')
- attachment_id = fields.Many2one(comodel_name="ir.attachment", string="Archivo adjunto")
- pdf_id = fields.Many2one(comodel_name="ir.attachment", string="Representación impresa")
- company_id = fields.Many2one(comodel_name="res.company", string="Empresa", default=lambda self: self.env.company)
- emitter_id = fields.Many2one(comodel_name="res.partner", string="Emisor")
- receiver_id = fields.Many2one(comodel_name="res.partner", string="Receptor")
- move_id = fields.Many2one(comodel_name="account.move", string="Movimiento contable", copy=False)
- payable_account_id = fields.Many2one(comodel_name='account.account', string='Cuenta contable a pagar', tracking=True)
- account_id = fields.Many2one(comodel_name='account.account', string='Cuenta contable gasto', tracking=True)
- account_analytic_account_id = fields.Many2one(comodel_name='account.analytic.account', string='Cuenta analítica')
- fiscal_position_id = fields.Many2one(comodel_name='account.fiscal.position', string='Posición fiscal')
- journal_id = fields.Many2one(comodel_name='account.journal', string='Diario', tracking=True)
- tax_isr_id = fields.Many2one(comodel_name='account.tax', string='Retención ISR')
- tax_iva_id = fields.Many2one(comodel_name='account.tax', string='Retención IVA')
- analytic_distribution = fields.Json(string="Distribución analítica")
- analytic_precision = fields.Integer(string="Precisión analítica", store=False, default=lambda self: self.env['decimal.precision'].precision_get("Percentage Analytic"))
- concept_ids = fields.One2many("account.cfdi.line", "cfdi_id", string="Conceptos")
- tax_ids = fields.One2many('account.cfdi.tax', 'cfdi_id', string='Impuestos')
- date = fields.Date(string="Fecha")
- subtotal = fields.Float(string="Subtotal", copy=False)
- total = fields.Float(string="Total", copy=False)
- tax_total = fields.Float(string="Impuestos", compute="compute_tax_total", store=True)
- payment_type = fields.Selection(selection=[('PPD', 'PPD'), ('PUE', 'PUE')], string='Método de pago', readonly=True)
- cfdi_type = fields.Selection(string="Tipo de comprobante", selection=[
- ('I', 'Facturas de clientes'),
- ('SI', 'Facturas de proveedor'),
- ('E', 'Notas de crédito cliente'),
- ('SE', 'Notas de crédito proveedor'),
- ('P', 'REP de clientes'),
- ('SP', 'REP de proveedores'),
- ('N', 'Nóminas de empleados'),
- ('SN', 'Nómina propia'),
- ('T', 'Factura de traslado cliente'),
- ('ST', 'Factura de traslado proveedor')
- ], index=True, copy=False)
- state = fields.Selection(string="Estado", selection=[
- ('draft', 'Borrador'),
- ('done', 'Procesada'),
- ('cancel', 'Anulado')
- ], copy=False, default='draft', tracking=True)
- sat_state = fields.Selection(string="Estado SAT", selection=[
- ("valid", "Valido"),
- ("not_found", "No Encontrado"),
- ("undefined", "No sincronizado Aún"),
- ("none", "Estado no definido"),
- ("cancelled", "Cancelado")
- ], copy=False, tracking=True, default="valid")
- #Datos de addenda
- delivery_number = fields.Char(string="No. Entrega")
- invoice_qty = fields.Integer(string="Unidades facturadas")
- @api.depends("subtotal","total")
- def compute_tax_total(self):
- for rec in self:
- rec.tax_total = rec.total - rec.subtotal
- # ---------------------------------------------------Metodos de creación---------------------------------------------------------
- @api.model
- def create(self, vals_list):
- self = self.with_context(skip_invoice_sync=True, check_move_validity=False)
- res = super().create(vals_list)
- for cfdi in res.filtered(lambda move: move.move_id):
- cfdi.move_id.update({
- "cfdi_id": cfdi.id
- })
- return res
- def name_get(self):
- result = []
- for record in self:
- folio = f"[{record.serie}-{record.folio}] - " if record.serie and record.folio else f"[{record.serie}] - " if record.serie and not record.folio else f"[{record.folio}] - " if not record.serie and record.folio else ""
- name = f"{folio}" + record.uuid + ' - $' + str(record.total)
- result.append((record.id, name))
- return result
- # Creación de CFDIS
- def create_cfdis(self, attachment_data):
- self = self.with_context(skip_invoice_sync=True, check_move_validity=False)
- cfdi_list = []
- cfdi_ids = self.env["account.cfdi"]
- uuids = []
- for data in attachment_data:
- # Obtener la información para crear el cfdi
- data_xml = data.get("xml")
- xml_data = self.get_cfdi_data(data_xml.get("datas"))
- if xml_data.get("Comprobante"):
- cfdi_type = self.get_cfdi_type(xml_data)
- uuid = self.validation_cfdi(xml_data, cfdi_type)
- # Evitar UUID repetidos ya que el SAT manda en ocasiones el mismo XML dos veces
- if uuid and uuid not in uuids:
- uuids.append(uuid)
- emiiter_partner_id, recipient_partner_id = self.get_cfdi_partners(xml_data)
- move_id = self.validate_cfdi_move(xml_data, uuid, cfdi_type, emiiter_partner_id)
- cfdi_data = self.add_cfdi_data(xml_data, uuid, emiiter_partner_id, recipient_partner_id, move_id, cfdi_type, data)
- cfdi_data = self.get_cfdi_lines(xml_data, cfdi_data, cfdi_type, emiiter_partner_id, recipient_partner_id)
- # cfdi_data = self.get_payment_tax(cfdi_type, xml_data, cfdi_data)
- cfdi_list.append(cfdi_data)
- else:
- continue
- if cfdi_list:
- cfdi_ids = self.env["account.cfdi"].sudo().create(cfdi_list)
- return cfdi_ids
- # Obtener la información del xml
- def get_cfdi_data(self, xml_file):
- file_content = base64.b64decode(xml_file)
- if b'xmlns:schemaLocation' in file_content and b'xsi:schemaLocation' not in file_content:
- file_content = file_content.replace(b'xmlns:schemaLocation', b'xsi:schemaLocation')
- file_content = file_content.replace(b'cfdi:', b'')
- file_content = file_content.replace(b'tfd:', b'')
- try:
- xml_data = xmltodict.parse(file_content)
- return xml_data
- except Exception as e:
- _logger.info(e)
- return dict()
- # Obtener el tipo de comprobante que es el CFDI
- def get_cfdi_type(self, xml_data):
- cfdi_type = xml_data['Comprobante']['@TipoDeComprobante'] if '@TipoDeComprobante' in xml_data['Comprobante'] else 'I'
- if cfdi_type in ['I', 'E', 'P']:
- if xml_data['Comprobante']['Emisor']['@Rfc'] != self.env.company.vat:
- cfdi_type = 'S' + cfdi_type
- return cfdi_type
- # Validaciones antes de la creación del CFDI
- def validation_cfdi(self, xml_data, cfdi_type):
- if '@UUID' in xml_data['Comprobante']['Complemento']['TimbreFiscalDigital']:
- uuid = xml_data['Comprobante']['Complemento']['TimbreFiscalDigital']['@UUID']
- cfdi_id = self.env['account.cfdi'].sudo().search([('uuid', '=', uuid)], limit=1)
- if cfdi_id:
- _logger.info(f"El CFDI con UUID {uuid}, ya existe en la base de datos, se omitirá en el proceso.")
- return False
- # Evitar que se suban xml que no pertenecen a la empresa
- if "S" in cfdi_type:
- partner_rfc = xml_data['Comprobante']['Receptor']['@Rfc']
- else:
- partner_rfc = xml_data['Comprobante']['Emisor']['@Rfc']
- if partner_rfc != self.env.company.vat:
- return False
- else:
- return False
- return uuid
- # Buscar el asiento contable de odoo relacionado si es que existe
- def validate_cfdi_move(self, xml_data, uuid, cfdi_type, emitter_partner_id):
- move_id = self.env["account.move"]
- delivery_number, invoice_qty = self.get_addenda_data(xml_data)
- if cfdi_type in ["I", "E"]:
- move_id = move_id.sudo().search(
- [("move_type", "in", ["out_invoice", "out_refund"]), ("l10n_mx_edi_cfdi_uuid", "=", uuid),
- ("state", "=", "posted"), ("cfdi_id", "=", False)], limit=1)
- elif cfdi_type in ["SI", "SE"]:
- invoice_date = xml_data['Comprobante']['@Fecha'] if '@Fecha' in xml_data['Comprobante'] else ""
- folio = xml_data['Comprobante']['@Folio'] if '@Folio' in xml_data['Comprobante'] else ''
- move_id = self.env['account.move'].sudo().search(
- [('partner_id.vat', '=', emitter_partner_id.vat), ('ref', '=', folio),
- ('move_type', 'in', ['in_invoice', 'in_refund']), ("state", "=", "posted"), ("cfdi_id", "=", False),
- ("invoice_date", "=", invoice_date[:10])], limit=1)
- if not move_id:
- if '@Serie' in xml_data['Comprobante']:
- folio = xml_data['Comprobante']['@Serie'] + folio
- move_id = self.env['account.move'].sudo().search(
- [('partner_id.vat', '=', emitter_partner_id.vat), ('ref', '=', folio),
- ('move_type', 'in', ['in_invoice', 'in_refund']), ("cfdi_id", "=", False),
- ("state", "=", "posted"), ("invoice_date", "=", invoice_date[:10])], limit=1)
- if not move_id:
- move_id = self.env['account.move'].sudo().search(
- [('partner_id.vat', '=', emitter_partner_id.vat),
- ('move_type', 'in', ['in_invoice', 'in_refund']), ("state", "=", "posted"),
- ("cfdi_id", "=", False),
- ("invoice_date", "=", invoice_date[:10])], limit=1)
- if not move_id and delivery_number:
- move_id = self.env['account.move'].sudo().search(
- [('partner_id.vat', '=', emitter_partner_id.vat),
- ('move_type', 'in', ['in_invoice', 'in_refund']), ("state", "=", "posted"),
- ("cfdi_id", "=", False), ("x_delivery_number","!=",False),
- ("x_delivery_number", "=", delivery_number)], limit=1)
- if not move_id:
- amount_untaxed = xml_data['Comprobante']['@SubTotal'] if '@SubTotal' in xml_data['Comprobante'] else 0
- invoice_date = xml_data['Comprobante']['@Fecha'] if '@Fecha' in xml_data['Comprobante'] else ""
- move_id = self.env['account.move'].sudo().search(
- [('partner_id.vat', '=', emitter_partner_id.vat), ('move_type', 'in', ['in_invoice']),
- ("cfdi_id", "=", False), ("state", "=", "posted"), ("amount_untaxed", "=", amount_untaxed),
- ("invoice_date", "=", invoice_date[:10])], limit=1)
- return move_id
- # Preparar la informacion del CFDI
- def add_cfdi_data(self, xml_data, uuid, emitter_partner_id, recipient_partner_id, move_id, cfdi_type, attachment_data):
- journal_id, account_id = self.get_cfdi_journal_id(cfdi_type, emitter_partner_id, recipient_partner_id)
- partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
- payable_account_id = self.get_payable_cfdi_account_id(cfdi_type, partner_id)
- attachment_id = self.env["ir.attachment"].sudo().create(attachment_data.get("xml"))
- pdf_id = self.env["ir.attachment"].sudo().create(attachment_data.get("pdf")) if attachment_data.get("pdf") else False
- delivery_number, invoice_qty = self.get_addenda_data(xml_data)
-
- data = {
- "attachment_id": attachment_id.id,
- "pdf_id": pdf_id.id if pdf_id else False,
- "code": uuid,
- "uuid": uuid,
- "certificate": xml_data['Comprobante']['@Certificado'] if '@Certificado' in xml_data['Comprobante'] else '',
- "date": xml_data['Comprobante']['@Fecha'] if '@Fecha' in xml_data['Comprobante'] else '',
- "folio": xml_data['Comprobante']['@Folio'] if '@Folio' in xml_data['Comprobante'] else '',
- "payment_method": xml_data['Comprobante']['@FormaPago'] if '@FormaPago' in xml_data['Comprobante'] else '',
- "location": xml_data['Comprobante']['@LugarExpedicion'] if '@LugarExpedicion' in xml_data['Comprobante'] else '',
- "payment_type": xml_data['Comprobante']['@MetodoPago'] if '@MetodoPago' in xml_data['Comprobante'] else '',
- "currency": xml_data['Comprobante']['@Moneda'] if '@Moneda' in xml_data['Comprobante'] else '',
- "certificate_number": xml_data['Comprobante']['@NoCertificado'] if '@NoCertificado' in xml_data['Comprobante'] else '',
- "stamp": xml_data['Comprobante']['@Sello'] if '@Sello' in xml_data['Comprobante'] else '',
- "serie": xml_data['Comprobante']['@Serie'] if '@Serie' in xml_data['Comprobante'] else '',
- "subtotal": xml_data['Comprobante']['@SubTotal'] if '@SubTotal' in xml_data['Comprobante'] else '',
- "cfdi_type": cfdi_type,
- "total": xml_data['Comprobante']['@Total'] if '@Total' in xml_data['Comprobante'] else '',
- "version": xml_data['Comprobante']['@Version'] if '@Version' in xml_data['Comprobante'] else '',
- "payment_condition": xml_data['Comprobante']['@CondicionesDePago'] if '@CondicionesDePago' in xml_data['Comprobante'] else '',
- "emitter_id": emitter_partner_id.id,
- "receiver_id": recipient_partner_id.id,
- "move_id": move_id.id if move_id else False,
- "state": "draft" if not move_id else "done",
- "journal_id": journal_id.id if journal_id else False,
- "account_id": account_id.id if account_id else False,
- "fiscal_position_id": partner_id.property_account_position_id.id if partner_id.property_account_position_id else False,
- "tax_iva_id": partner_id.x_tax_iva_id.id if partner_id.x_tax_iva_id else False,
- "tax_isr_id": partner_id.x_tax_isr_id.id if partner_id.x_tax_isr_id else False,
- "payable_account_id": payable_account_id.id if payable_account_id else False,
- }
- data["name"] = data["serie"] + data["folio"] if data.get("serie") and data.get("folio") else data["serie"] if data.get("serie") else data.get("folio")
- data["delivery_number"] = delivery_number
- data["invoice_qty"] = invoice_qty
- return data
- # Se obtienen las lineas de CFDI
- def get_cfdi_lines(self, xml_data, cfdi_data, cfdi_type, emitter_partner_id, recipient_partner_id):
- if type(xml_data['Comprobante']['Conceptos']['Concepto']) is list:
- lines = xml_data['Comprobante']['Conceptos']['Concepto']
- elif type(xml_data['Comprobante']['Conceptos']['Concepto']) is OrderedDict:
- lines = xml_data['Comprobante']['Conceptos'].items()
- else:
- lines = [xml_data['Comprobante']['Conceptos']['Concepto']]
- i = 1
- data_list = []
- for line_value in lines:
- if type(xml_data['Comprobante']['Conceptos']['Concepto']) is list:
- line = line_value
- elif type(xml_data['Comprobante']['Conceptos']['Concepto']) is OrderedDict:
- line = line_value[1]
- else:
- line = line_value
- if float(line['@Importe']) >= 0:
- uom_id = False
- if '@ClaveUnidad' in line:
- uom_unspsc_id = self.env['product.unspsc.code'].sudo().search([('code', '=', line['@ClaveUnidad'])])
- if uom_unspsc_id:
- uom_id = self.env['uom.uom'].sudo().search([('unspsc_code_id', '=', uom_unspsc_id.id)], limit=1)
- if uom_id:
- unidad_id = uom_id.id
- else:
- unidad_id = False
- product_category_id = False
- if '@ClaveProdServ' in line:
- unspsc_product_category_id = self.env['product.unspsc.code'].sudo().search([('code', '=', line['@ClaveProdServ'])])
- if unspsc_product_category_id:
- product_category_id = unspsc_product_category_id.id
- else:
- product_category_id = False
- data_line = {
- 'sequence': i,
- 'code_cfdi': cfdi_data.get("code"),
- 'date': cfdi_data.get("date"),
- 'folio': cfdi_data.get("folio"),
- 'payment_method': cfdi_data.get("payment_method"),
- 'location': cfdi_data.get("location"),
- 'payment_type': cfdi_data.get("payment_type"),
- 'currency': cfdi_data.get("currency"),
- 'certificate_number': cfdi_data.get("certificate_number"),
- 'stamp': cfdi_data.get("stamp"),
- 'serie': cfdi_data.get("serie"),
- 'subtotal': cfdi_data.get("subtotal"),
- 'cfdi_type': cfdi_data.get("cfdi_type"),
- 'total': cfdi_data.get("total"),
- 'version': cfdi_data.get("version"),
- 'emitter_id': cfdi_data.get("emitter_id"),
- 'receiver_id': cfdi_data.get("receiver_id"),
- 'product_code': line['@ClaveProdServ'] if '@ClaveProdServ' in line else '',
- 'no_identification': line['@NoIdentificacion'] if '@NoIdentificacion' in line else '',
- 'quantity': float(line['@Cantidad']),
- 'uom_code': line['@ClaveUnidad'] if '@ClaveUnidad' in line else '',
- 'uom': line['@Unidad'] if '@Unidad' in line else '',
- 'description': line['@Descripcion'],
- 'discount': float(line['@Descuento']) if '@Descuento' in line else 0,
- 'unit_price': float(line['@ValorUnitario']),
- 'uom_id': unidad_id,
- 'unspsc_product_category_id': product_category_id,
- 'amount': float(line['@Importe']),
- }
- data_line = self.search_cfdi_product(line, cfdi_type, data_line, emitter_partner_id, recipient_partner_id)
- data_line = self.get_cfdi_tax_lines(data_line, line, cfdi_type, recipient_partner_id)
- data_list.append(Command.create(data_line))
- i += 1
- if data_list:
- cfdi_data["concept_ids"] = data_list
- return cfdi_data
- #Obtener intereses de pago
- def get_payment_tax(self, cfdi_type, xml_data, cfdi_data):
- if cfdi_type in ['P', 'SP']:
- payment_tax_list = []
- payments_list = xml_data['Comprobante']['Complemento']['pago20:Pagos']['pago20:Pago']
- payments_list = self.get_data_iterable(payments_list)
- if payments_list:
- for payment_list in payments_list:
- payment_date = payment_list["@FechaPago"][:10],
- payments = payment_list["pago20:DoctoRelacionado"]
- payments = self.get_data_iterable(payments)
- if payments:
- for payment in payments:
- if payment.get("@ObjetoImpDR") and payment.get("@ObjetoImpDR") == '02':
- payment_taxes = payment["pago20:ImpuestosDR"]["pago20:TrasladosDR"]["pago20:TrasladoDR"]
- payment_taxes = self.get_data_iterable(payment_taxes)
- if payment_taxes:
- for payment_tax in payment_taxes:
- payment_tax_data = {
- "name": payment["@IdDocumento"],
- "serie": payment.get("@Serie"),
- "folio": payment.get("@Folio"),
- "currency": payment["@MonedaDR"],
- "currency_rate": payment["@EquivalenciaDR"],
- "paid_amount": payment["@ImpPagado"],
- "previous_balance": payment["@ImpSaldoAnt"],
- "current_balance": payment["@ImpSaldoInsoluto"],
- "subject_tax": payment["@ObjetoImpDR"],
- "payment_date": payment_date[0],
- "tax_amount": payment_tax.get("@ImporteDR"),
- "base_amount": payment_tax["@BaseDR"],
- "type_tax": payment_tax["@ImpuestoDR"],
- "base_tax": float(payment_tax["@TasaOCuotaDR"]) * 100 if payment_tax.get("@TasaOCuotaDR") else 0,
- "exempt_tax": True if payment_tax.get("@TipoFactorDR") == 'Exento' else False,
- }
- payment_tax_list.append(Command.create(payment_tax_data))
- if payment_tax_list:
- cfdi_data["tax_paymnent_ids"] = payment_tax_list
- return cfdi_data
- def get_addenda_data(self, xml_data):
- try:
- addenda = xml_data["Comprobante"]["Addenda"]
- addenda_header = addenda["customized"]["NEW_ERA"]["Cabecera"]
- addenda_footer = addenda["customized"]["NEW_ERA"]["DatosPie"]
- return addenda_header["@DL_VBLEN"], addenda_footer["@SUMQTYEA"]
- except:
- return False, False
-
- #Obtener el producto del cfdi si existe
- def search_cfdi_product(self, line, cfdi_type, data_line, emitter_partner_id, recipient_partner_id):
- partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
- product_tmpl_id = self.env['product.template']
- concept_id = self.env['account.cfdi.line']
- account_line_id = self.env['account.move.line']
- # Buscar el producto para poder relacionarlo a la linea del concepto
- if '@NoIdentificacion' in line:
- # Si el comprobante es una factura de proveedor o una nota de cliente del proveedor
- if cfdi_type in ['SI', 'SE']:
- # Se busca si es que se cuenta con una lista de precios a proveedor que identifique el producto mediante el codigo del producto
- product_supplier_id = self.env['product.supplierinfo'].sudo().search([('partner_id', '=', partner_id.id), ('product_code', '=', line['@NoIdentificacion'])], limit=1)
- if product_supplier_id:
- product_tmpl_id = product_supplier_id.product_tmpl_id
- if not product_tmpl_id and cfdi_type in ["I", "E"]:
- product_tmpl_id = self.env['product.template'].sudo().search([('default_code', '=', line['@NoIdentificacion'])], limit=1)
- # Identificar si se tiene registro de algun producto que coincida exactamente con la descripción del concepto
- if not product_tmpl_id and '@Descripcion' in line:
- if cfdi_type in ['SI', 'SE']:
- product_supplier_id = self.env['product.supplierinfo'].sudo().search([('partner_id', '=', partner_id.id), ('product_name', '=', line['@Descripcion'])], limit=1)
- if product_supplier_id:
- product_tmpl_id = product_supplier_id.product_tmpl_id
- elif cfdi_type in ['I', 'E']:
- product_tmpl_id = self.env['product.template'].sudo().search(['|', ("name", "=", line['@Descripcion']), ('default_code', '=', line['@Descripcion'])], limit=1)
- # Se busca el producto si ya ha habido lineas de producto con el mismo emisor y misma clave de producto o descripcion
- if not product_tmpl_id and '@ClaveProdServ' in line and partner_id:
- concept_id = self.env['account.cfdi.line'].sudo().search([("product_tmpl_id", "!=", False), ("emitter_id.id", "=", partner_id.id if cfdi_type in ["SI", "SE"] else emitter_partner_id.id), ("product_code", "=", line['@ClaveProdServ']), ("company_id.id", "=", self.env.company.id)], limit=1)
- if concept_id:
- product_tmpl_id = concept_id.product_tmpl_id
- else:
- account_line_id = self.env["account.move.line"].sudo().search([("product_id", "!=", False), ("product_id.unspsc_code_id.code", "=", line['@ClaveProdServ']), ("partner_id.id", "=", partner_id.id)], limit=1)
- if account_line_id:
- product_tmpl_id = account_line_id.product_id.product_tmpl_id
- # Identificar si existen lineas de conceptos que coincidan con la misma descripción y del mismo emisor
- if not product_tmpl_id and '@Descripcion' in line and partner_id:
- concept_id = self.env['account.cfdi.line'].sudo().search([("product_tmpl_id", "!=", False), ("emitter_id.id", "=", partner_id.id if cfdi_type in ["SI", "SE"] else emitter_partner_id.id), ("description", "=", line['@Descripcion']), ("company_id.id", "=", self.env.company.id)], limit=1)
- if concept_id:
- product_tmpl_id = concept_id.product_tmpl_id
- if not product_tmpl_id and partner_id and partner_id.x_product_tmpl_id:
- product_tmpl_id = partner_id.x_product_tmpl_id
- if product_tmpl_id:
- product_id = self.env['product.product'].sudo().search([('product_tmpl_id', '=', product_tmpl_id.id)], limit=1)
- data_line["product_id"] = product_id.id if product_id else False
- data_line["product_tmpl_id"] = product_tmpl_id.id
- categ_id = product_tmpl_id.categ_id
- if cfdi_type in ["SI", "SE"]:
- account_id = product_tmpl_id.property_account_expense_id if product_tmpl_id.property_account_expense_id else categ_id.property_account_expense_categ_id if categ_id else False
- elif cfdi_type in ["I", "E"]:
- account_id = product_tmpl_id.property_account_income_id if product_tmpl_id.property_account_income_id else categ_id.property_account_income_categ_id if categ_id else False
- data_line["account_id"] = concept_id.account_id.id if concept_id and concept_id.account_id else account_line_id.account_id.id if account_line_id else account_id.id if account_id else False
- return data_line
- #Obtener lineas de impuestos
- def get_cfdi_tax_lines(self, data_line, line, cfdi_type, recipient_partner_id):
- tax_list = []
- if 'Impuestos' in line:
- j = 1
- if 'Traslados' in line['Impuestos']:
- if type(line['Impuestos']['Traslados']['Traslado']) is list:
- impuestos = line['Impuestos']['Traslados']['Traslado']
- elif type(line['Impuestos']['Traslados']['Traslado']) is OrderedDict:
- impuestos = line['Impuestos']['Traslados'].items()
- else:
- impuestos = [line['Impuestos']['Traslados']['Traslado']]
- for value_tax in impuestos:
- if type(line['Impuestos']['Traslados']['Traslado']) is list:
- impuesto = value_tax
- elif type(line['Impuestos']['Traslados']['Traslado']) is OrderedDict:
- impuesto = value_tax[1]
- else:
- impuesto = value_tax
- if str(impuesto['@TipoFactor']).strip().upper() == 'EXENTO':
- tasa_o_cuota = 0
- importe = 0
- else:
- tasa_o_cuota = float(impuesto['@TasaOCuota'])
- importe = float(impuesto['@Importe'])
- tax_data = {
- 'sequence': j,
- 'base': float(impuesto['@Base']),
- 'code': impuesto['@Impuesto'],
- 'factor_type': impuesto['@TipoFactor'],
- 'rate': tasa_o_cuota,
- 'amount': importe,
- 'tax_type': 'traslado'
- }
- j = j + 1
- amount = tasa_o_cuota * 100
- tax_domain = [('amount', '=', amount), ('company_id', '=', self.env.company.id)]
- t_id = self.env["account.tax"]
- if tax_data.get("impuesto") == '002':
- tax_domain.append(("name", "ilike", "iva"))
- elif tax_data.get("impuesto") == '003':
- tax_domain.append(("name", "ilike", "ieps"))
- elif tax_data.get("impuesto") == '001':
- tax_domain.append(("name", "ilike", "isr"))
- if cfdi_type in ['I', 'E']:
- tax_domain.append(('type_tax_use', '=', 'sale'))
- t_id = self.env['account.tax'].sudo().search(tax_domain, limit=1)
- elif cfdi_type in ['SI', 'SE']:
- tax_domain.append(('type_tax_use', '=', 'purchase'))
- t_id = self.env['account.tax'].sudo().search(tax_domain, limit=1)
- if t_id:
- tax_data["tax_id"] = t_id.id
- tax_list.append(Command.create(tax_data))
- if 'Retenciones' in line['Impuestos']:
- if type(line['Impuestos']['Retenciones']['Retencion']) is list:
- retenciones = line['Impuestos']['Retenciones']['Retencion']
- elif type(line['Impuestos']['Retenciones']['Retencion']) is OrderedDict:
- retenciones = line['Impuestos']['Retenciones'].items()
- else:
- retenciones = [line['Impuestos']['Retenciones']['Retencion']]
- for value_tax in retenciones:
- if type(line['Impuestos']['Retenciones']['Retencion']) is list:
- retencion = value_tax
- elif type(line['Impuestos']['Retenciones']['Retencion']) is OrderedDict:
- retencion = value_tax[1]
- else:
- retencion = value_tax
- tasa_o_cuota = float(retencion['@TasaOCuota'])
- importe = float(retencion['@Importe'])
- tax_data = {
- 'sequence': j,
- 'base': float(retencion['@Base']),
- 'code': retencion['@Impuesto'],
- 'factor_type': retencion['@TipoFactor'],
- 'rate': tasa_o_cuota,
- 'amount': importe,
- 'tax_type': 'retencion'
- }
- j = j + 1
- amount = round(-(tasa_o_cuota * 100), 2)
- tax_domain = [('amount', '=', amount), ('company_id', '=', self.env.company.id)]
- t_id = self.env['account.tax']
- if tax_data.get("impuesto") == '002':
- tax_domain.append(("name", "ilike", "iva"))
- elif tax_data.get("impuesto") == '003':
- tax_domain.append(("name", "ilike", "ieps"))
- elif tax_data.get("impuesto") == '001':
- tax_domain.append(("name", "ilike", "isr"))
- if cfdi_type in ['I', 'E']:
- tax_domain.append(('type_tax_use', '=', 'sale'))
- t_id = self.env['account.tax'].sudo().search(tax_domain, limit=1)
- elif cfdi_type in ['SI', 'SE']:
- tax_domain.append(('type_tax_use', '=', 'purchase'))
- t_id = self.env['account.tax'].sudo().search(tax_domain, limit=1)
- if t_id:
- tax_data["tax_id"] = t_id.id
- if not t_id and cfdi_type in ['SI', 'SE']:
- if tax_data.get("impuesto") == '001':
- if recipient_partner_id.tax_isr_id:
- tax_data["tax_id"] = recipient_partner_id.tax_isr_id.id
- elif tax_data.get("impuesto") == '002':
- if recipient_partner_id.tax_iva_id:
- tax_data["tax_id"] = recipient_partner_id.tax_iva_id.id
- tax_list.append(Command.create(tax_data))
- if tax_list:
- data_line["tax_ids"] = tax_list
- return data_line
- # Obteniendo el diario contable dependiendo del tipo de comprobante
- def get_cfdi_journal_id(self, cfdi_type, emitter_partner_id, recipient_partner_id):
- journal_id = self.env['account.journal'].sudo().search([('x_cfdi_type', '=', cfdi_type), ("company_id.id", "=", self.env.company.id)], limit=1)
- # Buscamos si ya hubo un CFDI al mismo cliente y receptor y del mismo tipo y que tenga un diario colocado
- if not journal_id:
- cfdi_id = self.env["account.cfdi"].sudo().search(
- [("emitter_id.id", "=", emitter_partner_id.id),
- ("receiver_id.id", "=", recipient_partner_id.id), ("cfdi_type", "=", cfdi_type),
- ("journal_id", "!=", False)], limit=1)
- journal_id = cfdi_id.journal_id if cfdi_id else False
- account_id = journal_id.default_account_id if journal_id and journal_id.default_account_id else False
- if not account_id:
- partner_id = recipient_partner_id if cfdi_type in ["I", "E"] else emitter_partner_id
- account_id = self.get_expense_cfdi_account_id(cfdi_type, partner_id)
- return journal_id, account_id
- # Obtener la cuenta de gastos
- def get_expense_cfdi_account_id(self, cfdi_type, partner_id):
- if partner_id:
- expense_account_id = partner_id.x_account_expense_id
- # Buscamos un CFDI parecido para obtener la cuenta por pagar
- if not expense_account_id and cfdi_type in ['I', 'E']:
- cfdi_id = self.env["account.cfdi"].sudo().search(
- [("receiver_id.id", "=", partner_id.id), ("cfdi_type", "=", cfdi_type),
- ("account_id", "!=", False)], limit=1)
- expense_account_id = cfdi_id.payable_account_id if cfdi_id else False
- elif not expense_account_id and cfdi_type in ['SI', 'SE']:
- cfdi_id = self.env["account.cfdi"].sudo().search(
- [("emitter_id.id", "=", partner_id.id), ("cfdi_type", "=", cfdi_type),
- ("account_id", "!=", False)], limit=1)
- expense_account_id = cfdi_id.payable_account_id if cfdi_id else False
- return expense_account_id
- # Obtener cuenta por pagar dependiendo del tipo de comprobante
- def get_payable_cfdi_account_id(self, cfdi_type, partner_id):
- # Se busca si se tiene configurado la cuenta por pagar en la cuenta sino se busca por el contacto y por ultimo se busca un cfdi con el mismo contacto y tipo
- payable_account_id = self.env['account.account'].sudo().search([('x_cfdi_type', '=', cfdi_type), ("company_ids.id", "=", self.env.company.id)], limit=1)
- if not payable_account_id and partner_id:
- payable_account_id = partner_id.property_account_payable_id
- # Buscamos un CFDI parecido para obtener la cuenta por pagar
- if not payable_account_id and cfdi_type in ['I', 'E']:
- cfdi_id = self.env["account.cfdi"].sudo().search(
- [("receiver_id.id", "=", partner_id.id), ("cfdi_type", "=", cfdi_type),
- ("payable_account_id", "!=", False)], limit=1)
- payable_account_id = cfdi_id.payable_account_id if cfdi_id else False
- elif not payable_account_id and cfdi_type in ['SI', 'SE']:
- cfdi_id = self.env["account.cfdi"].sudo().search(
- [("emitter_id.id", "=", partner_id.id), ("cfdi_type", "=", cfdi_type),
- ("payable_account_id", "!=", False)], limit=1)
- payable_account_id = cfdi_id.payable_account_id if cfdi_id else False
- return payable_account_id
- # Obtener el emisor y receptor del cfdi
- def get_cfdi_partners(self, xml_data):
- emitter_id = self.env['res.partner'].sudo().search([('vat', '=', xml_data['Comprobante']['Emisor']['@Rfc'])], limit=1)
- receiver_id = self.env['res.partner'].sudo().search([('vat', '=', xml_data['Comprobante']['Receptor']['@Rfc'])], limit=1)
- if not emitter_id:
- emitter_id = self.create_cfdi_partner(xml_data, "Emisor")
- if not receiver_id:
- receiver_id = self.create_cfdi_partner(xml_data, "Receptor")
- return emitter_id, receiver_id
- # Crear los contactos del CFDI si es necesario
- def create_cfdi_partner(self, xml_data, partner_type):
- data = {
- 'name': xml_data['Comprobante'][partner_type]['@Nombre'],
- 'vat': xml_data['Comprobante'][partner_type]['@Rfc'],
- 'l10n_mx_edi_fiscal_regime': xml_data['Comprobante'][partner_type][
- '@RegimenFiscal'] if partner_type == "Emisor" else xml_data['Comprobante'][partner_type][
- '@RegimenFiscalReceptor'],
- 'country_id': self.env.company.country_id.id,
- 'company_type': 'company'
- }
- partner_id = self.env["res.partner"].create(data)
- return partner_id
- # --------------------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------Metodos de descarga masiva---------------------------------------------------------
- def download_massive_pdf_zip(self):
- return self.download_massive_zip("PDF Masivos.zip", "pdf_id")
- def download_massive_xml_zip(self):
- return self.download_massive_zip("XML Masivos.zip", "attachment_id")
- def download_massive_zip(self, filename, field_name):
- self = self.with_user(1)
- zip_file = self.env['ir.attachment'].sudo().search([('name', '=', filename)], limit=1)
- if zip_file:
- zip_file.sudo().unlink()
- # Funcion para decodificar el archivo
- def isBase64_decodestring(s):
- try:
- decode_archive = base64.decodebytes(s)
- return decode_archive
- except Exception as e:
- raise ValidationError('Error:', + str(e))
- tempdir_file = TemporaryDirectory()
- location_tempdir = tempdir_file.name
- # Creando ruta dinamica para poder guardar el archivo zip
- date_act = date.today()
- file_name = 'DescargaMasiva(Fecha de descarga' + " - " + str(date_act) + ")"
- file_name_zip = file_name + ".zip"
- zipfilepath = os.path.join(location_tempdir, file_name_zip)
- path_files = os.path.join(location_tempdir)
- # Creando zip
- for file in self.mapped(field_name):
- object_name = file.name
- ruta_ob = object_name
- object_handle = open(os.path.join(location_tempdir, ruta_ob), "wb")
- object_handle.write(isBase64_decodestring(file.datas))
- object_handle.close()
- with ZipFile(zipfilepath, 'w') as zip_obj:
- for file in os.listdir(path_files):
- file_path = os.path.join(path_files, file)
- if file_path != zipfilepath:
- zip_obj.write(file_path, basename(file_path))
- with open(zipfilepath, 'rb') as file_data:
- bytes_content = file_data.read()
- encoded = base64.b64encode(bytes_content)
- data = {
- 'name': filename,
- 'type': 'binary',
- 'datas': encoded,
- 'company_id': self.env.company.id
- }
- attachment = self.env['ir.attachment'].sudo().create(data)
- return self.download_zip(file_name_zip, attachment.id)
- def download_zip(self, filename, id_file):
- path = "/web/binary/download_document?"
- model = "ir.attachment"
- url = path + "model={}&id={}&filename={}".format(model, id_file, filename)
- return {
- 'type': 'ir.actions.act_url',
- 'url': url,
- 'target': 'self',
- }
- # ---------------------------------------------------------------------------------------------------------------------
- # ---------------------------------------------------Metodos de conciliación---------------------------------------------------------
- def action_done(self):
- self = self.with_user(1)
- invoice_list = []
- folio_names = []
- for rec in self.filtered(lambda cfdi: not cfdi.move_id and cfdi.attachment_id and cfdi.cfdi_type in ["SI", "I","SE"]):
- cfdi_type = rec.cfdi_type
- partner_id = rec.emitter_id if cfdi_type in ["SI",'SE','SP'] else rec.receiver_id
- folio = f"{rec.serie}-{rec.folio}" if rec.serie and rec.folio else f"{rec.serie}" if rec.serie and not rec.folio else f"{rec.folio}" if not rec.serie and rec.folio else ''
- move_type = {
- "SI": "in_invoice",
- "I": "out_invoice",
- "SE": "in_refund",
- "E": "out_refund",
- }
- invoice_data = {
- 'move_type': move_type.get(cfdi_type),
- 'partner_id': partner_id.id,
- 'date': rec.date,
- 'invoice_date': rec.date,
- 'invoice_date_due': rec.date,
- 'fiscal_position_id': partner_id.property_account_position_id.id if partner_id.property_account_position_id else rec.fiscal_position_id.id,
- 'ref': folio,
- 'amount_total_signed': rec.total,
- 'amount_total': rec.total,
- 'journal_id': rec.journal_id.id,
- 'company_id': rec.company_id.id,
- 'cfdi_id': rec.id,
- 'x_uuid': rec.uuid,
- "x_delivery_number": rec.delivery_number,
- "x_invoice_qty": rec.invoice_qty
- }
- if folio != '' and not self.env["account.move"].sudo().search([("name", "=", folio), ("state", "=", "posted")], limit=1) and folio not in folio_names and cfdi_type == "I":
- invoice_data["name"] = folio
- folio_names.append(folio)
- i = 1
- line_list = []
- for line in rec.concept_ids:
- if float(line.amount) > 0:
- data_line = {
- 'sequence': i,
- 'name': line.description,
- 'quantity': float(line.quantity),
- 'product_uom_id': line.uom_id.id,
- 'discount': (float(line.discount) * 100) / float(line.amount),
- 'price_unit': float(line.unit_price),
- 'tax_ids': line.mapped("tax_ids.tax_id").ids,
- 'account_id': line.account_id.id if line.account_id else rec.account_id.id if rec.account_id else False,
- 'analytic_distribution': line.analytic_distribution if line.analytic_distribution else rec.analytic_distribution,
- 'partner_id': partner_id.id,
- }
- if line.product_tmpl_id:
- product_id = self.env['product.product'].sudo().search([('product_tmpl_id', '=', line.product_tmpl_id.id)], limit=1)
- data_line["product_id"] = product_id.id
- line_list.append(Command.create(data_line))
- i = i + 1
- invoice_data["invoice_line_ids"] = line_list
- invoice_list.append(invoice_data)
- invoice_ids = self.env['account.move'].with_context(check_move_validity=False).sudo().create(invoice_list)
- for invoice_id in invoice_ids:
- attachment_id = invoice_id.cfdi_id.attachment_id
- invoice_id.cfdi_id.write({
- "move_id": invoice_id.id,
- "state": "done"
- })
- attachment_id.write({
- 'res_model': 'account.move',
- 'res_id': invoice_id.id,
- })
- if invoice_id.move_type == "out_invoice":
- if attachment_id:
- id_edi_format = self.env['account.edi.format'].sudo().search([('name', '=', 'CFDI (4.0)')], limit=1)
- if not invoice_id.edi_document_ids:
- if id_edi_format:
- create_edi = self.env['account.edi.document'].sudo().create(
- {'edi_format_id': id_edi_format.id, 'attachment_id': attachment_id.id, 'state': 'sent',
- 'move_id': invoice_id.id, 'error': False})
- invoice_id.write({'edi_document_ids': [(6, False, [create_edi.id])], 'l10n_mx_edi_cfdi_uuid': invoice_id.l10n_mx_edi_cfdi_uuid_cusom})
- # Se colocaria la factura como timbrada
- elif id_edi_format and invoice_id.edi_document_ids:
- edi_format_id = invoice_id.edi_document_ids.filtered(
- lambda edi: edi.edi_format_id.id == id_edi_format.id)
- if edi_format_id:
- edi_format_id["attachment_id"] = attachment_id.id
- edi_format_id["error"] = False
- edi_format_id["state"] = "sent"
- else:
- data = {
- 'move_id': invoice_id.id,
- 'attachment_id': attachment_id.id,
- 'state': 'sent',
- 'error': False,
- 'edi_format_id': id_edi_format.id
- }
- self.env["account.edi.document"].sudo().create([data])
- invoice_id.write({'l10n_mx_edi_cfdi_uuid': invoice_id.l10n_mx_edi_cfdi_uuid_cusom})
- invoice_id.write({
- "state": "posted"
- })
- return {
- "name": _("Facturas"),
- "view_mode": "list,form",
- "res_model": "account.move",
- "type": "ir.actions.act_window",
- "target": "current",
- "domain": [('id', 'in', invoice_ids.ids)]
- }
- def action_unlink_move(self):
- for rec in self:
- if rec.move_id:
- rec.attachment_id.write({
- 'res_model': 'account.cfdi',
- 'res_id': rec.id,
- })
- rec.write({
- "state": "draft",
- "move_id": False
- })
- rec.move_id.write({
- 'cfdi_id': False,
- 'x_uuid': False
- })
|