google_drive_service.py 27 KB


  1. # -*- coding: utf-8 -*-
  2. # Part of Odoo. See LICENSE file for full copyright and licensing details.
  3. import logging
  4. import json
  5. from datetime import datetime
  6. from odoo import api, fields, models, _
  7. from odoo.exceptions import UserError
  8. _logger = logging.getLogger(__name__)
  9. TIMEOUT = 30
  10. GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
  11. class GoogleDriveService(models.AbstractModel):
  12. """Servicio para Google Drive API siguiendo las mejores prácticas de Odoo"""
  13. _name = 'google.drive.service'
  14. _description = 'Google Drive Service'
  15. def _get_credentials(self, user=None):
  16. """Obtener credenciales de configuración para el usuario especificado"""
  17. if not user:
  18. user = self.env.user
  19. config = self.env['ir.config_parameter'].sudo()
  20. enabled = config.get_param('google_api.enabled', 'False')
  21. if not enabled or enabled == 'False':
  22. raise UserError(_('Google API Integration is not enabled'))
  23. client_id = config.get_param('google_api.client_id', '')
  24. client_secret = config.get_param('google_api.client_secret', '')
  25. if not client_id or not client_secret:
  26. raise UserError(_('Google API credentials not configured'))
  27. # Get access token from user settings
  28. user_settings = user.res_users_settings_id
  29. if not user_settings or not user_settings._google_authenticated():
  30. raise UserError(_('User %s is not authenticated with Google. Please connect your account first.') % user.name)
  31. access_token = user_settings._get_google_access_token()
  32. if not access_token:
  33. raise UserError(_('Could not obtain valid access token for user %s. Please reconnect your Google account.') % user.name)
  34. return {
  35. 'client_id': client_id,
  36. 'client_secret': client_secret,
  37. 'access_token': access_token
  38. }
  39. def _refresh_access_token(self):
  40. """Refrescar el token de acceso"""
  41. try:
  42. config = self.env['ir.config_parameter'].sudo()
  43. refresh_token = config.get_param('google_api.refresh_token')
  44. client_id = config.get_param('google_api.client_id')
  45. client_secret = config.get_param('google_api.client_secret')
  46. if not all([refresh_token, client_id, client_secret]):
  47. return False
  48. token_url = 'https://oauth2.googleapis.com/token'
  49. data = {
  50. 'client_id': client_id,
  51. 'client_secret': client_secret,
  52. 'refresh_token': refresh_token,
  53. 'grant_type': 'refresh_token'
  54. }
  55. import requests
  56. response = requests.post(token_url, data=data, timeout=TIMEOUT)
  57. if response.status_code == 200:
  58. token_data = response.json()
  59. new_access_token = token_data.get('access_token')
  60. if new_access_token:
  61. config.set_param('google_api.access_token', new_access_token)
  62. _logger.info("Google API access token refreshed successfully")
  63. return True
  64. return False
  65. except Exception as e:
  66. _logger.error(f"Failed to refresh access token: {str(e)}")
  67. return False
  68. def _do_request(self, uri, params=None, headers=None, method='GET', timeout=TIMEOUT, json_data=None):
  69. """Realizar petición HTTP a Google API con manejo de errores y reintentos"""
  70. try:
  71. credentials = self._get_credentials()
  72. # Headers por defecto
  73. default_headers = {
  74. 'Authorization': f'Bearer {credentials["access_token"]}',
  75. 'Content-Type': 'application/json'
  76. }
  77. if headers:
  78. default_headers.update(headers)
  79. url = f"{GOOGLE_API_BASE_URL}{uri}"
  80. import requests
  81. # Realizar petición
  82. if method.upper() == 'GET':
  83. response = requests.get(url, params=params, headers=default_headers, timeout=timeout)
  84. elif method.upper() == 'POST':
  85. response = requests.post(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  86. elif method.upper() == 'PUT':
  87. response = requests.put(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  88. elif method.upper() == 'PATCH':
  89. response = requests.patch(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  90. elif method.upper() == 'DELETE':
  91. response = requests.delete(url, params=params, headers=default_headers, timeout=timeout)
  92. else:
  93. raise UserError(_('Unsupported HTTP method: %s') % method)
  94. # Manejar códigos de respuesta
  95. if response.status_code == 200:
  96. return response.json() if response.content else {}
  97. elif response.status_code == 201:
  98. return response.json() if response.content else {}
  99. elif response.status_code == 204:
  100. return {}
  101. elif response.status_code == 401:
  102. # Intentar refrescar token
  103. if self._refresh_access_token():
  104. # Reintentar la petición
  105. return self._do_request(uri, params, headers, method, timeout)
  106. else:
  107. raise UserError(_('Authentication failed. Please reconnect your Google account.'))
  108. elif response.status_code == 429:
  109. raise UserError(_('Rate limit exceeded. Please try again later.'))
  110. elif response.status_code >= 400:
  111. error_msg = self._parse_error_response(response)
  112. raise UserError(error_msg)
  113. return response.json() if response.content else {}
  114. except requests.exceptions.Timeout:
  115. raise UserError(_('Request timeout. Please try again.'))
  116. except requests.exceptions.ConnectionError:
  117. raise UserError(_('Connection error. Please check your internet connection.'))
  118. except UserError:
  119. raise
  120. except Exception as e:
  121. raise UserError(_('Unexpected error: %s') % str(e))
  122. def _parse_error_response(self, response):
  123. """Parsear respuesta de error de Google API"""
  124. try:
  125. error_data = response.json()
  126. error_info = error_data.get('error', {})
  127. message = error_info.get('message', 'Unknown error')
  128. code = error_info.get('code', response.status_code)
  129. return f"Google API Error {code}: {message}"
  130. except:
  131. return f"Google API Error {response.status_code}: {response.text}"
  132. def _build_shared_drive_params(self, include_shared_drives=True):
  133. """Construir parámetros para Shared Drives"""
  134. params = {}
  135. if include_shared_drives:
  136. params.update({
  137. 'supportsAllDrives': 'true',
  138. 'includeItemsFromAllDrives': 'true'
  139. })
  140. return params
  141. def validate_folder_id(self, folder_id):
  142. """Validar que un folder ID existe y es accesible"""
  143. try:
  144. if not folder_id:
  145. raise UserError(_('Folder ID is required'))
  146. if not isinstance(folder_id, str) or len(folder_id) < 10:
  147. raise UserError(_('Invalid folder ID format'))
  148. # Paso 1: Intentar como carpeta regular
  149. try:
  150. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  151. 'fields': 'id,name,mimeType'
  152. })
  153. return {
  154. 'valid': True,
  155. 'type': 'regular_folder',
  156. 'name': response.get('name', 'Unknown'),
  157. 'id': response.get('id')
  158. }
  159. except UserError as e:
  160. if '404' in str(e):
  161. # Paso 2: Intentar como Shared Drive
  162. try:
  163. response = self._do_request(f'/drive/v3/drives/{folder_id}', {
  164. 'fields': 'id,name'
  165. })
  166. return {
  167. 'valid': True,
  168. 'type': 'shared_drive',
  169. 'name': response.get('name', 'Unknown'),
  170. 'id': response.get('id')
  171. }
  172. except UserError:
  173. # Paso 3: Intentar como carpeta dentro de Shared Drive
  174. try:
  175. params = self._build_shared_drive_params()
  176. params['fields'] = 'id,name,mimeType'
  177. response = self._do_request(f'/drive/v3/files/{folder_id}', params)
  178. return {
  179. 'valid': True,
  180. 'type': 'folder_in_shared_drive',
  181. 'name': response.get('name', 'Unknown'),
  182. 'id': response.get('id')
  183. }
  184. except UserError:
  185. raise UserError(_('Folder ID not found or not accessible'))
  186. else:
  187. raise e
  188. except Exception as e:
  189. return {
  190. 'valid': False,
  191. 'error': str(e)
  192. }
  193. def create_folder(self, name, parent_folder_id=None, description=None):
  194. """Crear una carpeta en Google Drive"""
  195. try:
  196. folder_metadata = {
  197. 'name': name,
  198. 'mimeType': 'application/vnd.google-apps.folder'
  199. }
  200. if parent_folder_id:
  201. folder_metadata['parents'] = [parent_folder_id]
  202. if description:
  203. folder_metadata['description'] = description
  204. # Usar parámetros de Shared Drive si hay parent folder
  205. params = {}
  206. if parent_folder_id:
  207. params = self._build_shared_drive_params()
  208. # Construir URL con parámetros
  209. url = '/drive/v3/files'
  210. if params:
  211. param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
  212. url = f"{url}?{param_string}"
  213. response = self._do_request(url, folder_metadata, method='POST')
  214. return {
  215. 'success': True,
  216. 'folder_id': response.get('id'),
  217. 'folder_name': response.get('name'),
  218. 'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}"
  219. }
  220. except Exception as e:
  221. return {
  222. 'success': False,
  223. 'error': str(e)
  224. }
  225. def list_folders(self, parent_folder_id=None, include_shared_drives=True):
  226. """Listar carpetas en Google Drive"""
  227. try:
  228. query = "mimeType='application/vnd.google-apps.folder'"
  229. if parent_folder_id:
  230. query += f" and '{parent_folder_id}' in parents"
  231. params = {
  232. 'q': query,
  233. 'fields': 'files(id,name,parents,createdTime,modifiedTime)',
  234. 'orderBy': 'name'
  235. }
  236. # Agregar parámetros de Shared Drive
  237. if include_shared_drives:
  238. params.update(self._build_shared_drive_params())
  239. response = self._do_request('/drive/v3/files', params)
  240. return {
  241. 'success': True,
  242. 'folders': response.get('files', [])
  243. }
  244. except Exception as e:
  245. return {
  246. 'success': False,
  247. 'error': str(e)
  248. }
  249. def get_folder_url(self, folder_id):
  250. """Obtener URL para abrir una carpeta"""
  251. try:
  252. if not folder_id:
  253. raise UserError(_('Folder ID is required'))
  254. # Validar que existe
  255. validation = self.validate_folder_id(folder_id)
  256. if not validation.get('valid'):
  257. raise UserError(validation.get('error', 'Invalid folder'))
  258. folder_type = validation.get('type', 'regular_folder')
  259. if folder_type == 'shared_drive':
  260. return f"https://drive.google.com/drive/u/0/folders/{folder_id}"
  261. else:
  262. return f"https://drive.google.com/drive/folders/{folder_id}"
  263. except Exception as e:
  264. return None
  265. def test_connection(self):
  266. """Probar conexión al servicio"""
  267. try:
  268. credentials = self._get_credentials()
  269. # Test básico de autenticación
  270. import requests
  271. response = requests.get(
  272. f"{GOOGLE_API_BASE_URL}/oauth2/v1/userinfo",
  273. headers={'Authorization': f'Bearer {credentials["access_token"]}'},
  274. timeout=10
  275. )
  276. if response.status_code == 200:
  277. user_info = response.json()
  278. return {
  279. 'success': True,
  280. 'user_email': user_info.get('email', 'Unknown'),
  281. 'user_name': user_info.get('name', 'Unknown')
  282. }
  283. else:
  284. return {
  285. 'success': False,
  286. 'error': f"Authentication failed: {response.status_code}"
  287. }
  288. except Exception as e:
  289. return {
  290. 'success': False,
  291. 'error': str(e)
  292. }
  293. # ============================================================================
  294. # MÉTODOS DE NAVEGACIÓN Y ESTRUCTURA
  295. # ============================================================================
  296. def navigate_folder_hierarchy(self, folder_id, max_levels=5):
  297. """Navigate up the folder hierarchy and return the complete path"""
  298. try:
  299. hierarchy = []
  300. current_id = folder_id
  301. for level in range(max_levels):
  302. try:
  303. response = self._do_request(f'/drive/v3/files/{current_id}', {
  304. 'fields': 'id,name,parents'
  305. })
  306. folder_info = {
  307. 'id': response.get('id'),
  308. 'name': response.get('name', ''),
  309. 'level': level
  310. }
  311. hierarchy.append(folder_info)
  312. parent_ids = response.get('parents', [])
  313. if not parent_ids:
  314. break
  315. current_id = parent_ids[0]
  316. except Exception as e:
  317. _logger.error(f"Error navigating folder hierarchy at level {level}: {str(e)}")
  318. break
  319. return hierarchy
  320. except Exception as e:
  321. _logger.error(f"Error in navigate_folder_hierarchy: {str(e)}")
  322. return []
  323. def find_folders_by_name(self, parent_id, name_pattern):
  324. """Find folders by name pattern in a parent folder"""
  325. try:
  326. import re
  327. query = f"'{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
  328. params = {
  329. 'q': query,
  330. 'fields': 'files(id,name)',
  331. 'pageSize': 100
  332. }
  333. # Add Shared Drive parameters
  334. params.update(self._build_shared_drive_params())
  335. response = self._do_request('/drive/v3/files', params)
  336. folders = response.get('files', [])
  337. # Filter by name pattern
  338. pattern = re.compile(name_pattern)
  339. matching_folders = [folder for folder in folders if pattern.match(folder.get('name', ''))]
  340. return matching_folders
  341. except Exception as e:
  342. _logger.error(f"Error finding folders by name: {str(e)}")
  343. return []
  344. def get_folder_info(self, folder_id):
  345. """Get detailed information about a folder"""
  346. try:
  347. validation = self.validate_folder_id(folder_id)
  348. if not validation.get('valid'):
  349. return None
  350. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  351. 'fields': 'id,name,parents,createdTime,modifiedTime,mimeType'
  352. })
  353. return response
  354. except Exception as e:
  355. _logger.error(f"Error getting folder info: {str(e)}")
  356. return None
  357. # ============================================================================
  358. # MÉTODOS DE MANIPULACIÓN DE CARPETAS
  359. # ============================================================================
  360. def rename_folder(self, folder_id, new_name):
  361. """Rename a folder in Google Drive"""
  362. try:
  363. # For Google Drive API v3, we need to use PATCH with the correct parameters
  364. # The issue was that we were passing params instead of json body
  365. update_data = {
  366. 'name': new_name
  367. }
  368. # Use PATCH method with json body (not params)
  369. response = self._do_request(f'/drive/v3/files/{folder_id}',
  370. json_data=update_data,
  371. method='PATCH')
  372. return {
  373. 'success': True,
  374. 'folder_id': response.get('id'),
  375. 'new_name': response.get('name')
  376. }
  377. except Exception as e:
  378. return {
  379. 'success': False,
  380. 'error': str(e)
  381. }
  382. def move_folder(self, folder_id, new_parent_id):
  383. """Move a folder to a new parent"""
  384. try:
  385. # Get current folder info
  386. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  387. 'fields': 'parents'
  388. })
  389. current_parents = response.get('parents', [])
  390. if not current_parents:
  391. return {
  392. 'success': False,
  393. 'error': 'Folder has no current parent'
  394. }
  395. # Use PATCH to update the parents
  396. update_data = {
  397. 'addParents': new_parent_id,
  398. 'removeParents': current_parents[0]
  399. }
  400. # Update the file with new parent
  401. update_response = self._do_request(
  402. f'/drive/v3/files/{folder_id}',
  403. update_data,
  404. method='PATCH',
  405. json_data=update_data
  406. )
  407. return {
  408. 'success': True,
  409. 'folder_id': folder_id,
  410. 'new_parent_id': new_parent_id
  411. }
  412. except Exception as e:
  413. return {
  414. 'success': False,
  415. 'error': str(e)
  416. }
  417. def delete_folder(self, folder_id):
  418. """Delete a folder from Google Drive"""
  419. try:
  420. self._do_request(f'/drive/v3/files/{folder_id}', method='DELETE')
  421. return {
  422. 'success': True,
  423. 'folder_id': folder_id
  424. }
  425. except Exception as e:
  426. return {
  427. 'success': False,
  428. 'error': str(e)
  429. }
  430. # ============================================================================
  431. # MÉTODOS DE VALIDACIÓN Y UTILIDADES
  432. # ============================================================================
  433. def extract_folder_id_from_url(self, url):
  434. """Extract folder ID from Google Drive URL - Generic utility method"""
  435. if not url:
  436. return None
  437. # Handle different Google Drive URL formats
  438. import re
  439. # Format: https://drive.google.com/drive/folders/FOLDER_ID
  440. folder_pattern = r'drive\.google\.com/drive/folders/([a-zA-Z0-9_-]+)'
  441. match = re.search(folder_pattern, url)
  442. if match:
  443. return match.group(1)
  444. # Format: https://drive.google.com/open?id=FOLDER_ID
  445. open_pattern = r'drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)'
  446. match = re.search(open_pattern, url)
  447. if match:
  448. return match.group(1)
  449. # Check if it's already a folder ID
  450. if isinstance(url, str) and len(url) >= 10 and len(url) <= 50:
  451. if re.match(r'^[a-zA-Z0-9_-]+$', url):
  452. return url
  453. return None
  454. def sanitize_folder_name(self, name):
  455. """Sanitize folder name to be Google Drive compatible - Generic utility method"""
  456. if not name:
  457. return 'Sin nombre'
  458. import re
  459. # Use regex for better performance
  460. sanitized_name = re.sub(r'[<>:"|?*/\\]', '_', name)
  461. # Remove leading/trailing spaces and dots
  462. sanitized_name = sanitized_name.strip(' .')
  463. # Ensure it's not empty after sanitization
  464. if not sanitized_name:
  465. return 'Sin nombre'
  466. # Limit length to 255 characters (Google Drive limit)
  467. if len(sanitized_name) > 255:
  468. sanitized_name = sanitized_name[:252] + '...'
  469. return sanitized_name
  470. def create_or_get_folder(self, parent_folder_id, folder_name, description=None):
  471. """Create a folder or get existing one by name - Generic utility method"""
  472. try:
  473. # First, check if folder already exists
  474. existing_folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
  475. if existing_folders:
  476. return existing_folders[0]['id']
  477. # Create new folder
  478. result = self.create_folder(folder_name, parent_folder_id, description)
  479. if result.get('success'):
  480. return result.get('folder_id')
  481. else:
  482. error_msg = result.get('error', 'Unknown error')
  483. raise UserError(_('Failed to create Google Drive folder "%s": %s') % (folder_name, error_msg))
  484. except Exception as e:
  485. _logger.error(f"Error creating or getting folder {folder_name}: {str(e)}")
  486. raise
  487. def create_folder_structure(self, root_folder_id, structure_components):
  488. """Create folder structure from components - Generic utility method"""
  489. try:
  490. created_folders = {}
  491. for level, (folder_name, parent_id) in enumerate(structure_components):
  492. if level == 0:
  493. # First level uses root_folder_id as parent
  494. parent_folder_id = root_folder_id
  495. else:
  496. # Other levels use the previous folder as parent
  497. parent_folder_id = created_folders[level - 1]
  498. folder_id = self.create_or_get_folder(parent_folder_id, folder_name)
  499. created_folders[level] = folder_id
  500. return created_folders
  501. except Exception as e:
  502. _logger.error(f"Error creating folder structure: {str(e)}")
  503. raise
  504. def find_existing_folder_structure(self, root_folder_id, structure_components):
  505. """Find existing folder structure - Generic utility method"""
  506. try:
  507. found_folders = {}
  508. for level, (folder_name, parent_id) in enumerate(structure_components):
  509. if level == 0:
  510. # First level uses root_folder_id as parent
  511. parent_folder_id = root_folder_id
  512. else:
  513. # Other levels use the previous folder as parent
  514. parent_folder_id = found_folders[level - 1]
  515. folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
  516. if not folders:
  517. return None # Structure not found
  518. found_folders[level] = folders[0]['id']
  519. return found_folders
  520. except Exception as e:
  521. _logger.error(f"Error finding existing folder structure: {str(e)}")
  522. return None
  523. def validate_folder_id_with_google_drive(self, folder_id):
  524. """Validate if the folder ID exists and is accessible in Google Drive - Generic utility method"""
  525. try:
  526. validation = self.validate_folder_id(folder_id)
  527. if validation.get('valid'):
  528. folder_name = validation.get('name', 'Unknown')
  529. _logger.info(f"✅ Folder ID {folder_id} validated successfully in Google Drive")
  530. return True, folder_name
  531. else:
  532. error_message = validation.get('error', 'Unknown error')
  533. _logger.warning(f"❌ Folder ID {folder_id} validation failed: {error_message}")
  534. return False, error_message
  535. except Exception as e:
  536. _logger.error(f"❌ Error validating folder ID {folder_id}: {str(e)}")
  537. return False, str(e)
  538. def check_folder_belongs_to_parent(self, folder_id, expected_parent_id, max_levels=5):
  539. """Check if a folder belongs to a specific parent in the hierarchy"""
  540. try:
  541. if not expected_parent_id:
  542. return False
  543. # Get current folder info
  544. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  545. 'fields': 'parents'
  546. })
  547. parent_ids = response.get('parents', [])
  548. if not parent_ids:
  549. return False
  550. # Navigate up to find if the folder is under the expected parent
  551. current_parent_id = parent_ids[0]
  552. for _ in range(max_levels):
  553. # Check if current parent is the expected parent
  554. if current_parent_id == expected_parent_id:
  555. # Folder is under the expected parent
  556. return True
  557. parent_response = self._do_request(f'/drive/v3/files/{current_parent_id}', {
  558. 'fields': 'parents'
  559. })
  560. parent_parent_ids = parent_response.get('parents', [])
  561. if not parent_parent_ids:
  562. # Reached the top level, folder is not under the expected parent
  563. return False
  564. current_parent_id = parent_parent_ids[0]
  565. # If we reach here, folder is not under the expected parent
  566. return False
  567. except Exception as e:
  568. _logger.error(f"Error checking folder parent relationship: {str(e)}")
  569. return False