| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import base64
- import calendar
- import datetime
- from copy import deepcopy
- from html.parser import HTMLParser
- from uuid import UUID
- from OpenSSL import crypto
- from requests import exceptions, adapters
- import httpx
- import urllib3
- import ssl
- urllib3.disable_warnings()
- import logging
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
# Timeout handed to the HTTP client and to every request (httpx reads it as seconds).
TIMEOUT = 120
# Number of attempts made by the retry loops that download documents.
TRY_COUNT = 3
# NOTE(review): not referenced in the visible code; the HTTP client is
# configured with CONTEXT instead -- confirm whether this flag is still needed.
VERIFY_CERT = True
# Default TLS context given to the HTTP client, restricted to OpenSSL's HIGH
# cipher suites with DH key exchange and unauthenticated (aNULL) suites disabled.
CONTEXT = ssl.create_default_context()
CONTEXT.set_ciphers('HIGH:!DH:!aNULL')
# Extracts the hidden form values from an HTML page
class FormValues(HTMLParser):
    """Collect the hidden fields of an HTML form.

    After ``feed()``, ``values`` maps the ``name`` of every ``<input>`` or
    ``<select>`` tag whose ``type`` is ``hidden`` to its ``value``.  Tags
    missing either ``name`` or ``value`` are ignored.
    """
    _description = 'Elementos del HTML'

    def __init__(self):
        super().__init__()
        self.values = {}

    def handle_starttag(self, tag, attrs):
        """Record the tag when it is a hidden input/select with name and value."""
        if tag not in ('input', 'select'):
            return
        attributes = dict(attrs)
        if attributes.get('type') != 'hidden':
            return
        if 'name' not in attributes or 'value' not in attributes:
            return
        self.values[attributes['name']] = attributes['value']
# Extracts the form values from the login page HTML
class FormLoginValues(HTMLParser):
    """Collect the ``<input>`` fields of the login page, keyed by ``id``.

    After ``feed()``, ``values`` maps the ``id`` of every ``<input>`` tag to
    its ``value``.  Inputs lacking either attribute are skipped.
    """
    _description = 'Elementos del HTML del inicio de sesión'

    def __init__(self):
        super().__init__()
        self.values = {}

    def handle_starttag(self, tag, attrs):
        """Record ``id -> value`` for an ``<input>`` tag carrying both attributes."""
        if tag != 'input':
            return
        attrib = dict(attrs)
        # Explicit membership checks replace the former bare ``except: pass``,
        # which also swallowed unrelated errors (e.g. KeyboardInterrupt).
        if 'id' in attrib and 'value' in attrib:
            self.values[attrib['id']] = attrib['value']
class Filters(object):
    """Search criteria for one query and the form fields that express them.

    A filter is either a UUID lookup or a date range.  ``emitidas`` selects
    which of the two form layouts ``get_post()`` fills in.  The ``day``,
    ``hour``, ``minute`` and ``stop`` flags describe the granularity of the
    range; they are read by ``__str__`` here and are plain attributes that
    other code may set.
    """
    _description = 'Filters'

    def __init__(self, args):
        """Build the filter from the mapping *args*.

        Required keys: ``date_from`` (datetime or a falsy value) and
        ``emitidas`` (bool).  Optional keys: ``day``, ``date_to``, ``uuid``
        and ``type_cfdi``.

        When ``date_from`` is set, ``date_to`` is moved to 23:59:59 of its
        day; a missing or ``None`` ``date_to`` defaults to ``_now()``.
        """
        self.date_from = args['date_from']
        self.day = args.get('day', False)
        self.emitidas = args['emitidas']
        self.date_to = None
        if self.date_from:
            # ``or`` also covers an explicit ``date_to=None``, which would
            # otherwise fail on ``.replace``.
            date_to = args.get('date_to') or self._now()
            self.date_to = date_to.replace(
                hour=23, minute=59, second=59, microsecond=0)
        # A missing/None uuid must stay falsy: str(None) would be 'None' and
        # turn the filter into a UUID search.
        self.uuid = str(args.get('uuid') or '')
        self.stop = False
        self.hour = False
        self.minute = False
        self._init_values(args)

    def __str__(self):
        """Return a one-line description of the filter (mode, range/UUID, kind)."""
        if self.uuid:
            msg = 'Descargar por UUID'
        elif self.hour:
            msg = 'Descargar por HORA'
        elif self.day:
            msg = 'Descargar por DIA'
        else:
            msg = 'Descargar por MES'
        tipo = 'Emitidas' if self.emitidas else 'Recibidas'
        if self.uuid:
            return '{} - {} - {}'.format(msg, self.uuid, tipo)
        return '{} - {} - {} - {}'.format(msg, self.date_from, self.date_to, tipo)

    def _now(self):
        """Return the default end date: ``date_from`` itself for a day filter,
        otherwise midnight of the last day of ``date_from``'s month."""
        if self.day:
            return self.date_from
        last_day = calendar.monthrange(
            self.date_from.year, self.date_from.month)[1]
        return datetime.datetime(self.date_from.year, self.date_from.month, last_day)

    def _init_values(self, args):
        """Create ``self._post`` with the form fields that do not depend on dates."""
        # NOTE(review): ``rfc_emisor`` / ``rfc_receptor`` from *args* are not
        # sent by this form; the value computed from them here was never used
        # and has been dropped.  Confirm whether an RFC filter field is missing.
        center_filter = 'RdoFolioFiscal' if self.uuid else 'RdoFechas'
        self._post = {
            '__ASYNCPOST': 'true',
            '__EVENTTARGET': '',
            '__EVENTARGUMENT': '',
            '__LASTFOCUS': '',
            '__VIEWSTATEENCRYPTED': '',
            'ctl00$ScriptManager1': 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda',
            'ctl00$MainContent$hfInicialBool': 'false',
            'ctl00$MainContent$BtnBusqueda': 'Buscar CFDI',
            'ctl00$MainContent$TxtUUID': self.uuid,
            'ctl00$MainContent$FiltroCentral': center_filter,
            'ctl00$MainContent$DdlEstadoComprobante': '-1',
            'ctl00$MainContent$ddlComplementos': args.get('type_cfdi', '-1'),
        }

    def get_post(self):
        """Return the complete form payload for this filter.

        The date fields are merged into ``self._post`` and that same dict is
        returned.  Without ``date_from`` every date field keeps its neutral
        default (``'0'``, ``''`` or ``'00'``).
        """
        start, end = self.date_from, self.date_to
        start_hour = start_minute = start_second = '0'
        end_hour = end_minute = end_second = '0'
        if start:
            start_hour = str(start.hour)
            start_minute = str(start.minute)
            start_second = str(start.second)
            end_hour = str(end.hour)
            end_minute = str(end.minute)
            end_second = str(end.second)

        if self.emitidas:
            year1 = year2 = '0'
            start_text = end_text = ''
            if start:
                year1 = str(start.year)
                year2 = str(end.year)
                start_text = start.strftime('%d/%m/%Y')
                end_text = end.strftime('%d/%m/%Y')
            data = {
                'ctl00$MainContent$hfInicial': year1,
                'ctl00$MainContent$CldFechaInicial2$Calendario_text': start_text,
                'ctl00$MainContent$CldFechaInicial2$DdlHora': start_hour,
                'ctl00$MainContent$CldFechaInicial2$DdlMinuto': start_minute,
                'ctl00$MainContent$CldFechaInicial2$DdlSegundo': start_second,
                'ctl00$MainContent$hfFinal': year2,
                'ctl00$MainContent$CldFechaFinal2$Calendario_text': end_text,
                'ctl00$MainContent$CldFechaFinal2$DdlHora': end_hour,
                'ctl00$MainContent$CldFechaFinal2$DdlMinuto': end_minute,
                'ctl00$MainContent$CldFechaFinal2$DdlSegundo': end_second,
            }
        else:
            year = month = '0'
            if start:
                year = str(start.year)
                month = str(start.month)
            day = '00'
            # Guard on ``start``: a UUID-only filter may carry day=True
            # without any date to take the day number from.
            if self.day and start:
                day = '{:02d}'.format(start.day)
            data = {
                'ctl00$MainContent$CldFecha$DdlAnio': year,
                'ctl00$MainContent$CldFecha$DdlMes': month,
                'ctl00$MainContent$CldFecha$DdlDia': day,
                'ctl00$MainContent$CldFecha$DdlHora': start_hour,
                'ctl00$MainContent$CldFecha$DdlMinuto': start_minute,
                'ctl00$MainContent$CldFecha$DdlSegundo': start_second,
                'ctl00$MainContent$CldFecha$DdlHoraFin': end_hour,
                'ctl00$MainContent$CldFecha$DdlMinutoFin': end_minute,
                'ctl00$MainContent$CldFecha$DdlSegundoFin': end_second,
            }
        self._post.update(data)
        return self._post
class Invoice(HTMLParser):
    """Parse a search-result page into a list of invoices.

    After ``feed()``:

    * ``invoices`` is a list of ``(uuid, data)`` tuples, one per table row
      in which a valid UUID was found; ``data`` holds the download URLs,
      dates, parties, total, type and status read from the row.
    * ``not_found`` is True when the "no results" panel is shown inline.
    * ``limit`` is True when the "record limit" panel is present.

    Rows are only read between the ``START_PAGE`` div and the ``END_PAGE``
    div; cell text is taken from ``<span>`` elements and assigned by the
    1-based position of its ``<td>`` within the row.
    """
    _description = 'Invoice'
    START_PAGE = 'ContenedorDinamico'
    URL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
    END_PAGE = 'ctl00_MainContent_pageNavPosition'
    LIMIT_RECORDS = 'ctl00_MainContent_PnlLimiteRegistros'
    NOT_RECORDS = 'ctl00_MainContent_PnlNoResultados'
    TEMPLATE_DATE = '%Y-%m-%dT%H:%M:%S'

    # Column number -> attribute that stores the cell text unchanged.
    # Columns 1 (UUID), 9 (total) and 10 (type) need extra processing and
    # are handled explicitly in handle_data().
    _TEXT_COLUMNS = {
        2: '_last_emisor_rfc',
        3: '_last_emisor',
        4: '_last_receptor_rfc',
        5: '_last_receptor',
        6: '_last_date_cfdi',
        7: '_last_date_timbre',
        8: '_last_pac',
        12: '_last_status',
        14: '_last_date_cancel',
    }

    def __init__(self):
        super().__init__()
        self._is_div_page = False
        self._current_tag = ''
        self.invoices = []
        self.not_found = False
        self.limit = False
        self._reset_row()

    def _reset_row(self):
        """Clear everything collected for the current table row."""
        self._col = 0
        self._last_link = ''
        self._last_link_pdf = ''
        self._last_uuid = ''
        self._last_status = ''
        self._last_date_cfdi = ''
        self._last_date_timbre = ''
        self._last_pac = ''
        self._last_total = ''
        self._last_type = ''
        self._last_date_cancel = ''
        self._last_emisor_rfc = ''
        self._last_emisor = ''
        self._last_receptor_rfc = ''
        self._last_receptor = ''

    @staticmethod
    def _onclick_argument(attrib):
        """Return the first single-quoted argument of the ``onclick`` handler,
        or ``''`` when the attribute is missing or has no quoted argument."""
        parts = (attrib.get('onclick') or '').split("'")
        return parts[1] if len(parts) > 1 else ''

    def handle_starttag(self, tag, attrs):
        """Track page markers, count table cells and capture download links."""
        self._current_tag = tag
        if tag == 'div':
            attrib = dict(attrs)
            div_id = attrib.get('id')
            # ``style`` may be absent (or valueless) on the no-results panel.
            if div_id == self.NOT_RECORDS and 'inline' in (attrib.get('style') or ''):
                self.not_found = True
            elif div_id == self.LIMIT_RECORDS:
                self.limit = True
            elif div_id == self.START_PAGE:
                self._is_div_page = True
            elif div_id == self.END_PAGE:
                self._is_div_page = False
        elif self._is_div_page and tag == 'td':
            self._col += 1
        elif tag == 'span':
            attrib = dict(attrs)
            span_id = attrib.get('id', '')
            if span_id == 'BtnDescarga':
                self._last_link = self._onclick_argument(attrib)
            if span_id == 'BtnRI':
                self._last_link_pdf = self._onclick_argument(attrib)

    def _parse_date(self, text):
        """Parse *text* with ``TEMPLATE_DATE``; raises ValueError if it does not match."""
        return datetime.datetime.strptime(text, self.TEMPLATE_DATE)

    def _build_invoice(self):
        """Return the data dict for the row currently held in the ``_last_*`` fields."""
        url_xml = ''
        if self._last_link:
            url_xml = '{}{}'.format(self.URL, self._last_link)
        url_pdf = ''
        if self._last_link_pdf:
            url_pdf = '{}{}{}'.format(
                self.URL, "RepresentacionImpresa.aspx?Datos=", self._last_link_pdf)
        date_cancel = None
        if self._last_date_cancel:
            date_cancel = self._parse_date(self._last_date_cancel)
        return {
            'url': url_xml,
            'acuse': url_pdf,
            'estatus': self._last_status,
            'date_cfdi': self._parse_date(self._last_date_cfdi),
            'date_timbre': self._parse_date(self._last_date_timbre),
            'date_cancel': date_cancel,
            'rfc_pac': self._last_pac,
            'total': float(self._last_total),
            'tipo': self._last_type,
            'emisor': self._last_emisor,
            'rfc_emisor': self._last_emisor_rfc,
            'receptor': self._last_receptor,
            'rfc_receptor': self._last_receptor_rfc,
        }

    def handle_endtag(self, tag):
        """At the end of each result row, store the invoice and reset the row state."""
        if not (self._is_div_page and tag == 'tr'):
            return
        if self._last_uuid:
            self.invoices.append((self._last_uuid, self._build_invoice()))
        # Both links are cleared with the rest of the row; previously the PDF
        # link was never reset and leaked into later rows that had none.
        self._reset_row()

    def handle_data(self, data):
        """Assign the text of a ``<span>`` to the field matching the current column."""
        cv = data.strip()
        if not (self._is_div_page and self._current_tag == 'span' and cv):
            return
        if self._col == 1:
            # The first cell may hold other text; only a well-formed UUID counts.
            try:
                UUID(cv)
            except ValueError:
                return
            self._last_uuid = cv
        elif self._col == 9:
            self._last_total = cv.replace('$', '').replace(',', '')
        elif self._col == 10:
            self._last_type = cv.lower()
        else:
            field = self._TEXT_COLUMNS.get(self._col)
            if field:
                setattr(self, field, cv)
# Connection to the SAT portal and retrieval of its documents
class PortalSAT(object):
    """Client for the SAT CFDI portal: FIEL login, invoice search and download.

    Typical use: create the instance, call ``login_fiel()``, call
    ``search()`` and finally ``logout()``.  All requests go through one
    ``httpx.Client`` stored in ``self._session``.
    """
    _description = 'Conexion al portal del SAT inicio de sesion y descarga'
    # Connection constants
    URL_MAIN = 'https://portal.facturaelectronica.sat.gob.mx/'
    HOST = 'cfdiau.sat.gob.mx'
    BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0'
    REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0'
    PORTAL = 'portalcfdi.facturaelectronica.sat.gob.mx'
    URL_LOGIN = 'https://{}/nidp/app/login'.format(HOST)
    URL_FORM = 'https://{}/nidp/app/login?sid=0&sid=0'.format(HOST)
    URL_PORTAL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
    URL_CONTROL = 'https://cfdicontribuyentes.accesscontrol.windows.net/v2/wsfederation'
    URL_CONSULTA = URL_PORTAL + 'Consulta.aspx'
    URL_RECEPTOR = URL_PORTAL + 'ConsultaReceptor.aspx'
    URL_EMISOR = URL_PORTAL + 'ConsultaEmisor.aspx'
    URL_LOGOUT = URL_PORTAL + 'logout.aspx?salir=y'
    DIR_EMITIDAS = 'emitidas'
    DIR_RECIBIDAS = 'recibidas'
    COMPANY_ID = ""

    def __init__(self, rfc, target, sin):
        """Store the RFC, the target folder and the ``sin`` flag, then open the HTTP session."""
        self._rfc = rfc
        self.error = ''
        self.is_connect = False
        self.not_network = False
        self.only_search = False
        self.only_test = False
        self.sin_sub = sin
        self._only_status = False
        self._init_values(target)

    def _init_values(self, target):
        """Set per-instance state and create the HTTP/2 client used for every request."""
        self._folder = target
        self._emitidas = False
        self._current_year = datetime.datetime.now().year
        # An unused requests HTTPAdapter used to be built here; it was never
        # mounted on anything, so it has been removed.
        self._session = httpx.Client(http2=True, timeout=TIMEOUT, verify=CONTEXT)

    def _get_post_form_dates(self):
        """Return the form fields that switch the search page to "by dates" mode."""
        return {
            '__ASYNCPOST': 'true',
            '__EVENTARGUMENT': '',
            '__EVENTTARGET': 'ctl00$MainContent$RdoFechas',
            '__LASTFOCUS': '',
            'ctl00$MainContent$CldFecha$DdlAnio': str(self._current_year),
            'ctl00$MainContent$CldFecha$DdlDia': '0',
            'ctl00$MainContent$CldFecha$DdlHora': '0',
            'ctl00$MainContent$CldFecha$DdlHoraFin': '23',
            'ctl00$MainContent$CldFecha$DdlMes': '1',
            'ctl00$MainContent$CldFecha$DdlMinuto': '0',
            'ctl00$MainContent$CldFecha$DdlMinutoFin': '59',
            'ctl00$MainContent$CldFecha$DdlSegundo': '0',
            'ctl00$MainContent$CldFecha$DdlSegundoFin': '59',
            'ctl00$MainContent$DdlEstadoComprobante': '-1',
            'ctl00$MainContent$FiltroCentral': 'RdoFechas',
            'ctl00$MainContent$TxtRfcReceptor': '',
            'ctl00$MainContent$TxtUUID': '',
            'ctl00$MainContent$ddlComplementos': '-1',
            'ctl00$MainContent$hfInicialBool': 'true',
            'ctl00$ScriptManager1': 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$RdoFechas',
        }

    # Performs a request against the SAT and returns the response text
    def _response(self, url, method='get', headers=None, data=None):
        """Request *url* and return the body text, or ``''`` on any failure.

        ``method`` is ``'get'`` or anything else for POST; ``data`` is the
        POST form payload.  A non-200 status is logged and yields ``''``.
        Timeouts and transport errors set ``self.not_network`` and yield ``''``.

        NOTE(review): ``headers`` is accepted so existing callers keep
        working, but it is not sent; only the session headers apply.
        Confirm whether it should be forwarded.
        """
        try:
            if method == 'get':
                result = self._session.get(url, timeout=TIMEOUT)
            else:
                result = self._session.post(url, data=data, timeout=TIMEOUT)
        # The session is an httpx client, so httpx exceptions are what is
        # actually raised; the requests ones are kept for compatibility.
        except (httpx.TimeoutException, exceptions.Timeout):
            self.not_network = True
            _logger.error('Tiempo de espera agotado')
            return ''
        except (httpx.TransportError, exceptions.ConnectionError):
            self.not_network = True
            _logger.error('Revisa la conexión a Internet')
            return ''
        if result.status_code == 200:
            return result.text
        _logger.error('{} {} {}'.format(result.status_code, method.upper(), url))
        return ''

    # Reads the form values present in an HTML page
    def _read_form(self, html, form=''):
        """Parse *html* and return its form values; ``form='login'`` selects the login parser."""
        parser = FormLoginValues() if form == 'login' else FormValues()
        parser.feed(html)
        return parser.values

    # Builds the headers sent with some requests
    def _get_headers(self, host, referer, ajax=False):
        """Return browser-like request headers; ``ajax`` adds the partial-postback headers."""
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'es-ES,es;q=0.5',
            'Connection': 'keep-alive',
            'DNT': '1',
            'Host': host,
            'Referer': referer,
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': self.BROWSER,
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        if ajax:
            headers.update({
                'Cache-Control': 'no-cache',
                'X-MicrosoftAjax': 'Delta=true',
                'x-requested-with': 'XMLHttpRequest',
                'Pragma': 'no-cache',
            })
        return headers

    # Collects the field and button values of the search screen
    def _get_post_type_search(self, html):
        """Return the hidden fields of *html* plus the search-type selection fields."""
        tipo_busqueda = 'RdoTipoBusquedaEmisor' if self._emitidas else 'RdoTipoBusquedaReceptor'
        post = self._read_form(html)
        post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda
        post['__ASYNCPOST'] = 'true'
        post['__EVENTTARGET'] = ''
        post['__EVENTARGUMENT'] = ''
        post['ctl00$ScriptManager1'] = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
        return post

    # Reads the certificate data
    def _get_data_cert(self, fiel_cert_data):
        """Return ``(rfc, serie, fert)`` from a DER-encoded certificate.

        ``rfc`` is the first space-separated token of the subject's
        x500UniqueIdentifier, ``serie`` is every second hex digit of the
        serial number and ``fert`` is the notAfter value without its first
        two characters.
        """
        cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fiel_cert_data)
        rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0]
        serie = '{0:x}'.format(cert.get_serial_number())[1::2]
        fert = cert.get_notAfter().decode()[2:]
        return rfc, serie, fert

    # Produces the signature later used to build the token
    def _sign(self, fiel_pem_data, data):
        """Sign *data* (SHA-256) with the PEM private key; return it base64-encoded twice."""
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, fiel_pem_data)
        # Encode explicitly instead of relying on pyOpenSSL's implicit
        # (warning-emitting) text-to-bytes conversion.
        if isinstance(data, str):
            data = data.encode('utf-8')
        sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
        return base64.b64encode(sign).decode('utf-8')

    # Builds the token that keeps the session open
    def _get_token(self, firma, co):
        """Return base64 of ``"<base64(co)>#<firma>"``."""
        co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
        data = '{}#{}'.format(co, firma).encode('utf-8')
        return base64.b64encode(data).decode('utf-8')

    # Builds the data sent to the SAT to log in
    def _make_data_form(self, fiel_cert_data, fiel_pem_data, values):
        """Return the login form payload built from the certificate, key and page values.

        *values* must contain ``tokenuuid``, ``credentialsRequired``,
        ``guid``, ``ks`` and ``urlApplet`` (KeyError otherwise).
        """
        rfc, serie, fert = self._get_data_cert(fiel_cert_data)
        co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
        firma = self._sign(fiel_pem_data, co)
        token = self._get_token(firma, co)
        keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
        data = {k: values[k] for k in keys}
        data['fert'] = fert
        data['token'] = token
        for empty_field in ('arc', 'placer', 'secuence', 'seeder', 'tan'):
            data[empty_field] = ''
        return data

    # Connection with the SAT portal
    def login_fiel(self, fiel_cert_data, fiel_pem_data, certificate, company_id):
        """Log in with the FIEL certificate and key.

        Follows the portal's login sequence, marks the instance as
        connected and returns True.  The responses are not inspected, so a
        rejected login is not detected here.  ``certificate`` and
        ``company_id`` are not used by this method.
        """
        client = self._session
        # The login form URL is identical to the REFERER constant.
        login_url = self.REFERER
        # Request the main site to obtain the redirection.
        response = client.get(url=self.URL_MAIN)
        # Request the FIEL login.
        headers = {"referer": self.get_url(response.url)}
        response = client.post(
            url="https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0",
            headers=headers)
        # Request the login form.
        headers["referer"] = self.get_url(response.url)
        response = client.get(url=login_url, headers=headers)
        values = self._read_form(response.text, 'login')
        data = self._make_data_form(fiel_cert_data, fiel_pem_data, values)
        headers = {
            "cache-control": "max-age=0",
            "origin": "https://cfdiau.sat.gob.mx",
            "content-type": "application/x-www-form-urlencoded",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0",
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "sec-gpc": "1",
            "accept-language": "es-ES,es;q=0.5",
            "sec-fetch-site": "same-origin",
            "sec-fetch-dest": "document",
            "referer": "https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0",
            "accept-encoding": "gzip, deflate, br, zstd",
            "priority": "u=0, i",
        }
        # Identify ourselves so later requests and searches are accepted.
        response = client.post(url=login_url, headers=headers, data=data)
        headers["referer"] = "https://portal.facturaelectronica.sat.gob.mx/"
        headers["Host"] = self.HOST
        # Fetch the search page to obtain the values needed afterwards.
        response = client.get(url=self.URL_CONSULTA)
        data = self._read_form(response.text)
        # Post them back so we are redirected to the search page.
        response = client.post(url=self.URL_CONSULTA, data=data)
        self._session.headers.update(headers)
        self.is_connect = True
        return True

    def get_url(self, url_object):
        """Return ``scheme://host/path?query`` for a URL object.

        *url_object* must expose ``scheme`` and ``host`` plus either a
        ``full_path`` string or a ``raw_path`` bytes attribute.  The port,
        user info and fragment are not included.
        """
        full_path = getattr(url_object, 'full_path', None)
        if full_path is None:
            # Fallback for URL objects that expose raw_path instead.
            full_path = url_object.raw_path.decode('ascii')
        # The separator used to be "/" only, producing "https/host/...".
        return f"{url_object.scheme}://{url_object.host}{full_path}"

    def _merge(self, list1, list2):
        """Return a copy of mapping *list1* updated with *list2*."""
        result = list1.copy()
        result.update(list2)
        return result

    def _last_day(self, date):
        """Return midnight of the last day of *date*'s month."""
        last_day = calendar.monthrange(date.year, date.month)[1]
        return datetime.datetime(date.year, date.month, last_day)

    def _get_dates(self, d1, d2):
        """Split the range *d1*..*d2* (datetimes) into one ``(start, end)`` pair per month."""
        end = d2
        dates = []
        while True:
            month_end = self._last_day(d1)
            # Compare calendar days: ``month_end`` is at midnight, so an
            # ``end`` later on that same day must still close the range
            # here instead of spilling into an inverted extra month.
            if month_end.date() >= end.date():
                dates.append((d1, end))
                break
            dates.append((d1, month_end))
            d1 = month_end + datetime.timedelta(days=1)
        return dates

    def _get_dates_recibidas(self, d1, d2):
        """Return one datetime per day from *d1* to *d2* inclusive."""
        days = (d2 - d1).days + 1
        return [d1 + datetime.timedelta(days=d) for d in range(days)]

    def _time_delta(self, days):
        """Return ``(midnight *days* days ago, today at 23:59:59)``."""
        now = datetime.datetime.now()
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        date_from = midnight - datetime.timedelta(days=days)
        date_to = now.replace(hour=23, minute=59, second=59, microsecond=0)
        return date_from, date_to

    def _time_delta_recibidas(self, days):
        """Return midnight of today and of each of the previous ``days - 1`` days."""
        now = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return [now - datetime.timedelta(days=d) for d in range(days)]

    # Filters for the search
    def _get_filters(self, args, emitidas=True):
        """Translate the search options *args* into a tuple of ``Filters``.

        Precedence: explicit date range, then day interval, then UUID, then
        year/month(/day).  Issued invoices get one filter per month (or one
        for the whole interval); received invoices get one filter per day.
        """
        filters = []
        data = {
            'day': bool(args['dia']),
            'uuid': str(args['uuid']) if args['uuid'] else '',
            'emitidas': emitidas,
            'rfc_emisor': args.get('rfc_emisor', ''),
            'rfc_receptor': args.get('rfc_receptor', ''),
            'type_cfdi': args.get('tipo_complemento', '-1'),
        }
        has_range = args['fecha_inicial'] and args['fecha_final']
        if has_range and emitidas:
            for start, end in self._get_dates(args['fecha_inicial'], args['fecha_final']):
                data['date_from'] = start
                data['date_to'] = end
                filters.append(Filters(data))
        elif has_range:
            dates = self._get_dates_recibidas(args['fecha_inicial'], args['fecha_final'])
            for position, d in enumerate(dates):
                # The first date keeps its time; the rest start at midnight.
                if position:
                    d = d.replace(hour=0, minute=0, second=0, microsecond=0)
                data['date_from'] = d
                data['day'] = True
                filters.append(Filters(data))
        elif args['intervalo_dias'] and emitidas:
            data['date_from'], data['date_to'] = self._time_delta(args['intervalo_dias'])
            filters.append(Filters(data))
        elif args['intervalo_dias']:
            for d in self._time_delta_recibidas(args['intervalo_dias']):
                data['date_from'] = d
                data['day'] = True
                filters.append(Filters(data))
        elif args['uuid']:
            data['date_from'] = None
            filters.append(Filters(data))
        else:
            day = args['dia'] or 1
            data['date_from'] = datetime.datetime(args['ano'], args['mes'], day)
            filters.append(Filters(data))
        return tuple(filters)

    def _segment_filter(self, filters):
        """Split one filter into finer-grained copies.

        month -> days, day -> hours, hour -> ten-minute blocks and
        ten-minute block -> single minutes (flagged ``stop`` so they are not
        split again).  A filter already flagged ``stop`` yields ``[]``.
        """
        new_filters = []
        if filters.stop:
            return new_filters
        date = filters.date_from
        date_to = filters.date_to
        if filters.minute:
            for m in range(10):
                nf = deepcopy(filters)
                nf.stop = True
                nf.date_from = date + datetime.timedelta(minutes=m)
                nf.date_to = date + datetime.timedelta(minutes=m + 1)
                new_filters.append(nf)
        elif filters.hour:
            bounds = tuple(range(0, 60, 10)) + (0,)
            for start, end in zip(bounds, bounds[1:]):
                nf = deepcopy(filters)
                nf.minute = True
                nf.date_from = date + datetime.timedelta(minutes=start)
                nf.date_to = date + datetime.timedelta(minutes=end)
                if start == 50:
                    # The last block wraps to minute 0: close it at the next
                    # full hour, or at 23:59:59 when there is no next hour.
                    if nf.date_to.hour == 23:
                        nf.date_to = nf.date_to.replace(
                            hour=nf.date_to.hour, minute=59, second=59)
                    else:
                        nf.date_to = nf.date_to.replace(
                            hour=nf.date_to.hour + 1, minute=0, second=0)
                new_filters.append(nf)
        elif filters.day:
            for start in range(24):
                end = start + 1
                nf = deepcopy(filters)
                nf.hour = True
                nf.date_from = date + datetime.timedelta(hours=start)
                nf.date_to = date + datetime.timedelta(hours=end)
                if end == 24:
                    # Keep the last hour inside the same day.
                    nf.date_to = nf.date_from.replace(
                        minute=59, second=59, microsecond=0)
                new_filters.append(nf)
        else:
            last_day = calendar.monthrange(date.year, date.month)[1]
            for d in range(last_day):
                nf = deepcopy(filters)
                nf.day = True
                nf.date_from = date + datetime.timedelta(days=d)
                nf.date_to = nf.date_from.replace(
                    hour=23, minute=59, second=59, microsecond=0)
                new_filters.append(nf)
                if date_to == nf.date_to:
                    break
        return new_filters

    # Extracts the values sent with the CFDI search request
    def _get_post(self, html):
        """Return ``{key: next_token}`` for the known keys found in a ``|``-separated response.

        A key that is the very last token has no value and is skipped.
        """
        # NOTE(review): 'EVENTTARGET' lacks the leading underscores the other
        # keys have, so '__EVENTTARGET' is never picked up.  Looks like a typo;
        # left as is because changing it alters the request sent to the portal.
        validos = ('EVENTTARGET', '__EVENTARGUMENT', '__LASTFOCUS', '__VIEWSTATE')
        values = html.split('|')
        return {key: value for key, value in zip(values, values[1:]) if key in validos}

    # Sets the headers used for the CFDI search
    def _set_search_headers(self):
        """Replace the session headers with the ones used for search requests."""
        self._session.headers = {
            "cache-control": "no-cache",
            "x-requested-with": "XMLHttpRequest",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0",
            "x-microsoftajax": "Delta=true",
            "content-type": "application/x-www-form-urlencoded; charset=UTF-8",
            "accept": "*/*",
            "sec-gpc": "1",
            "accept-language": "es-ES,es;q=0.5",
            "origin": "https://portalcfdi.facturaelectronica.sat.gob.mx",
            "sec-fetch-site": "same-origin",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://portalcfdi.facturaelectronica.sat.gob.mx/ConsultaEmisor.aspx",
            "accept-encoding": "gzip, deflate, br, zstd",
            "priority": "u=1, i",
        }
        return True

    # Switches the search page to search by dates
    def _change_to_date(self, url_search):
        """Load *url_search*, switch it to date mode and return ``(form_values, state_values)``.

        ``form_values`` are the hidden fields of the page; ``state_values``
        are the fields extracted from the response to the mode switch.
        """
        client = self._session
        self._set_search_headers()
        response = client.get(url_search)
        data = self._read_form(response.text)
        post = self._merge(data, self._get_post_form_dates())
        headers = self._get_headers(self.PORTAL, url_search, True)
        response = client.post(url=url_search, headers=headers, data=post)
        return data, self._get_post(response.text)

    def _search_by_filters(self, filters, url_search, folder, log_filter=False):
        """Run every filter against *url_search* and download what it finds.

        Shared implementation of ``_search_recibidas`` / ``_search_emitidas``.
        Returns a dict keyed by UUID with the downloaded content.
        """
        values, post_source = self._change_to_date(url_search)
        invoice_content = {}
        for f in filters:
            if log_filter:
                _logger.info(str(f))
            post = self._merge(values, f.get_post())
            post = self._merge(post, post_source)
            headers = self._get_headers(self.PORTAL, url_search, True)
            html = self._response(url_search, 'post', headers, post)
            not_found, limit, invoices = self._get_download_links(html)
            if not_found or not invoices:
                msg = '\n\tNo se encontraron documentos en el filtro:\n\t{}'.format(str(f))
                _logger.info(msg)
                continue
            data = self._download(invoices, limit, f, folder)
            if data and isinstance(data, dict):
                invoice_content.update(data)
        return invoice_content

    # Search of received CFDIs (created by a supplier)
    def _search_recibidas(self, filters):
        """Search and download received invoices for each filter."""
        return self._search_by_filters(filters, self.URL_RECEPTOR, self.DIR_RECIBIDAS)

    # Search of issued CFDIs (created by the company)
    def _search_emitidas(self, filters):
        """Search and download issued invoices for each filter, logging each filter."""
        return self._search_by_filters(
            filters, self.URL_EMISOR, self.DIR_EMITIDAS, log_filter=True)

    # Search process of the CFDIs to download
    def search(self, opt, download_option='both'):
        """Search and download invoices according to *opt*.

        ``download_option`` is ``'both'``, ``'supplier'`` (received only) or
        ``'customer'`` (issued only).  Returns ``(received, issued)``, two
        dicts keyed by UUID; the side not requested is empty.
        """
        self._only_status = opt['estatus']
        invoice_content_e, invoice_content_r = {}, {}
        # Issued invoices are searched first, as before.
        if download_option in ('both', 'customer'):
            invoice_content_e = self._search_emitidas(self._get_filters(opt, True))
        if download_option in ('both', 'supplier'):
            invoice_content_r = self._search_recibidas(self._get_filters(opt, False))
        return invoice_content_r, invoice_content_e

    # Download process
    def _download(self, invoices, limit=False, filters=None, folder=DIR_RECIBIDAS):
        """Download *invoices* and, when the result was truncated, search finer segments.

        ``limit`` signals that the portal capped the result list; the filter
        is then split with ``_segment_filter`` and searched again.  Returns a
        dict keyed by UUID.
        """
        if not invoices and not limit:
            msg = '\n\tTodos los documentos han sido previamente descargados para el filtro.\n\t{}'.format(str(filters))
            _logger.info(msg)
            return {}
        invoices_content = {}
        if invoices and not self.only_search:
            invoices_content = self._thread_download(invoices, folder, filters)
        if limit:
            sf = self._segment_filter(filters)
            if folder == self.DIR_RECIBIDAS:
                data = self._search_recibidas(sf)
            else:
                data = self._search_emitidas(sf)
            if data and isinstance(data, dict):
                invoices_content.update(data)
        return invoices_content

    # Downloads the XML and PDF of every invoice found
    def _thread_download(self, invoices, folder, filters):
        """Download XML and PDF for each ``(uuid, values)`` pair, retrying failures.

        Returns ``{uuid: [values, xml_content, pdf_content]}`` for every
        invoice whose XML could be downloaded.  Up to ``TRY_COUNT`` passes
        are made; later passes only retry what is still missing.
        """
        for_download = invoices[:]
        current = 1
        total = len(for_download)
        invoice_content = {}
        for _ in range(TRY_COUNT):
            for uuid, values in for_download:
                if uuid in invoice_content:
                    # Already downloaded in an earlier pass.
                    continue
                data = {
                    'url': values['url'],
                    'acuse': values['acuse'],
                }
                content = self._get_xml(uuid, data, current, total)
                pdf_content = self._get_pdf(uuid, data, current, total)
                if content:
                    invoice_content[uuid] = [values, content, pdf_content]
                current += 1
            if len(invoice_content) == len(for_download):
                break
        if total:
            msg = '{} documentos por descargar en: {}'.format(total, str(filters))
            _logger.info(msg)
        return invoice_content

    def _fetch_document(self, url, uuid):
        """Return the bytes at *url*, or None when it cannot be downloaded.

        Timeouts and non-200 answers are retried up to ``TRY_COUNT`` times;
        any other error is logged and aborts immediately.  An empty *url*
        returns None without issuing a request.
        """
        if not url:
            return None
        for _ in range(TRY_COUNT):
            try:
                r = self._session.get(url, timeout=TIMEOUT)
            # httpx raises its own timeout type; the requests one is kept
            # for compatibility.
            except (httpx.TimeoutException, exceptions.Timeout):
                _logger.debug('Tiempo de espera sobrepasado')
                continue
            except Exception as e:
                _logger.error(str(e))
                return None
            if r.status_code == 200:
                return r.content
        msg = 'Tiempo de espera agotado para el documento: {}'.format(uuid)
        _logger.error(msg)
        return None

    # Downloads the XML document from its download URL
    def _get_xml(self, uuid, values, current, count):
        """Download ``values['url']``; ``current`` and ``count`` are unused."""
        return self._fetch_document(values['url'], uuid)

    # Downloads the PDF document from its download URL
    def _get_pdf(self, uuid, values, current, count):
        """Download ``values['acuse']``; ``current`` and ``count`` are unused."""
        return self._fetch_document(values['acuse'], uuid)

    def _get_download_links(self, html):
        """Parse a result page and return ``(not_found, limit, invoices)``."""
        parser = Invoice()
        parser.feed(html)
        return parser.not_found, parser.limit, parser.invoices

    def logout(self):
        """Request the logout URL, mark the instance as disconnected and close the session."""
        _logger.debug('Cerrando sessión en el SAT')
        self._response(self.URL_LOGOUT)
        self.is_connect = False
        self._session.close()
        _logger.info('Sesión cerrada en el SAT')
|