portal_sat.py 35 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import base64
  4. import calendar
  5. import datetime
  6. from copy import deepcopy
  7. from html.parser import HTMLParser
  8. from uuid import UUID
  9. from OpenSSL import crypto
  10. from requests import exceptions, adapters
  11. import httpx
  12. import urllib3
  13. import ssl
# Silence urllib3 warnings for the whole process.
urllib3.disable_warnings()
import logging
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Timeout handed to the HTTP client and to the individual requests.
TIMEOUT = 120
# Number of attempts made by the download loops.
TRY_COUNT = 3
# NOTE(review): not referenced anywhere in the visible part of this file.
VERIFY_CERT = True
# TLS context handed to the HTTP client. The cipher string keeps the HIGH
# suites while excluding DH and anonymous (aNULL) suites.
CONTEXT = ssl.create_default_context()
CONTEXT.set_ciphers('HIGH:!DH:!aNULL')
  22. #LE DA FORMATO A LOS VALORES DEL HTML
  23. class FormValues(HTMLParser):
  24. _description = 'Elementos del HTML'
  25. def __init__(self):
  26. super().__init__()
  27. self.values = {}
  28. def handle_starttag(self, tag, attrs):
  29. if tag in ('input', 'select'):
  30. a = dict(attrs)
  31. if a.get('type', '') and a['type'] == 'hidden':
  32. if 'name' in a and 'value' in a:
  33. self.values[a['name']] = a['value']
  34. #LE DA FORMATO A LOS VALORES DEL HTML DEL INICIO DE SESION
  35. class FormLoginValues(HTMLParser):
  36. _description = 'Elementos del HTML del inicio de sesión'
  37. def __init__(self):
  38. super().__init__()
  39. self.values = {}
  40. def handle_starttag(self, tag, attrs):
  41. if tag == 'input':
  42. attrib = dict(attrs)
  43. try:
  44. self.values[attrib['id']] = attrib['value']
  45. except:
  46. pass
  47. class Filters(object):
  48. _description = 'Filters'
  49. def __init__(self, args):
  50. self.date_from = args['date_from']
  51. self.day = args.get('day', False)
  52. self.emitidas = args['emitidas']
  53. self.date_to = None
  54. if self.date_from:
  55. self.date_to = args.get('date_to', self._now()).replace(hour=23, minute=59, second=59, microsecond=0)
  56. self.uuid = str(args.get('uuid', ''))
  57. self.stop = False
  58. self.hour = False
  59. self.minute = False
  60. self._init_values(args)
  61. def __str__(self):
  62. if self.uuid:
  63. msg = 'Descargar por UUID'
  64. elif self.hour:
  65. msg = 'Descargar por HORA'
  66. elif self.day:
  67. msg = 'Descargar por DIA'
  68. else:
  69. msg = 'Descargar por MES'
  70. tipo = 'Recibidas'
  71. if self.emitidas:
  72. tipo = 'Emitidas'
  73. if self.uuid:
  74. return '{} - {} - {}'.format(msg, self.uuid, tipo)
  75. else:
  76. return '{} - {} - {} - {}'.format(msg, self.date_from, self.date_to, tipo)
  77. def _now(self):
  78. if self.day:
  79. n = self.date_from
  80. else:
  81. last_day = calendar.monthrange(
  82. self.date_from.year, self.date_from.month)[1]
  83. n = datetime.datetime(self.date_from.year, self.date_from.month, last_day)
  84. return n
  85. def _init_values(self, args):
  86. status = '-1'
  87. type_cfdi = args.get('type_cfdi', '-1')
  88. center_filter = 'RdoFechas'
  89. if self.uuid:
  90. center_filter = 'RdoFolioFiscal'
  91. rfc_receptor = args.get('rfc_emisor', False)
  92. if self.emitidas:
  93. rfc_receptor = args.get('rfc_receptor', False)
  94. script_manager = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
  95. self._post = {
  96. '__ASYNCPOST': 'true',
  97. '__EVENTTARGET': '',
  98. '__EVENTARGUMENT': '',
  99. '__LASTFOCUS': '',
  100. '__VIEWSTATEENCRYPTED': '',
  101. 'ctl00$ScriptManager1': script_manager,
  102. 'ctl00$MainContent$hfInicialBool': 'false',
  103. 'ctl00$MainContent$BtnBusqueda': 'Buscar CFDI',
  104. 'ctl00$MainContent$TxtUUID': self.uuid,
  105. 'ctl00$MainContent$FiltroCentral': center_filter,
  106. 'ctl00$MainContent$DdlEstadoComprobante': status,
  107. 'ctl00$MainContent$ddlComplementos': type_cfdi,
  108. }
  109. return
  110. def get_post(self):
  111. start_hour = '0'
  112. start_minute = '0'
  113. start_second = '0'
  114. end_hour = '0'
  115. end_minute = '0'
  116. end_second = '0'
  117. if self.date_from:
  118. start_hour = str(self.date_from.hour)
  119. start_minute = str(self.date_from.minute)
  120. start_second = str(self.date_from.second)
  121. end_hour = str(self.date_to.hour)
  122. end_minute = str(self.date_to.minute)
  123. end_second = str(self.date_to.second)
  124. if self.emitidas:
  125. year1 = '0'
  126. year2 = '0'
  127. start = ''
  128. end = ''
  129. if self.date_from:
  130. year1 = str(self.date_from.year)
  131. year2 = str(self.date_to.year)
  132. start = self.date_from.strftime('%d/%m/%Y')
  133. end = self.date_to.strftime('%d/%m/%Y')
  134. data = {
  135. 'ctl00$MainContent$hfInicial': year1,
  136. 'ctl00$MainContent$CldFechaInicial2$Calendario_text': start,
  137. 'ctl00$MainContent$CldFechaInicial2$DdlHora': start_hour,
  138. 'ctl00$MainContent$CldFechaInicial2$DdlMinuto': start_minute,
  139. 'ctl00$MainContent$CldFechaInicial2$DdlSegundo': start_second,
  140. 'ctl00$MainContent$hfFinal': year2,
  141. 'ctl00$MainContent$CldFechaFinal2$Calendario_text': end,
  142. 'ctl00$MainContent$CldFechaFinal2$DdlHora': end_hour,
  143. 'ctl00$MainContent$CldFechaFinal2$DdlMinuto': end_minute,
  144. 'ctl00$MainContent$CldFechaFinal2$DdlSegundo': end_second,
  145. }
  146. else:
  147. year = '0'
  148. month = '0'
  149. if self.date_from:
  150. year = str(self.date_from.year)
  151. month = str(self.date_from.month)
  152. day = '00'
  153. if self.day:
  154. day = '{:02d}'.format(self.date_from.day)
  155. data = {
  156. 'ctl00$MainContent$CldFecha$DdlAnio': year,
  157. 'ctl00$MainContent$CldFecha$DdlMes': month,
  158. 'ctl00$MainContent$CldFecha$DdlDia': day,
  159. 'ctl00$MainContent$CldFecha$DdlHora': start_hour,
  160. 'ctl00$MainContent$CldFecha$DdlMinuto': start_minute,
  161. 'ctl00$MainContent$CldFecha$DdlSegundo': start_second,
  162. 'ctl00$MainContent$CldFecha$DdlHoraFin': end_hour,
  163. 'ctl00$MainContent$CldFecha$DdlMinutoFin': end_minute,
  164. 'ctl00$MainContent$CldFecha$DdlSegundoFin': end_second,
  165. }
  166. self._post.update(data)
  167. return self._post
  168. class Invoice(HTMLParser):
  169. _description = 'Invoice'
  170. START_PAGE = 'ContenedorDinamico'
  171. URL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
  172. END_PAGE = 'ctl00_MainContent_pageNavPosition'
  173. LIMIT_RECORDS = 'ctl00_MainContent_PnlLimiteRegistros'
  174. NOT_RECORDS = 'ctl00_MainContent_PnlNoResultados'
  175. TEMPLATE_DATE = '%Y-%m-%dT%H:%M:%S'
  176. def __init__(self):
  177. super().__init__()
  178. self._is_div_page = False
  179. self._col = 0
  180. self._current_tag = ''
  181. self._last_link = ''
  182. self._last_link_pdf = ''
  183. self._last_uuid = ''
  184. self._last_status = ''
  185. self._last_date_cfdi = ''
  186. self._last_date_timbre = ''
  187. self._last_pac = ''
  188. self._last_total = ''
  189. self._last_type = ''
  190. self._last_date_cancel = ''
  191. self._last_emisor_rfc = ''
  192. self._last_emisor = ''
  193. self._last_receptor_rfc = ''
  194. self._last_receptor = ''
  195. self.invoices = []
  196. self.not_found = False
  197. self.limit = False
  198. def handle_starttag(self, tag, attrs):
  199. self._current_tag = tag
  200. if tag == 'div':
  201. attrib = dict(attrs)
  202. if 'id' in attrib and attrib['id'] == self.NOT_RECORDS \
  203. and 'inline' in attrib['style']:
  204. self.not_found = True
  205. elif 'id' in attrib and attrib['id'] == self.LIMIT_RECORDS:
  206. self.limit = True
  207. elif 'id' in attrib and attrib['id'] == self.START_PAGE:
  208. self._is_div_page = True
  209. elif 'id' in attrib and attrib['id'] == self.END_PAGE:
  210. self._is_div_page = False
  211. elif self._is_div_page and tag == 'td':
  212. self._col += 1
  213. elif tag == 'span':
  214. attrib = dict(attrs)
  215. if attrib.get('id', '') == 'BtnDescarga':
  216. self._last_link = attrib['onclick'].split("'")[1]
  217. if attrib.get('id', '') == 'BtnRI':
  218. self._last_link_pdf = attrib['onclick'].split("'")[1]
  219. def handle_endtag(self, tag):
  220. if self._is_div_page and tag == 'tr':
  221. if self._last_uuid:
  222. url_xml = ''
  223. if self._last_link:
  224. url_xml = '{}{}'.format(self.URL, self._last_link)
  225. self._last_link = ''
  226. url_pdf = ''
  227. if self._last_link_pdf:
  228. url_pdf = '{}{}{}'.format(self.URL, "RepresentacionImpresa.aspx?Datos=", self._last_link_pdf)
  229. date_cancel = None
  230. if self._last_date_cancel:
  231. date_cancel = datetime.datetime.strptime(
  232. self._last_date_cancel, self.TEMPLATE_DATE)
  233. invoice = (self._last_uuid,
  234. {
  235. 'url': url_xml,
  236. 'acuse': url_pdf,
  237. 'estatus': self._last_status,
  238. 'date_cfdi': datetime.datetime.strptime(
  239. self._last_date_cfdi, self.TEMPLATE_DATE),
  240. 'date_timbre': datetime.datetime.strptime(
  241. self._last_date_timbre, self.TEMPLATE_DATE),
  242. 'date_cancel': date_cancel,
  243. 'rfc_pac': self._last_pac,
  244. 'total': float(self._last_total),
  245. 'tipo': self._last_type,
  246. 'emisor': self._last_emisor,
  247. 'rfc_emisor': self._last_emisor_rfc,
  248. 'receptor': self._last_receptor,
  249. 'rfc_receptor': self._last_receptor_rfc,
  250. }
  251. )
  252. self.invoices.append(invoice)
  253. self._last_uuid = ''
  254. self._last_status = ''
  255. self._last_date_cancel = ''
  256. self._last_emisor_rfc = ''
  257. self._last_emisor = ''
  258. self._last_receptor_rfc = ''
  259. self._last_receptor = ''
  260. self._last_date_cfdi = ''
  261. self._last_date_timbre = ''
  262. self._last_pac = ''
  263. self._last_total = ''
  264. self._last_type = ''
  265. self._col = 0
  266. def handle_data(self, data):
  267. cv = data.strip()
  268. if self._is_div_page and self._current_tag == 'span' and cv:
  269. if self._col == 1:
  270. try:
  271. UUID(cv)
  272. self._last_uuid = cv
  273. except ValueError:
  274. pass
  275. elif self._col == 2:
  276. self._last_emisor_rfc = cv
  277. elif self._col == 3:
  278. self._last_emisor = cv
  279. elif self._col == 4:
  280. self._last_receptor_rfc = cv
  281. elif self._col == 5:
  282. self._last_receptor = cv
  283. elif self._col == 6:
  284. self._last_date_cfdi = cv
  285. elif self._col == 7:
  286. self._last_date_timbre = cv
  287. elif self._col == 8:
  288. self._last_pac = cv
  289. elif self._col == 9:
  290. self._last_total = cv.replace('$', '').replace(',', '')
  291. elif self._col == 10:
  292. self._last_type = cv.lower()
  293. elif self._col == 12:
  294. self._last_status = cv
  295. elif self._col == 14:
  296. self._last_date_cancel = cv
  297. # CONEXION Y OBTENCION DE ELEMENTOS DEL SAT
  298. class PortalSAT(object):
  299. _description = 'Conexion al portal del SAT inicio de sesion y descarga'
  300. # CONSTANTES PARA LA CONEXION
  301. URL_MAIN = 'https://portal.facturaelectronica.sat.gob.mx/'
  302. HOST = 'cfdiau.sat.gob.mx'
  303. BROWSER = 'Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0'
  304. REFERER = 'https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0'
  305. PORTAL = 'portalcfdi.facturaelectronica.sat.gob.mx'
  306. URL_LOGIN = 'https://{}/nidp/app/login'.format(HOST)
  307. URL_FORM = 'https://{}/nidp/app/login?sid=0&sid=0'.format(HOST)
  308. URL_PORTAL = 'https://portalcfdi.facturaelectronica.sat.gob.mx/'
  309. URL_CONTROL = 'https://cfdicontribuyentes.accesscontrol.windows.net/v2/wsfederation'
  310. URL_CONSULTA = URL_PORTAL + 'Consulta.aspx'
  311. URL_RECEPTOR = URL_PORTAL + 'ConsultaReceptor.aspx'
  312. URL_EMISOR = URL_PORTAL + 'ConsultaEmisor.aspx'
  313. URL_LOGOUT = URL_PORTAL + 'logout.aspx?salir=y'
  314. DIR_EMITIDAS = 'emitidas'
  315. DIR_RECIBIDAS = 'recibidas'
  316. COMPANY_ID = ""
  317. def __init__(self, rfc, target, sin):
  318. self._rfc = rfc
  319. self.error = ''
  320. self.is_connect = False
  321. self.not_network = False
  322. self.only_search = False
  323. self.only_test = False
  324. self.sin_sub = sin
  325. self._only_status = False
  326. self._init_values(target)
  327. def _init_values(self, target):
  328. self._folder = target
  329. self._emitidas = False
  330. self._current_year = datetime.datetime.now().year
  331. self._session = httpx.Client(http2=True, timeout=TIMEOUT, verify=CONTEXT)
  332. a = adapters.HTTPAdapter(pool_connections=512, pool_maxsize=512, max_retries=5)
  333. return
  334. def _get_post_form_dates(self):
  335. post = {}
  336. post['__ASYNCPOST'] = 'true'
  337. post['__EVENTARGUMENT'] = ''
  338. post['__EVENTTARGET'] = 'ctl00$MainContent$RdoFechas'
  339. post['__LASTFOCUS'] = ''
  340. post['ctl00$MainContent$CldFecha$DdlAnio'] = str(self._current_year)
  341. post['ctl00$MainContent$CldFecha$DdlDia'] = '0'
  342. post['ctl00$MainContent$CldFecha$DdlHora'] = '0'
  343. post['ctl00$MainContent$CldFecha$DdlHoraFin'] = '23'
  344. post['ctl00$MainContent$CldFecha$DdlMes'] = '1'
  345. post['ctl00$MainContent$CldFecha$DdlMinuto'] = '0'
  346. post['ctl00$MainContent$CldFecha$DdlMinutoFin'] = '59'
  347. post['ctl00$MainContent$CldFecha$DdlSegundo'] = '0'
  348. post['ctl00$MainContent$CldFecha$DdlSegundoFin'] = '59'
  349. post['ctl00$MainContent$DdlEstadoComprobante'] = '-1'
  350. post['ctl00$MainContent$FiltroCentral'] = 'RdoFechas'
  351. post['ctl00$MainContent$TxtRfcReceptor'] = ''
  352. post['ctl00$MainContent$TxtUUID'] = ''
  353. post['ctl00$MainContent$ddlComplementos'] = '-1'
  354. post['ctl00$MainContent$hfInicialBool'] = 'true'
  355. post['ctl00$ScriptManager1'] = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$RdoFechas'
  356. return post
  357. #OBTENER RESPUESTAS DE LAS PETICIONES QUE SE REALIZAN AL SAT
  358. def _response(self, url, method='get', headers={}, data={}):
  359. try:
  360. if method == 'get':
  361. result = self._session.get(url, timeout=TIMEOUT)
  362. else:
  363. result = self._session.post(url, data=data, timeout=TIMEOUT)
  364. msg = '{} {} {}'.format(result.status_code, method.upper(), url)
  365. if result.status_code == 200:
  366. return result.text
  367. else:
  368. _logger.error(msg)
  369. return ''
  370. except exceptions.Timeout:
  371. msg = 'Tiempo de espera agotado'
  372. self.not_network = True
  373. _logger.error(msg)
  374. return ''
  375. except exceptions.ConnectionError:
  376. msg = 'Revisa la conexión a Internet'
  377. self.not_network = True
  378. _logger.error(msg)
  379. return ''
  380. #LECTURA Y OBTENCION DE CIERTOS ELEMENTOS QUE SE PRESENTAN EN EL HTML
  381. def _read_form(self, html, form=''):
  382. if form == 'login':
  383. parser = FormLoginValues()
  384. else:
  385. parser = FormValues()
  386. parser.feed(html)
  387. return parser.values
  388. #OBTENCION DEL CABECERO QUE SE ENVIARA EN ALGUNAS PETICIONES
  389. def _get_headers(self, host, referer, ajax=False):
  390. acept = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
  391. headers = {
  392. 'Accept': acept,
  393. 'Accept-Encoding': 'gzip, deflate, br',
  394. 'Accept-Language': 'es-ES,es;q=0.5',
  395. 'Connection': 'keep-alive',
  396. 'DNT': '1',
  397. 'Host': host,
  398. 'Referer': referer,
  399. 'Upgrade-Insecure-Requests': '1',
  400. 'User-Agent': self.BROWSER,
  401. 'Content-Type': 'application/x-www-form-urlencoded',
  402. }
  403. if ajax:
  404. headers.update({
  405. 'Cache-Control': 'no-cache',
  406. 'X-MicrosoftAjax': 'Delta=true',
  407. 'x-requested-with': 'XMLHttpRequest',
  408. 'Pragma': 'no-cache',
  409. })
  410. return headers
  411. #OBTENER LOS VALORES DE LOS CAMPOS Y BOTONES DE LA PANTALLA DE CONSULTA
  412. def _get_post_type_search(self, html):
  413. tipo_busqueda = 'RdoTipoBusquedaReceptor'
  414. if self._emitidas:
  415. tipo_busqueda = 'RdoTipoBusquedaEmisor'
  416. sm = 'ctl00$MainContent$UpnlBusqueda|ctl00$MainContent$BtnBusqueda'
  417. post = self._read_form(html)
  418. post['ctl00$MainContent$TipoBusqueda'] = tipo_busqueda
  419. post['__ASYNCPOST'] = 'true'
  420. post['__EVENTTARGET'] = ''
  421. post['__EVENTARGUMENT'] = ''
  422. post['ctl00$ScriptManager1'] = sm
  423. return post
  424. #OBTENER INFORMACION DEL CERTIFICADO
  425. def _get_data_cert(self, fiel_cert_data):
  426. cert = crypto.load_certificate(crypto.FILETYPE_ASN1, fiel_cert_data)
  427. rfc = cert.get_subject().x500UniqueIdentifier.split(' ')[0]
  428. serie = '{0:x}'.format(cert.get_serial_number())[1::2]
  429. fert = cert.get_notAfter().decode()[2:]
  430. return rfc, serie, fert
  431. #OBTENER UNA FIRMA PARA PODER OBTENER UN TOKEN MAS ADELANTE
  432. def _sign(self, fiel_pem_data, data):
  433. key = crypto.load_privatekey(crypto.FILETYPE_PEM, fiel_pem_data)
  434. sign = base64.b64encode(crypto.sign(key, data, 'sha256'))
  435. return base64.b64encode(sign).decode('utf-8')
  436. #OBTENCION DEL TOKEN QUE NOS PERMITIRA MANTENER LA SESION INICIADA
  437. def _get_token(self, firma, co):
  438. co = base64.b64encode(co.encode('utf-8')).decode('utf-8')
  439. data = '{}#{}'.format(co, firma).encode('utf-8')
  440. token = base64.b64encode(data).decode('utf-8')
  441. return token
  442. #OBTENCION DE LA INFORMACION QUE SE ENVIARA AL SAT PARA EL INICIO DE SESION
  443. def _make_data_form(self, fiel_cert_data, fiel_pem_data, values):
  444. rfc, serie, fert = self._get_data_cert(fiel_cert_data)
  445. co = '{}|{}|{}'.format(values['tokenuuid'], rfc, serie)
  446. firma = self._sign(fiel_pem_data, co)
  447. token = self._get_token(firma, co)
  448. keys = ('credentialsRequired', 'guid', 'ks', 'urlApplet')
  449. data = {k: values[k] for k in keys}
  450. data['fert'] = fert
  451. data['token'] = token
  452. data['arc'] = ''
  453. data['placer'] = ''
  454. data['secuence'] = ''
  455. data['seeder'] = ''
  456. data['tan'] = ''
  457. return data
  458. # CONEXION CON EL PORTAL DEL SAT
  459. def login_fiel(self, fiel_cert_data, fiel_pem_data, certificate, company_id):
  460. # CREAMOS SESION PERSISTENTE
  461. client = self._session
  462. # MANDAMOS LA SOLICITUD DE OBTENCION DEL SITIO WEB https://portal.facturaelectronica.sat.gob.mx/ PARA OBTENER REDIRECCIONAMIENTO
  463. response = client.get(url=self.URL_MAIN)
  464. # PETICION AL LOGIN CON FIEL
  465. headers = {
  466. "referer": self.get_url(response.url),
  467. }
  468. response = client.post(url="https://cfdiau.sat.gob.mx/nidp/wsfed/ep?id=SATUPCFDiCon&sid=0&option=credential&sid=0", headers=headers)
  469. # PETICION PARA OBTENER EL FORMULARIO
  470. headers["referer"] = self.get_url(response.url)
  471. response = client.get(url="https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0", headers=headers)
  472. values = self._read_form(response.text, 'login')
  473. data = self._make_data_form(fiel_cert_data, fiel_pem_data, values)
  474. headers["referer"] = self.get_url(response.url)
  475. headers.update(self._get_headers(self.HOST, self.get_url(response.url)))
  476. headers = {
  477. "cache-control": "max-age=0",
  478. "origin": "https://cfdiau.sat.gob.mx",
  479. "content-type": "application/x-www-form-urlencoded",
  480. "upgrade-insecure-requests": "1",
  481. "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0",
  482. "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
  483. "sec-gpc": "1",
  484. "accept-language": "es-ES,es;q=0.5",
  485. "sec-fetch-site": "same-origin",
  486. "sec-fetch-dest": "document",
  487. "referer": "https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0",
  488. "accept-encoding": "gzip, deflate, br, zstd",
  489. "priority": "u=0, i",
  490. }
  491. #NOS IDENTIFICAMOS EN EL SAT PARA PODER SEGUIR CON PETICIONES Y CONSULTAS
  492. response = client.post(url="https://cfdiau.sat.gob.mx/nidp/app/login?id=SATx509Custom&sid=0&option=credential&sid=0", headers=headers, data=data)
  493. headers["referer"] = "https://portal.facturaelectronica.sat.gob.mx/"
  494. headers["Host"] = self.HOST
  495. #SE OBTIENE LA PAGINA DE CONSULTA PARA OBTENER DATOS NECESARIOS PARA SU POSTERIOR USO EN LAS BUSQUEDAS
  496. response = client.get(url=self.URL_CONSULTA)
  497. data = self._read_form(response.text)
  498. #SE MANDA LA INFORMACION PARA PODER SER REDIRIGIDOS CORRECTAMENTE A LA PAGINA DE CONSULTA
  499. response = client.post(url=self.URL_CONSULTA, data=data)
  500. self._session.headers.update(headers=headers)
  501. self.is_connect = True
  502. return True
  503. def get_url(self, url_object):
  504. url = f"{url_object.scheme}/{url_object.host}{url_object.full_path}"
  505. return url
  506. def _merge(self, list1, list2):
  507. result = list1.copy()
  508. result.update(list2)
  509. return result
  510. def _last_day(self, date):
  511. last_day = calendar.monthrange(date.year, date.month)[1]
  512. return datetime.datetime(date.year, date.month, last_day)
  513. def _get_dates(self, d1, d2):
  514. end = d2
  515. dates = []
  516. while True:
  517. d2 = self._last_day(d1)
  518. if d2 >= end:
  519. dates.append((d1, end))
  520. break
  521. dates.append((d1, d2))
  522. d1 = d2 + datetime.timedelta(days=1)
  523. return dates
  524. def _get_dates_recibidas(self, d1, d2):
  525. days = (d2 - d1).days + 1
  526. return [d1 + datetime.timedelta(days=d) for d in range(days)]
  527. def _time_delta(self, days):
  528. now = datetime.datetime.now()
  529. date_from = now.replace(
  530. hour=0, minute=0, second=0, microsecond=0) - datetime.timedelta(days=days)
  531. date_to = now.replace(hour=23, minute=59, second=59, microsecond=0)
  532. return date_from, date_to
  533. def _time_delta_recibidas(self, days):
  534. now = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  535. return [now - datetime.timedelta(days=d) for d in range(days)]
  536. #FILTROS PARA LA CONSULTA
  537. def _get_filters(self, args, emitidas=True):
  538. filters = []
  539. data = {}
  540. data['day'] = bool(args['dia'])
  541. data['uuid'] = ''
  542. if args['uuid']:
  543. data['uuid'] = str(args['uuid'])
  544. data['emitidas'] = emitidas
  545. data['rfc_emisor'] = args.get('rfc_emisor', '')
  546. data['rfc_receptor'] = args.get('rfc_receptor', '')
  547. data['type_cfdi'] = args.get('tipo_complemento', '-1')
  548. if args['fecha_inicial'] and args['fecha_final'] and emitidas:
  549. dates = self._get_dates(args['fecha_inicial'], args['fecha_final'])
  550. for start, end in dates:
  551. data['date_from'] = start
  552. data['date_to'] = end
  553. filters.append(Filters(data))
  554. elif args['fecha_inicial'] and args['fecha_final']:
  555. dates = self._get_dates_recibidas(args['fecha_inicial'], args['fecha_final'])
  556. is_first_date = False
  557. for d in dates:
  558. if not is_first_date:
  559. data['date_from'] = d
  560. is_first_date = True
  561. else:
  562. d = d.replace(hour=0, minute=0, second=0, microsecond=0)
  563. data['date_from'] = d
  564. data['day'] = True
  565. filters.append(Filters(data))
  566. elif args['intervalo_dias'] and emitidas:
  567. data['date_from'], data['date_to'] = self._time_delta(args['intervalo_dias'])
  568. filters.append(Filters(data))
  569. elif args['intervalo_dias']:
  570. dates = self._time_delta_recibidas(args['intervalo_dias'])
  571. for d in dates:
  572. data['date_from'] = d
  573. data['day'] = True
  574. filters.append(Filters(data))
  575. elif args['uuid']:
  576. data['date_from'] = None
  577. filters.append(Filters(data))
  578. else:
  579. day = args['dia'] or 1
  580. data['date_from'] = datetime.datetime(args['ano'], args['mes'], day)
  581. filters.append(Filters(data))
  582. return tuple(filters)
  583. def _segment_filter(self, filters):
  584. new_filters = []
  585. if filters.stop:
  586. return new_filters
  587. date = filters.date_from
  588. date_to = filters.date_to
  589. if filters.minute:
  590. for m in range(10):
  591. nf = deepcopy(filters)
  592. nf.stop = True
  593. nf.date_from = date + datetime.timedelta(minutes=m)
  594. nf.date_to = date + datetime.timedelta(minutes=m + 1)
  595. new_filters.append(nf)
  596. elif filters.hour:
  597. minutes = tuple(range(0, 60, 10)) + (0,)
  598. minutes = tuple(zip(minutes, minutes[1:]))
  599. for m in minutes:
  600. nf = deepcopy(filters)
  601. nf.minute = True
  602. nf.date_from = date + datetime.timedelta(minutes=m[0])
  603. nf.date_to = date + datetime.timedelta(minutes=m[1])
  604. if m[0] == 50 and nf.date_to.hour == 23:
  605. nf.date_to = nf.date_to.replace(
  606. hour=nf.date_to.hour, minute=59, second=59)
  607. elif m[0] == 50 and nf.date_to.hour != 23:
  608. nf.date_to = nf.date_to.replace(
  609. hour=nf.date_to.hour + 1, minute=0, second=0)
  610. new_filters.append(nf)
  611. elif filters.day:
  612. hours = tuple(range(0, 25))
  613. hours = tuple(zip(hours, hours[1:]))
  614. for h in hours:
  615. nf = deepcopy(filters)
  616. nf.hour = True
  617. nf.date_from = date + datetime.timedelta(hours=h[0])
  618. nf.date_to = date + datetime.timedelta(hours=h[1])
  619. if h[1] == 24:
  620. nf.date_to = nf.date_from.replace(
  621. minute=59, second=59, microsecond=0)
  622. new_filters.append(nf)
  623. else:
  624. last_day = calendar.monthrange(date.year, date.month)[1]
  625. for d in range(last_day):
  626. nf = deepcopy(filters)
  627. nf.day = True
  628. nf.date_from = date + datetime.timedelta(days=d)
  629. nf.date_to = nf.date_from.replace(
  630. hour=23, minute=59, second=59, microsecond=0)
  631. new_filters.append(nf)
  632. if date_to == nf.date_to:
  633. break
  634. return new_filters
  635. #OBTENER INFORMACION QUE SE ENVIARA EN LA CONSULTA PARA OBTENER LOS CFDIS
  636. def _get_post(self, html):
  637. validos = ('EVENTTARGET', '__EVENTARGUMENT', '__LASTFOCUS', '__VIEWSTATE')
  638. values = html.split('|')
  639. post = {v: values[i + 1] for i, v in enumerate(values) if v in validos}
  640. return post
  641. #ASIGNAR UN HEADER PARA LA CONSULTA DE CFDIS
  642. def _set_search_headers(self):
  643. self._session.headers = {
  644. "cache-control": "no-cache",
  645. "x-requested-with": "XMLHttpRequest",
  646. "user-agent": "Mozilla/5.0 (X11; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0",
  647. "x-microsoftajax": "Delta=true",
  648. "content-type": "application/x-www-form-urlencoded; charset=UTF-8",
  649. "accept": "*/*",
  650. "sec-gpc": "1",
  651. "accept-language": "es-ES,es;q=0.5",
  652. "origin": "https://portalcfdi.facturaelectronica.sat.gob.mx",
  653. "sec-fetch-site": "same-origin",
  654. "sec-fetch-mode": "cors",
  655. "sec-fetch-dest": "empty",
  656. "referer": "https://portalcfdi.facturaelectronica.sat.gob.mx/ConsultaEmisor.aspx",
  657. "accept-encoding": "gzip, deflate, br, zstd",
  658. "priority": "u=1, i",
  659. }
  660. return True
  661. #OBTENCION DE LA PAGINA DE CONSULTA POR FECHAS
  662. def _change_to_date(self, url_search):
  663. client = self._session
  664. self._set_search_headers()
  665. response = client.get(url_search)
  666. data = self._read_form(response.text)
  667. post = self._merge(data, self._get_post_form_dates())
  668. headers = self._get_headers(self.PORTAL, url_search, True)
  669. response = client.post(url=url_search, headers=headers, data=post)
  670. post = self._get_post(response.text)
  671. return data, post
  672. #BUSQUEDA DE CFDIS RECIBIDOS CREADOS POR UN PROVEEDOR
  673. def _search_recibidas(self, filters):
  674. url_search = self.URL_RECEPTOR
  675. values, post_source = self._change_to_date(url_search)
  676. invoice_content = {}
  677. for f in filters:
  678. post = self._merge(values, f.get_post())
  679. post = self._merge(post, post_source)
  680. headers = self._get_headers(self.PORTAL, url_search, True)
  681. html = self._response(url_search, 'post', headers, post)
  682. not_found, limit, invoices = self._get_download_links(html)
  683. if not_found or not invoices:
  684. msg = '\n\tNo se encontraron documentos en el filtro:\n\t{}'.format(str(f))
  685. _logger.info(msg)
  686. else:
  687. data = self._download(invoices, limit, f)
  688. if data and type(data) == dict:
  689. invoice_content.update(data)
  690. return invoice_content
  691. #BUSQUEDA DE CFDIS EMITIDOS CREADOS POR LA EMPRESA
  692. def _search_emitidas(self, filters):
  693. url_search = self.URL_EMISOR
  694. values, post_source = self._change_to_date(url_search)
  695. invoice_content = {}
  696. for f in filters:
  697. _logger.info(str(f))
  698. post = self._merge(values, f.get_post())
  699. post = self._merge(post, post_source)
  700. headers = self._get_headers(self.PORTAL, url_search, True)
  701. html = self._response(url_search, 'post', headers, post)
  702. not_found, limit, invoices = self._get_download_links(html)
  703. if not_found or not invoices:
  704. msg = '\n\tNo se encontraron documentos en el filtro:\n\t{}'.format(str(f))
  705. _logger.info(msg)
  706. else:
  707. data = self._download(invoices, limit, f, self.DIR_EMITIDAS)
  708. if data and type(data) == dict:
  709. invoice_content.update(data)
  710. return invoice_content
  711. #PROCESO DE BUSQUEDA DE CFDIS PARA SU DESCARGA
  712. def search(self, opt, download_option='both'):
  713. self._only_status = opt['estatus']
  714. invoice_content_e, invoice_content_r = {}, {}
  715. if download_option == 'both':
  716. filters_e = self._get_filters(opt, True)
  717. invoice_content_e = self._search_emitidas(filters_e)
  718. filters_r = self._get_filters(opt, False)
  719. invoice_content_r = self._search_recibidas(filters_r)
  720. elif download_option == 'supplier':
  721. filters_r = self._get_filters(opt, False)
  722. invoice_content_r = self._search_recibidas(filters_r)
  723. elif download_option == 'customer':
  724. filters_e = self._get_filters(opt, True)
  725. invoice_content_e = self._search_emitidas(filters_e)
  726. return invoice_content_r, invoice_content_e
  727. #PROCESO DE DESCARGA
  728. def _download(self, invoices, limit=False, filters=None, folder=DIR_RECIBIDAS):
  729. if not invoices and not limit:
  730. msg = '\n\tTodos los documentos han sido previamente descargados para el filtro.\n\t{}'.format(str(filters))
  731. _logger.info(msg)
  732. return {}
  733. invoices_content = {}
  734. if invoices and not self.only_search:
  735. invoices_content = self._thread_download(invoices, folder, filters)
  736. if limit:
  737. sf = self._segment_filter(filters)
  738. if folder == self.DIR_RECIBIDAS:
  739. data = self._search_recibidas(sf)
  740. if data and type(data) == dict:
  741. invoices_content.update(data)
  742. else:
  743. data = self._search_emitidas(sf)
  744. if data and type(data) == dict:
  745. invoices_content.update(data)
  746. return invoices_content
  747. #OBTENCION DE LOS VALORE DE LA PETICION PARA OBTENER LAS URL DE LOS CFDI
  748. def _thread_download(self, invoices, folder, filters):
  749. for_download = invoices[:]
  750. current = 1
  751. total = len(for_download)
  752. invoice_content = {}
  753. for i in range(TRY_COUNT):
  754. for uuid, values in for_download:
  755. data = {
  756. 'url': values['url'],
  757. 'acuse': values['acuse'],
  758. }
  759. content = self._get_xml(uuid, data, current, total)
  760. pdf_content = self._get_pdf(uuid, data, current, total)
  761. if content:
  762. invoice_content.update({uuid: [values, content, pdf_content]})
  763. current += 1
  764. if len(invoice_content) == len(for_download):
  765. break
  766. if total:
  767. msg = '{} documentos por descargar en: {}'.format(total, str(filters))
  768. _logger.info(msg)
  769. return invoice_content
  770. #OBTENER EL DOCUMENTO XML POR MEDIO DE LA URL DE DESCARGAR
  771. def _get_xml(self, uuid, values, current, count):
  772. for i in range(TRY_COUNT):
  773. try:
  774. r = self._session.get(values['url'], timeout=TIMEOUT)
  775. if r.status_code == 200:
  776. return r.content
  777. except exceptions.Timeout:
  778. _logger.debug('Tiempo de espera sobrepasado')
  779. continue
  780. except Exception as e:
  781. _logger.error(str(e))
  782. return
  783. msg = 'Tiempo de espera agotado para el documento: {}'.format(uuid)
  784. _logger.error(msg)
  785. return
  786. # OBTENER EL DOCUMENTO XML POR MEDIO DE LA URL DE DESCARGAR
  787. def _get_pdf(self, uuid, values, current, count):
  788. for i in range(TRY_COUNT):
  789. try:
  790. r = self._session.get(values['acuse'], timeout=TIMEOUT)
  791. if r.status_code == 200:
  792. return r.content
  793. except exceptions.Timeout:
  794. _logger.debug('Tiempo de espera sobrepasado')
  795. continue
  796. except Exception as e:
  797. _logger.error(str(e))
  798. return
  799. msg = 'Tiempo de espera agotado para el documento: {}'.format(uuid)
  800. _logger.error(msg)
  801. return
  802. def _get_download_links(self, html):
  803. parser = Invoice()
  804. parser.feed(html)
  805. return parser.not_found, parser.limit, parser.invoices
  806. def logout(self):
  807. msg = 'Cerrando sessión en el SAT'
  808. _logger.debug(msg)
  809. response = self._response(self.URL_LOGOUT)
  810. self.is_connect = False
  811. self._session.close()
  812. msg = 'Sesión cerrada en el SAT'
  813. _logger.info(msg)
  814. return