# -*- coding: utf-8 -*- # !/usr/bin/env python import base64 import calendar import datetime from copy import deepcopy from html.parser import HTMLParser from uuid import UUID from OpenSSL import crypto from requests import exceptions, adapters import httpx import urllib3 import ssl urllib3.disable_warnings() import logging _logger = logging.getLogger(__name__) TIMEOUT = 120 TRY_COUNT = 3 VERIFY_CERT = True CONTEXT = ssl.create_default_context() CONTEXT.set_ciphers('HIGH:!DH:!aNULL') #LE DA FORMATO A LOS VALORES DEL HTML class FormValues(HTMLParser): _description = 'Elementos del HTML' def __init__(self): super().__init__() self.values = {} def handle_starttag(self, tag, attrs): if tag in ('input', 'select'): a = dict(attrs) if a.get('type', '') and a['type'] == 'hidden': if 'name' in a and 'value' in a: self.values[a['name']] = a['value'] #LE DA FORMATO A LOS VALORES DEL HTML DEL INICIO DE SESION class FormLoginValues(HTMLParser): _description = 'Elementos del HTML del inicio de sesión' def __init__(self): super().__init__() self.values = {} def handle_starttag(self, tag, attrs): if tag == 'input': attrib = dict(attrs) try: self.values[attrib['id']] = attrib['value'] except: pass class Filters(object): _description = 'Filters' def __init__(self, args): self.date_from = args['date_from'] self.day = args.get('day', False) self.emitidas = args['emitidas'] self.date_to = None if self.date_from: self.date_to = args.get('date_to', self._now()).replace(hour=23, minute=59, second=59, microsecond=0) self.uuid = str(args.get('uuid', '')) self.stop = False self.hour = False self.minute = False self._init_values(args) def __str__(self): if self.uuid: msg = 'Descargar por UUID' elif self.hour: msg = 'Descargar por HORA' elif self.day: msg = 'Descargar por DIA' else: msg = 'Descargar por MES' tipo = 'Recibidas' if self.emitidas: tipo = 'Emitidas' if self.uuid: return '{} - {} - {}'.format(msg, self.uuid, tipo) else: return '{} - {} - {} - {}'.format(msg, self.date_from, self.date_to, tipo) def _now(self): if self.day: n = self.date_from else: last_day = calendar.monthrange( self.date_from.year, self.date_from.month)[1] n = datetime.datetime(self.date_from.year, self.date_from.month, last_day) return n def _init_values(self, args): status = '-1' type_cfdi = args.get('type_cfdi', '-1') center_filter = 'RdoFechas' if self.uuid: center_filter = 'RdoFolioFiscal' rfc_receptor = args.get('rfc_emisor', False) if self.emitidas: rfc_receptor = args.get('rfc_receptor', False) script_manager = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda' self._post = { '__ASYNCPOST': 'true', '__EVENTTARGET': '', '__EVENTARGUMENT': '', '__LASTFOCUS': '', '__VIEWSTATEENCRYPTED': '', 'ctl00$ScriptManager1': script_manager, 'ctl00$MainContent$hfInicialBool': 'false', 'ctl00$MainContent$BtnBusqueda': 'Buscar CFDI', 'ctl00$MainContent$TxtUUID': self.uuid, 'ctl00$MainContent$FiltroCentral': center_filter, 'ctl00$MainContent$DdlEstadoComprobante': status, 'ctl00$MainContent$ddlComplementos': type_cfdi, } return def get_post(self): start_hour = '0' start_minute = '0' start_second = '0' end_hour = '0' end_minute = '0' end_second = '0' if self.date_from: start_hour = str(self.date_from.hour) start_minute = str(self.date_from.minute) start_second = str(self.date_from.second) end_hour = str(self.date_to.hour) end_minute = str(self.date_to.minute) end_second = str(self.date_to.second) if self.emitidas: year1 = '0' year2 = '0' start = '' end = '' if self.date_from: year1 = str(self.date_from.year) year2 = str(self.date_to.year) start = self.date_from.strftime('%d/%m/%Y') end = self.date_to.strftime('%d/%m/%Y') data = { 'ctl00$MainContent$hfInicial': year1, 'ctl00$MainContent$CldFechaInicial2$Calendario_text': start, 'ctl00$MainContent$CldFechaInicial2$DdlHora': start_hour, 'ctl00$MainContent$CldFechaInicial2$DdlMinuto': start_minute, 'ctl00$MainContent$CldFechaInicial2$DdlSegundo': start_second, 'ctl00$MainContent$hfFinal': year2, 'ctl00$MainContent$CldFechaFinal2$Calendario_text': end, 'ctl00$MainContent$CldFechaFinal2$DdlHora': end_hour, 'ctl00$MainContent$CldFechaFinal2$DdlMinuto': end_minute, 'ctl00$MainContent$CldFechaFinal2$DdlSegundo': end_second, } else: year = '0' month = '0' if self.date_from: year = str(self.date_from.year) month = str(self.date_from.month) day = '00' if self.day: day = '{:02d}'.format(self.date_from.day) data = { 'ctl00$MainContent$CldFecha$DdlAnio': year, 'ctl00$MainContent$CldFecha$DdlMes': month, 'ctl00$MainContent$CldFecha$DdlDia': day, 'ctl00$MainContent$CldFecha$DdlHora': start_hour, 'ctl00$MainContent$CldFecha$DdlMinuto': start_minute, 'ctl00$MainContent$CldFecha$DdlSegundo': start_second, 'ctl00$MainContent$CldFecha$DdlHoraFin': end_hour, 'ctl00$MainContent$CldFecha$DdlMinutoFin': end_minute, 'ctl00$MainContent$CldFecha$DdlSegundoFin': end_second, } self._post.update(data) return self._post class Invoice(HTMLParser): _description = 'Invoice' START_PAGE = 'ContenedorDinamico' URL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/' END_PAGE = 'ctl00_MainContent_pageNavPosition' LIMIT_RECORDS = 'ctl00_MainContent_PnlLimiteRegistros' NOT_RECORDS = 'ctl00_MainContent_PnlNoResultados' TEMPLATE_DATE = '%Y-%m-%dT%H:%M:%S' def __init__(self): super().__init__() self._is_div_page = False self._col = 0 self._current_tag = '' self._last_link = '' self._last_link_pdf = '' self._last_uuid = '' self._last_status = '' self._last_date_cfdi = '' self._last_date_timbre = '' self._last_pac = '' self._last_total = '' self._last_type = '' self._last_date_cancel = '' self._last_emisor_rfc = '' self._last_emisor = '' self._last_receptor_rfc = '' self._last_receptor = '' self.invoices = [] self.not_found = False self.limit = False def handle_starttag(self, tag, attrs): self._current_tag = tag if tag == 'div': attrib = dict(attrs) if 'id' in attrib and attrib['id'] == self.NOT_RECORDS \ and 'inline' in attrib['style']: self.not_found = True elif 'id' in attrib and attrib['id'] == self.LIMIT_RECORDS: self.limit = True elif 'id' in attrib and attrib['id'] == self.START_PAGE: self._is_div_page = True elif 'id' in attrib and attrib['id'] == self.END_PAGE: self._is_div_page = False elif self._is_div_page and tag == 'td': self._col += 1 elif tag == 'span': attrib = dict(attrs) if attrib.get('id', '') == 'BtnDescarga': self._last_link = attrib['onclick'].split("'")[1] if attrib.get('id', '') == 'BtnRI': self._last_link_pdf = attrib['onclick'].split("'")[1] def handle_endtag(self, tag): if self._is_div_page and tag == 'tr': if self._last_uuid: url_xml = '' if self._last_link: url_xml = '{}{}'.format(self.URL, self._last_link) self._last_link = '' url_pdf = '' if self._last_link_pdf: url_pdf = '{}{}{}'.format(self.URL, "RepresentacionImpresa.aspx?Datos=", self._last_link_pdf) date_cancel = None if self._last_date_cancel: date_cancel = datetime.datetime.strptime( self._last_date_cancel, self.TEMPLATE_DATE) invoice = (self._last_uuid, { 'url': url_xml, 'acuse': url_pdf, 'estatus': self._last_status, 'date_cfdi': datetime.datetime.strptime( self._last_date_cfdi, self.TEMPLATE_DATE), 'date_timbre': datetime.datetime.strptime( self._last_date_timbre, self.TEMPLATE_DATE), 'date_cancel': date_cancel, 'rfc_pac': self._last_pac, 'total': float(self._last_total), 'tipo': self._last_type, 'emisor': self._last_emisor, 'rfc_emisor': self._last_emisor_rfc, 'receptor': self._last_receptor, 'rfc_receptor': self._last_receptor_rfc, } ) self.invoices.append(invoice) self._last_uuid = '' self._last_status = '' self._last_date_cancel = '' self._last_emisor_rfc = '' self._last_emisor = '' self._last_receptor_rfc = '' self._last_receptor = '' self._last_date_cfdi = '' self._last_date_timbre = '' self._last_pac = '' self._last_total = '' self._last_type = '' self._col = 0 def handle_data(self, data): cv = data.strip() if self._is_div_page and self._current_tag == 'span' and cv: if self._col == 1: try: UUID(cv) self._last_uuid = cv except ValueError: pass elif self._col == 2: self._last_emisor_rfc = cv elif self._col == 3: self._last_emisor = cv elif self._col == 4: self._last_receptor_rfc = cv elif self._col == 5: self._last_receptor = cv elif self._col == 6: self._last_date_cfdi = cv elif self._col == 7: self._last_date_timbre = cv elif self._col == 8: self._last_pac = cv elif self._col == 9: self._last_total = cv.replace('$', '').replace(',', '') elif self._col == 10: self._last_type = cv.lower() elif self._col == 12: self._last_status = cv elif self._col == 14: self._last_date_cancel = cv # CONEXION Y OBTENCION DE ELEMENTOS DEL SAT class PortalSAT(object): _description = 'Conexion al portal del SAT inicio de sesion y descarga' # CONSTANTES PARA LA CONEXION URL_MAIN = 'https://portal.facturaelectronica.sat.gob.mx/' HOST = 'cfdiau.sat.gob.mx' BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0' REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0' PORTAL = 'portalcfdi.facturaelectronica.sat.gob.mx' URL_LOGIN = 'https://{}/nidp/app/login'.format(HOST) URL_FORM = 'https://{}/nidp/app/login?sid=0&sid=0'.format(HOST) URL_PORTAL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/' URL_CONTROL = 'https://cfdicontribuyentes.accesscontrol.windows.net/v2/wsfederation' URL_CONSULTA = URL_PORTAL + 'Consulta.aspx' URL_RECEPTOR = URL_PORTAL + 'ConsultaReceptor.aspx' URL_EMISOR = URL_PORTAL + 'ConsultaEmisor.aspx' URL_LOGOUT = URL_PORTAL + 'logout.aspx?salir=y' DIR_EMITIDAS = 'emitidas' DIR_RECIBIDAS = 'recibidas' COMPANY_ID = "" def __init__(self, rfc, target, sin): self._rfc = rfc self.error = '' self.is_connect = False self.not_network = False self.only_search = False self.only_test = False self.sin_sub = sin self._only_status = False self._init_values(target) def _init_values(self, target): self._folder = target self._emitidas = False self._current_year = datetime.datetime.now().year self._session = httpx.Client(http2=True, timeout=TIMEOUT, verify=CONTEXT) a = adapters.HTTPAdapter(pool_connections=512, pool_maxsize=512, max_retries=5) return def _get_post_form_dates(self): post = {} post['__ASYNCPOST'] = 'true' post['__EVENTARGUMENT'] = '' post['__EVENTTARGET'] = 'ctl00$MainContent$RdoFechas' post['__LASTFOCUS'] = '' post['ctl00$MainContent$CldFecha$DdlAnio'] = str(self._current_year) post['ctl00$MainContent$CldFecha$DdlDia'] = '0' post['ctl00$MainContent$CldFecha$DdlHora'] = '0' post['ctl00$MainContent$CldFecha$DdlHoraFin'] = '23' post['ctl00$MainContent$CldFecha$DdlMes'] = '1' post['ctl00$MainContent$CldFecha$DdlMinuto'] = '0' post['ctl00$MainContent$CldFecha$DdlMinutoFin'] = '59' post['ctl00$MainContent$CldFecha$DdlSegundo'] = '0' post['ctl00$MainContent$CldFecha$DdlSegundoFin'] = '59' post['ctl00$MainContent$DdlEstadoComprobante'] = '-1' post['ctl00$MainContent$FiltroCentral'] = 'RdoFechas' post['ctl00$MainContent$TxtRfcReceptor'] = '' post['ctl00$MainContent$TxtUUID'] = '' post['ctl00$MainContent$ddlComplementos'] = '-1' post['ctl00$MainContent$hfInicialBool'] = 'true' post['ctl00$ScriptManager1'] = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$RdoFechas' return post #OBTENER RESPUESTAS DE LAS PETICIONES QUE SE REALIZAN AL SAT def _response(self, url, method='get', headers={}, data={}): try: if method == 'get': result = self._session.get(url, timeout=TIMEOUT) else: result = self._session.post(url, data=data, timeout=TIMEOUT) msg = '{} {} {}'.format(result.status_code, method.upper(), url) if result.status_code == 200: return result.text else: _logger.error(msg) return '' except exceptions.Timeout: msg = 'Tiempo de espera agotado' self.not_network = True _logger.error(msg) return '' except exceptions.ConnectionError: msg = 'Revisa la conexión a Internet' self.not_network = True _logger.error(msg) return '' #LECTURA Y OBTENCION DE CIERTOS ELEMENTOS QUE SE PRESENTAN EN EL HTML def _read_form(self, html, form=''): if form == 'login': parser = FormLoginValues() else: parser = FormValues() parser.feed(html) return parser.values #OBTENCION DEL CABECERO QUE SE ENVIARA EN ALGUNAS PETICIONES def _get_headers(self, host, referer, ajax=False): acept = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' headers = { 'Accept': acept, 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'es-ES,es;q=0.5', 'Connection': 'keep-alive', 'DNT': '1', 'Host': host, 'Referer': referer, 'Upgrade-Insecure-Requests': '1', 'User-Agent': self.BROWSER, 'Content-Type': 'application/x-www-form-urlencoded', } if ajax: headers.update({ 'Cache-Control': 'no-cache', 'X-MicrosoftAjax': 'Delta=true', 'x-requested-with': 'XMLHttpRequest', 'Pragma': 'no-cache', }) return headers #OBTENER LOS VALORES DE LOS CAMPOS Y BOTONES DE LA PANTALLA DE CONSULTA def _get_post_type_search(self, html): tipo_busqueda = 'RdoTipoBusquedaReceptor' if self._emitidas: tipo_busqueda = 'RdoTipoBusquedaEmisor' sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda' post = self._read_form(html) post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda post['__ASYNCPOST'] = 'true' post['__EVENTTARGET'] = '' post['__EVENTARGUMENT'] = '' post['ctl00$ScriptManager1'] = sm return post #OBTENER INFORMACION DEL CERTIFICADO def _get_data_cert(self, fiel_cert_data): cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fiel_cert_data) rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0] serie = '{0:x}'.format(cert.get_serial_number())[1::2] fert = cert.get_notAfter().decode()[2:] return rfc, serie, fert #OBTENER UNA FIRMA PARA PODER OBTENER UN TOKEN MAS ADELANTE def _sign(self, fiel_pem_data, data): key = crypto.load_privatekey(crypto.FILETYPE_PEM, fiel_pem_data) sign = base64.b64encode(crypto.sign(key, data, 'sha256')) return base64.b64encode(sign).decode('utf-8') #OBTENCION DEL TOKEN QUE NOS PERMITIRA MANTENER LA SESION INICIADA def _get_token(self, firma, co): co = base64.b64encode(co.encode('utf-8')).decode('utf-8') data = '{}#{}'.format(co, firma).encode('utf-8') token = base64.b64encode(data).decode('utf-8') return token #OBTENCION DE LA INFORMACION QUE SE ENVIARA AL SAT PARA EL INICIO DE SESION def _make_data_form(self, fiel_cert_data, fiel_pem_data, values): rfc, serie, fert = self._get_data_cert(fiel_cert_data) co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie) firma = self._sign(fiel_pem_data, co) token = self._get_token(firma, co) keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet') data = {k: values[k] for k in keys} data['fert'] = fert data['token'] = token data['arc'] = '' data['placer'] = '' data['secuence'] = '' data['seeder'] = '' data['tan'] = '' return data # CONEXION CON EL PORTAL DEL SAT def login_fiel(self, fiel_cert_data, fiel_pem_data, certificate, company_id): # CREAMOS SESION PERSISTENTE client = self._session # MANDAMOS LA SOLICITUD DE OBTENCION DEL SITIO WEB https://portal.facturaelectronica.sat.gob.mx/ PARA OBTENER REDIRECCIONAMIENTO response = client.get(url=self.URL_MAIN) # PETICION AL LOGIN CON FIEL headers = { "referer": self.get_url(response.url), } response = client.post(url="https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0", headers=headers) # PETICION PARA OBTENER EL FORMULARIO headers["referer"] = self.get_url(response.url) response = client.get(url="https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0", headers=headers) values = self._read_form(response.text, 'login') data = self._make_data_form(fiel_cert_data, fiel_pem_data, values) headers["referer"] = self.get_url(response.url) headers.update(self._get_headers(self.HOST, self.get_url(response.url))) headers = { "cache-control": "max-age=0", "origin": "https://cfdiau.sat.gob.mx", "content-type": "application/x-www-form-urlencoded", "upgrade-insecure-requests": "1", "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0", "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", "sec-gpc": "1", "accept-language": "es-ES,es;q=0.5", "sec-fetch-site": "same-origin", "sec-fetch-dest": "document", "referer": "https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0", "accept-encoding": "gzip, deflate, br, zstd", "priority": "u=0, i", } #NOS IDENTIFICAMOS EN EL SAT PARA PODER SEGUIR CON PETICIONES Y CONSULTAS response = client.post(url="https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0", headers=headers, data=data) headers["referer"] = "https://portal.facturaelectronica.sat.gob.mx/" headers["Host"] = self.HOST #SE OBTIENE LA PAGINA DE CONSULTA PARA OBTENER DATOS NECESARIOS PARA SU POSTERIOR USO EN LAS BUSQUEDAS response = client.get(url=self.URL_CONSULTA) data = self._read_form(response.text) #SE MANDA LA INFORMACION PARA PODER SER REDIRIGIDOS CORRECTAMENTE A LA PAGINA DE CONSULTA response = client.post(url=self.URL_CONSULTA, data=data) self._session.headers.update(headers=headers) self.is_connect = True return True def get_url(self, url_object): url = f"{url_object.scheme}/{url_object.host}{url_object.full_path}" return url def _merge(self, list1, list2): result = list1.copy() result.update(list2) return result def _last_day(self, date): last_day = calendar.monthrange(date.year, date.month)[1] return datetime.datetime(date.year, date.month, last_day) def _get_dates(self, d1, d2): end = d2 dates = [] while True: d2 = self._last_day(d1) if d2 >= end: dates.append((d1, end)) break dates.append((d1, d2)) d1 = d2 + datetime.timedelta(days=1) return dates def _get_dates_recibidas(self, d1, d2): days = (d2 - d1).days + 1 return [d1 + datetime.timedelta(days=d) for d in range(days)] def _time_delta(self, days): now = datetime.datetime.now() date_from = now.replace( hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=days) date_to = now.replace(hour=23, minute=59, second=59, microsecond=0) return date_from, date_to def _time_delta_recibidas(self, days): now = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) return [now - datetime.timedelta(days=d) for d in range(days)] #FILTROS PARA LA CONSULTA def _get_filters(self, args, emitidas=True): filters = [] data = {} data['day'] = bool(args['dia']) data['uuid'] = '' if args['uuid']: data['uuid'] = str(args['uuid']) data['emitidas'] = emitidas data['rfc_emisor'] = args.get('rfc_emisor', '') data['rfc_receptor'] = args.get('rfc_receptor', '') data['type_cfdi'] = args.get('tipo_complemento', '-1') if args['fecha_inicial'] and args['fecha_final'] and emitidas: dates = self._get_dates(args['fecha_inicial'], args['fecha_final']) for start, end in dates: data['date_from'] = start data['date_to'] = end filters.append(Filters(data)) elif args['fecha_inicial'] and args['fecha_final']: dates = self._get_dates_recibidas(args['fecha_inicial'], args['fecha_final']) is_first_date = False for d in dates: if not is_first_date: data['date_from'] = d is_first_date = True else: d = d.replace(hour=0, minute=0, second=0, microsecond=0) data['date_from'] = d data['day'] = True filters.append(Filters(data)) elif args['intervalo_dias'] and emitidas: data['date_from'], data['date_to'] = self._time_delta(args['intervalo_dias']) filters.append(Filters(data)) elif args['intervalo_dias']: dates = self._time_delta_recibidas(args['intervalo_dias']) for d in dates: data['date_from'] = d data['day'] = True filters.append(Filters(data)) elif args['uuid']: data['date_from'] = None filters.append(Filters(data)) else: day = args['dia'] or 1 data['date_from'] = datetime.datetime(args['ano'], args['mes'], day) filters.append(Filters(data)) return tuple(filters) def _segment_filter(self, filters): new_filters = [] if filters.stop: return new_filters date = filters.date_from date_to = filters.date_to if filters.minute: for m in range(10): nf = deepcopy(filters) nf.stop = True nf.date_from = date + datetime.timedelta(minutes=m) nf.date_to = date + datetime.timedelta(minutes=m + 1) new_filters.append(nf) elif filters.hour: minutes = tuple(range(0, 60, 10)) + (0,) minutes = tuple(zip(minutes, minutes[1:])) for m in minutes: nf = deepcopy(filters) nf.minute = True nf.date_from = date + datetime.timedelta(minutes=m[0]) nf.date_to = date + datetime.timedelta(minutes=m[1]) if m[0] == 50 and nf.date_to.hour == 23: nf.date_to = nf.date_to.replace( hour=nf.date_to.hour, minute=59, second=59) elif m[0] == 50 and nf.date_to.hour != 23: nf.date_to = nf.date_to.replace( hour=nf.date_to.hour + 1, minute=0, second=0) new_filters.append(nf) elif filters.day: hours = tuple(range(0, 25)) hours = tuple(zip(hours, hours[1:])) for h in hours: nf = deepcopy(filters) nf.hour = True nf.date_from = date + datetime.timedelta(hours=h[0]) nf.date_to = date + datetime.timedelta(hours=h[1]) if h[1] == 24: nf.date_to = nf.date_from.replace( minute=59, second=59, microsecond=0) new_filters.append(nf) else: last_day = calendar.monthrange(date.year, date.month)[1] for d in range(last_day): nf = deepcopy(filters) nf.day = True nf.date_from = date + datetime.timedelta(days=d) nf.date_to = nf.date_from.replace( hour=23, minute=59, second=59, microsecond=0) new_filters.append(nf) if date_to == nf.date_to: break return new_filters #OBTENER INFORMACION QUE SE ENVIARA EN LA CONSULTA PARA OBTENER LOS CFDIS def _get_post(self, html): validos = ('EVENTTARGET', '__EVENTARGUMENT', '__LASTFOCUS', '__VIEWSTATE') values = html.split('|') post = {v: values[i + 1] for i, v in enumerate(values) if v in validos} return post #ASIGNAR UN HEADER PARA LA CONSULTA DE CFDIS def _set_search_headers(self): self._session.headers = { "cache-control": "no-cache", "x-requested-with": "XMLHttpRequest", "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0", "x-microsoftajax": "Delta=true", "content-type": "application/x-www-form-urlencoded; charset=UTF-8", "accept": "*/*", "sec-gpc": "1", "accept-language": "es-ES,es;q=0.5", "origin": "https://portalcfdi.facturaelectronica.sat.gob.mx", "sec-fetch-site": "same-origin", "sec-fetch-mode": "cors", "sec-fetch-dest": "empty", "referer": "https://portalcfdi.facturaelectronica.sat.gob.mx/ConsultaEmisor.aspx", "accept-encoding": "gzip, deflate, br, zstd", "priority": "u=1, i", } return True #OBTENCION DE LA PAGINA DE CONSULTA POR FECHAS def _change_to_date(self, url_search): client = self._session self._set_search_headers() response = client.get(url_search) data = self._read_form(response.text) post = self._merge(data, self._get_post_form_dates()) headers = self._get_headers(self.PORTAL, url_search, True) response = client.post(url=url_search, headers=headers, data=post) post = self._get_post(response.text) return data, post #BUSQUEDA DE CFDIS RECIBIDOS CREADOS POR UN PROVEEDOR def _search_recibidas(self, filters): url_search = self.URL_RECEPTOR values, post_source = self._change_to_date(url_search) invoice_content = {} for f in filters: post = self._merge(values, f.get_post()) post = self._merge(post, post_source) headers = self._get_headers(self.PORTAL, url_search, True) html = self._response(url_search, 'post', headers, post) not_found, limit, invoices = self._get_download_links(html) if not_found or not invoices: msg = '\n\tNo se encontraron documentos en el filtro:\n\t{}'.format(str(f)) _logger.info(msg) else: data = self._download(invoices, limit, f) if data and type(data) == dict: invoice_content.update(data) return invoice_content #BUSQUEDA DE CFDIS EMITIDOS CREADOS POR LA EMPRESA def _search_emitidas(self, filters): url_search = self.URL_EMISOR values, post_source = self._change_to_date(url_search) invoice_content = {} for f in filters: _logger.info(str(f)) post = self._merge(values, f.get_post()) post = self._merge(post, post_source) headers = self._get_headers(self.PORTAL, url_search, True) html = self._response(url_search, 'post', headers, post) not_found, limit, invoices = self._get_download_links(html) if not_found or not invoices: msg = '\n\tNo se encontraron documentos en el filtro:\n\t{}'.format(str(f)) _logger.info(msg) else: data = self._download(invoices, limit, f, self.DIR_EMITIDAS) if data and type(data) == dict: invoice_content.update(data) return invoice_content #PROCESO DE BUSQUEDA DE CFDIS PARA SU DESCARGA def search(self, opt, download_option='both'): self._only_status = opt['estatus'] invoice_content_e, invoice_content_r = {}, {} if download_option == 'both': filters_e = self._get_filters(opt, True) invoice_content_e = self._search_emitidas(filters_e) filters_r = self._get_filters(opt, False) invoice_content_r = self._search_recibidas(filters_r) elif download_option == 'supplier': filters_r = self._get_filters(opt, False) invoice_content_r = self._search_recibidas(filters_r) elif download_option == 'customer': filters_e = self._get_filters(opt, True) invoice_content_e = self._search_emitidas(filters_e) return invoice_content_r, invoice_content_e #PROCESO DE DESCARGA def _download(self, invoices, limit=False, filters=None, folder=DIR_RECIBIDAS): if not invoices and not limit: msg = '\n\tTodos los documentos han sido previamente descargados para el filtro.\n\t{}'.format(str(filters)) _logger.info(msg) return {} invoices_content = {} if invoices and not self.only_search: invoices_content = self._thread_download(invoices, folder, filters) if limit: sf = self._segment_filter(filters) if folder == self.DIR_RECIBIDAS: data = self._search_recibidas(sf) if data and type(data) == dict: invoices_content.update(data) else: data = self._search_emitidas(sf) if data and type(data) == dict: invoices_content.update(data) return invoices_content #OBTENCION DE LOS VALORE DE LA PETICION PARA OBTENER LAS URL DE LOS CFDI def _thread_download(self, invoices, folder, filters): for_download = invoices[:] current = 1 total = len(for_download) invoice_content = {} for i in range(TRY_COUNT): for uuid, values in for_download: data = { 'url': values['url'], 'acuse': values['acuse'], } content = self._get_xml(uuid, data, current, total) pdf_content = self._get_pdf(uuid, data, current, total) if content: invoice_content.update({uuid: [values, content, pdf_content]}) current += 1 if len(invoice_content) == len(for_download): break if total: msg = '{} documentos por descargar en: {}'.format(total, str(filters)) _logger.info(msg) return invoice_content #OBTENER EL DOCUMENTO XML POR MEDIO DE LA URL DE DESCARGAR def _get_xml(self, uuid, values, current, count): for i in range(TRY_COUNT): try: r = self._session.get(values['url'], timeout=TIMEOUT) if r.status_code == 200: return r.content except exceptions.Timeout: _logger.debug('Tiempo de espera sobrepasado') continue except Exception as e: _logger.error(str(e)) return msg = 'Tiempo de espera agotado para el documento: {}'.format(uuid) _logger.error(msg) return # OBTENER EL DOCUMENTO XML POR MEDIO DE LA URL DE DESCARGAR def _get_pdf(self, uuid, values, current, count): for i in range(TRY_COUNT): try: r = self._session.get(values['acuse'], timeout=TIMEOUT) if r.status_code == 200: return r.content except exceptions.Timeout: _logger.debug('Tiempo de espera sobrepasado') continue except Exception as e: _logger.error(str(e)) return msg = 'Tiempo de espera agotado para el documento: {}'.format(uuid) _logger.error(msg) return def _get_download_links(self, html): parser = Invoice() parser.feed(html) return parser.not_found, parser.limit, parser.invoices def logout(self): msg = 'Cerrando sessión en el SAT' _logger.debug(msg) response = self._response(self.URL_LOGOUT) self.is_connect = False self._session.close() msg = 'Sesión cerrada en el SAT' _logger.info(msg) return