google_drive_service.py 28 KB


  1. # -*- coding: utf-8 -*-
  2. # Part of Odoo. See LICENSE file for full copyright and licensing details.
  3. import logging
  4. import json
  5. from datetime import datetime
  6. from odoo import api, fields, models, _
  7. from odoo.exceptions import UserError
  8. _logger = logging.getLogger(__name__)
  9. TIMEOUT = 30
  10. GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
class GoogleDriveService(models.AbstractModel):
    """Google Drive API service following Odoo best practices."""

    # Technical model name used to reach the service through ``self.env``.
    _name = 'google.drive.service'
    _description = 'Google Drive Service'
  15. def _get_credentials(self, user=None):
  16. """Obtener credenciales de configuración para el usuario especificado"""
  17. if not user:
  18. user = self.env.user
  19. config = self.env['ir.config_parameter'].sudo()
  20. enabled = config.get_param('google_api.enabled', 'False')
  21. if not enabled or enabled == 'False':
  22. raise UserError(_('Google API Integration is not enabled'))
  23. client_id = config.get_param('google_api.client_id', '')
  24. client_secret = config.get_param('google_api.client_secret', '')
  25. if not client_id or not client_secret:
  26. raise UserError(_('Google API credentials not configured'))
  27. # Get access token from user settings
  28. user_settings = user.res_users_settings_id
  29. if not user_settings or not user_settings._google_authenticated():
  30. raise UserError(_('User %s is not authenticated with Google. Please connect your account first.') % user.name)
  31. access_token = user_settings._get_google_access_token()
  32. if not access_token:
  33. raise UserError(_('Could not obtain valid access token for user %s. Please reconnect your Google account.') % user.name)
  34. return {
  35. 'client_id': client_id,
  36. 'client_secret': client_secret,
  37. 'access_token': access_token
  38. }
  39. def _refresh_access_token(self):
  40. """Refrescar el token de acceso"""
  41. try:
  42. config = self.env['ir.config_parameter'].sudo()
  43. refresh_token = config.get_param('google_api.refresh_token')
  44. client_id = config.get_param('google_api.client_id')
  45. client_secret = config.get_param('google_api.client_secret')
  46. if not all([refresh_token, client_id, client_secret]):
  47. return False
  48. token_url = 'https://oauth2.googleapis.com/token'
  49. data = {
  50. 'client_id': client_id,
  51. 'client_secret': client_secret,
  52. 'refresh_token': refresh_token,
  53. 'grant_type': 'refresh_token'
  54. }
  55. import requests
  56. response = requests.post(token_url, data=data, timeout=TIMEOUT)
  57. if response.status_code == 200:
  58. token_data = response.json()
  59. new_access_token = token_data.get('access_token')
  60. if new_access_token:
  61. config.set_param('google_api.access_token', new_access_token)
  62. _logger.info("Google API access token refreshed successfully")
  63. return True
  64. return False
  65. except Exception as e:
  66. _logger.error(f"Failed to refresh access token: {str(e)}")
  67. return False
  68. def _do_request(self, uri, params=None, headers=None, method='GET', timeout=TIMEOUT, json_data=None):
  69. """Realizar petición HTTP a Google API con manejo de errores y reintentos"""
  70. try:
  71. credentials = self._get_credentials()
  72. # Headers por defecto
  73. default_headers = {
  74. 'Authorization': f'Bearer {credentials["access_token"]}',
  75. 'Content-Type': 'application/json'
  76. }
  77. if headers:
  78. default_headers.update(headers)
  79. url = f"{GOOGLE_API_BASE_URL}{uri}"
  80. import requests
  81. # Realizar petición
  82. if method.upper() == 'GET':
  83. response = requests.get(url, params=params, headers=default_headers, timeout=timeout)
  84. elif method.upper() == 'POST':
  85. response = requests.post(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  86. elif method.upper() == 'PUT':
  87. response = requests.put(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  88. elif method.upper() == 'PATCH':
  89. response = requests.patch(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
  90. elif method.upper() == 'DELETE':
  91. response = requests.delete(url, params=params, headers=default_headers, timeout=timeout)
  92. else:
  93. raise UserError(_('Unsupported HTTP method: %s') % method)
  94. # Manejar códigos de respuesta
  95. if response.status_code == 200:
  96. return response.json() if response.content else {}
  97. elif response.status_code == 201:
  98. return response.json() if response.content else {}
  99. elif response.status_code == 204:
  100. return {}
  101. elif response.status_code == 401:
  102. # Intentar refrescar token
  103. if self._refresh_access_token():
  104. # Reintentar la petición
  105. return self._do_request(uri, params, headers, method, timeout)
  106. else:
  107. raise UserError(_('Authentication failed. Please reconnect your Google account.'))
  108. elif response.status_code == 429:
  109. raise UserError(_('Rate limit exceeded. Please try again later.'))
  110. elif response.status_code >= 400:
  111. error_msg = self._parse_error_response(response)
  112. raise UserError(error_msg)
  113. return response.json() if response.content else {}
  114. except requests.exceptions.Timeout:
  115. raise UserError(_('Request timeout. Please try again.'))
  116. except requests.exceptions.ConnectionError:
  117. raise UserError(_('Connection error. Please check your internet connection.'))
  118. except UserError:
  119. raise
  120. except Exception as e:
  121. raise UserError(_('Unexpected error: %s') % str(e))
  122. def _parse_error_response(self, response):
  123. """Parsear respuesta de error de Google API"""
  124. try:
  125. error_data = response.json()
  126. error_info = error_data.get('error', {})
  127. message = error_info.get('message', 'Unknown error')
  128. code = error_info.get('code', response.status_code)
  129. return f"Google API Error {code}: {message}"
  130. except:
  131. return f"Google API Error {response.status_code}: {response.text}"
  132. def _build_shared_drive_params(self, include_shared_drives=True):
  133. """Construir parámetros para Shared Drives"""
  134. params = {}
  135. if include_shared_drives:
  136. params.update({
  137. 'supportsAllDrives': 'true',
  138. 'includeItemsFromAllDrives': 'true'
  139. })
  140. return params
  141. def validate_folder_id(self, folder_id):
  142. """Validar que un folder ID existe y es accesible"""
  143. try:
  144. if not folder_id:
  145. raise UserError(_('Folder ID is required'))
  146. if not isinstance(folder_id, str) or len(folder_id) < 10:
  147. raise UserError(_('Invalid folder ID format'))
  148. # Paso 1: Intentar como carpeta regular
  149. try:
  150. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  151. 'fields': 'id,name,mimeType'
  152. })
  153. return {
  154. 'valid': True,
  155. 'type': 'regular_folder',
  156. 'name': response.get('name', 'Unknown'),
  157. 'id': response.get('id')
  158. }
  159. except UserError as e:
  160. if '404' in str(e):
  161. # Paso 2: Intentar como Shared Drive
  162. try:
  163. response = self._do_request(f'/drive/v3/drives/{folder_id}', {
  164. 'fields': 'id,name'
  165. })
  166. return {
  167. 'valid': True,
  168. 'type': 'shared_drive',
  169. 'name': response.get('name', 'Unknown'),
  170. 'id': response.get('id')
  171. }
  172. except UserError:
  173. # Paso 3: Intentar como carpeta dentro de Shared Drive
  174. try:
  175. params = self._build_shared_drive_params()
  176. params['fields'] = 'id,name,mimeType'
  177. response = self._do_request(f'/drive/v3/files/{folder_id}', params)
  178. return {
  179. 'valid': True,
  180. 'type': 'folder_in_shared_drive',
  181. 'name': response.get('name', 'Unknown'),
  182. 'id': response.get('id')
  183. }
  184. except UserError:
  185. raise UserError(_('Folder ID not found or not accessible'))
  186. else:
  187. raise e
  188. except Exception as e:
  189. return {
  190. 'valid': False,
  191. 'error': str(e)
  192. }
  193. def create_folder(self, name, parent_folder_id=None, description=None):
  194. """Crear una carpeta en Google Drive"""
  195. try:
  196. folder_metadata = {
  197. 'name': name,
  198. 'mimeType': 'application/vnd.google-apps.folder'
  199. }
  200. if parent_folder_id:
  201. folder_metadata['parents'] = [parent_folder_id]
  202. if description:
  203. folder_metadata['description'] = description
  204. # Usar parámetros de Shared Drive si hay parent folder
  205. params = {}
  206. if parent_folder_id:
  207. params = self._build_shared_drive_params()
  208. # Construir URL con parámetros
  209. url = '/drive/v3/files'
  210. if params:
  211. param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
  212. url = f"{url}?{param_string}"
  213. response = self._do_request(url, folder_metadata, method='POST')
  214. return {
  215. 'success': True,
  216. 'folder_id': response.get('id'),
  217. 'folder_name': response.get('name'),
  218. 'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}"
  219. }
  220. except Exception as e:
  221. return {
  222. 'success': False,
  223. 'error': str(e)
  224. }
  225. def list_folders(self, parent_folder_id=None, include_shared_drives=True):
  226. """Listar carpetas en Google Drive"""
  227. try:
  228. query = "mimeType='application/vnd.google-apps.folder'"
  229. if parent_folder_id:
  230. query += f" and '{parent_folder_id}' in parents"
  231. params = {
  232. 'q': query,
  233. 'fields': 'files(id,name,parents,createdTime,modifiedTime)',
  234. 'orderBy': 'name'
  235. }
  236. # Agregar parámetros de Shared Drive
  237. if include_shared_drives:
  238. params.update(self._build_shared_drive_params())
  239. response = self._do_request('/drive/v3/files', params)
  240. return {
  241. 'success': True,
  242. 'folders': response.get('files', [])
  243. }
  244. except Exception as e:
  245. return {
  246. 'success': False,
  247. 'error': str(e)
  248. }
  249. def get_folder_url(self, folder_id):
  250. """Obtener URL para abrir una carpeta"""
  251. try:
  252. if not folder_id:
  253. raise UserError(_('Folder ID is required'))
  254. # Validar que existe
  255. validation = self.validate_folder_id(folder_id)
  256. if not validation.get('valid'):
  257. raise UserError(validation.get('error', 'Invalid folder'))
  258. folder_type = validation.get('type', 'regular_folder')
  259. if folder_type == 'shared_drive':
  260. return f"https://drive.google.com/drive/u/0/folders/{folder_id}"
  261. else:
  262. return f"https://drive.google.com/drive/folders/{folder_id}"
  263. except Exception as e:
  264. return None
  265. def test_connection(self):
  266. """Probar conexión al servicio"""
  267. try:
  268. credentials = self._get_credentials()
  269. # Test básico de autenticación
  270. import requests
  271. response = requests.get(
  272. f"{GOOGLE_API_BASE_URL}/oauth2/v1/userinfo",
  273. headers={'Authorization': f'Bearer {credentials["access_token"]}'},
  274. timeout=10
  275. )
  276. if response.status_code == 200:
  277. user_info = response.json()
  278. return {
  279. 'success': True,
  280. 'user_email': user_info.get('email', 'Unknown'),
  281. 'user_name': user_info.get('name', 'Unknown')
  282. }
  283. else:
  284. return {
  285. 'success': False,
  286. 'error': f"Authentication failed: {response.status_code}"
  287. }
  288. except Exception as e:
  289. return {
  290. 'success': False,
  291. 'error': str(e)
  292. }
    # ============================================================================
    # NAVIGATION AND STRUCTURE METHODS
    # ============================================================================
  296. def navigate_folder_hierarchy(self, folder_id, max_levels=5):
  297. """Navigate up the folder hierarchy and return the complete path"""
  298. try:
  299. hierarchy = []
  300. current_id = folder_id
  301. for level in range(max_levels):
  302. try:
  303. # Use Shared Drive parameters to handle both regular and shared drive folders
  304. params = self._build_shared_drive_params()
  305. params['fields'] = 'id,name,parents'
  306. response = self._do_request(f'/drive/v3/files/{current_id}', params)
  307. folder_info = {
  308. 'id': response.get('id'),
  309. 'name': response.get('name', ''),
  310. 'level': level
  311. }
  312. hierarchy.append(folder_info)
  313. parent_ids = response.get('parents', [])
  314. if not parent_ids:
  315. break
  316. current_id = parent_ids[0]
  317. except UserError as e:
  318. # Handle 404 errors gracefully - folder might not exist or be accessible
  319. if '404' in str(e):
  320. _logger.warning(f"Folder {current_id} not found or not accessible at level {level}")
  321. break
  322. else:
  323. _logger.error(f"Error navigating folder hierarchy at level {level}: {str(e)}")
  324. break
  325. except Exception as e:
  326. _logger.error(f"Unexpected error navigating folder hierarchy at level {level}: {str(e)}")
  327. break
  328. return hierarchy
  329. except Exception as e:
  330. _logger.error(f"Error in navigate_folder_hierarchy: {str(e)}")
  331. return []
  332. def find_folders_by_name(self, parent_id, name_pattern):
  333. """Find folders by name pattern in a parent folder"""
  334. try:
  335. import re
  336. query = f"'{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
  337. params = {
  338. 'q': query,
  339. 'fields': 'files(id,name)',
  340. 'pageSize': 100
  341. }
  342. # Add Shared Drive parameters
  343. params.update(self._build_shared_drive_params())
  344. response = self._do_request('/drive/v3/files', params)
  345. folders = response.get('files', [])
  346. # Filter by name pattern
  347. pattern = re.compile(name_pattern)
  348. matching_folders = [folder for folder in folders if pattern.match(folder.get('name', ''))]
  349. return matching_folders
  350. except UserError as e:
  351. # Handle access errors gracefully
  352. if '404' in str(e) or '403' in str(e):
  353. _logger.warning(f"Cannot access parent folder {parent_id}: {str(e)}")
  354. return []
  355. else:
  356. _logger.error(f"Error finding folders by name: {str(e)}")
  357. return []
  358. except Exception as e:
  359. _logger.error(f"Unexpected error finding folders by name: {str(e)}")
  360. return []
  361. def get_folder_info(self, folder_id):
  362. """Get detailed information about a folder"""
  363. try:
  364. validation = self.validate_folder_id(folder_id)
  365. if not validation.get('valid'):
  366. return None
  367. # Use Shared Drive parameters to handle both regular and shared drive folders
  368. params = self._build_shared_drive_params()
  369. params['fields'] = 'id,name,parents,createdTime,modifiedTime,mimeType'
  370. response = self._do_request(f'/drive/v3/files/{folder_id}', params)
  371. return response
  372. except Exception as e:
  373. _logger.error(f"Error getting folder info: {str(e)}")
  374. return None
    # ============================================================================
    # FOLDER MANIPULATION METHODS
    # ============================================================================
  378. def rename_folder(self, folder_id, new_name):
  379. """Rename a folder in Google Drive"""
  380. try:
  381. # For Google Drive API v3, we need to use PATCH with the correct parameters
  382. # The issue was that we were passing params instead of json body
  383. update_data = {
  384. 'name': new_name
  385. }
  386. # Use PATCH method with json body (not params)
  387. response = self._do_request(f'/drive/v3/files/{folder_id}',
  388. json_data=update_data,
  389. method='PATCH')
  390. return {
  391. 'success': True,
  392. 'folder_id': response.get('id'),
  393. 'new_name': response.get('name')
  394. }
  395. except Exception as e:
  396. return {
  397. 'success': False,
  398. 'error': str(e)
  399. }
  400. def move_folder(self, folder_id, new_parent_id):
  401. """Move a folder to a new parent"""
  402. try:
  403. # Get current folder info
  404. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  405. 'fields': 'parents'
  406. })
  407. current_parents = response.get('parents', [])
  408. if not current_parents:
  409. return {
  410. 'success': False,
  411. 'error': 'Folder has no current parent'
  412. }
  413. # Use PATCH to update the parents
  414. update_data = {
  415. 'addParents': new_parent_id,
  416. 'removeParents': current_parents[0]
  417. }
  418. # Update the file with new parent
  419. update_response = self._do_request(
  420. f'/drive/v3/files/{folder_id}',
  421. update_data,
  422. method='PATCH',
  423. json_data=update_data
  424. )
  425. return {
  426. 'success': True,
  427. 'folder_id': folder_id,
  428. 'new_parent_id': new_parent_id
  429. }
  430. except Exception as e:
  431. return {
  432. 'success': False,
  433. 'error': str(e)
  434. }
  435. def delete_folder(self, folder_id):
  436. """Delete a folder from Google Drive"""
  437. try:
  438. self._do_request(f'/drive/v3/files/{folder_id}', method='DELETE')
  439. return {
  440. 'success': True,
  441. 'folder_id': folder_id
  442. }
  443. except Exception as e:
  444. return {
  445. 'success': False,
  446. 'error': str(e)
  447. }
    # ============================================================================
    # VALIDATION AND UTILITY METHODS
    # ============================================================================
  451. def extract_folder_id_from_url(self, url):
  452. """Extract folder ID from Google Drive URL - Generic utility method"""
  453. if not url:
  454. return None
  455. # Handle different Google Drive URL formats
  456. import re
  457. # Format: https://drive.google.com/drive/folders/FOLDER_ID
  458. folder_pattern = r'drive\.google\.com/drive/folders/([a-zA-Z0-9_-]+)'
  459. match = re.search(folder_pattern, url)
  460. if match:
  461. return match.group(1)
  462. # Format: https://drive.google.com/open?id=FOLDER_ID
  463. open_pattern = r'drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)'
  464. match = re.search(open_pattern, url)
  465. if match:
  466. return match.group(1)
  467. # Check if it's already a folder ID
  468. if isinstance(url, str) and len(url) >= 10 and len(url) <= 50:
  469. if re.match(r'^[a-zA-Z0-9_-]+$', url):
  470. return url
  471. return None
  472. def sanitize_folder_name(self, name):
  473. """Sanitize folder name to be Google Drive compatible - Generic utility method"""
  474. if not name:
  475. return 'Sin nombre'
  476. import re
  477. # Use regex for better performance
  478. sanitized_name = re.sub(r'[<>:"|?*/\\]', '_', name)
  479. # Remove leading/trailing spaces and dots
  480. sanitized_name = sanitized_name.strip(' .')
  481. # Ensure it's not empty after sanitization
  482. if not sanitized_name:
  483. return 'Sin nombre'
  484. # Limit length to 255 characters (Google Drive limit)
  485. if len(sanitized_name) > 255:
  486. sanitized_name = sanitized_name[:252] + '...'
  487. return sanitized_name
  488. def create_or_get_folder(self, parent_folder_id, folder_name, description=None):
  489. """Create a folder or get existing one by name - Generic utility method"""
  490. try:
  491. # First, check if folder already exists
  492. existing_folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
  493. if existing_folders:
  494. return existing_folders[0]['id']
  495. # Create new folder
  496. result = self.create_folder(folder_name, parent_folder_id, description)
  497. if result.get('success'):
  498. return result.get('folder_id')
  499. else:
  500. error_msg = result.get('error', 'Unknown error')
  501. raise UserError(_('Failed to create Google Drive folder "%s": %s') % (folder_name, error_msg))
  502. except Exception as e:
  503. _logger.error(f"Error creating or getting folder {folder_name}: {str(e)}")
  504. raise
  505. def create_folder_structure(self, root_folder_id, structure_components):
  506. """Create folder structure from components - Generic utility method"""
  507. try:
  508. created_folders = {}
  509. for level, (folder_name, parent_id) in enumerate(structure_components):
  510. if level == 0:
  511. # First level uses root_folder_id as parent
  512. parent_folder_id = root_folder_id
  513. else:
  514. # Other levels use the previous folder as parent
  515. parent_folder_id = created_folders[level - 1]
  516. folder_id = self.create_or_get_folder(parent_folder_id, folder_name)
  517. created_folders[level] = folder_id
  518. return created_folders
  519. except Exception as e:
  520. _logger.error(f"Error creating folder structure: {str(e)}")
  521. raise
  522. def find_existing_folder_structure(self, root_folder_id, structure_components):
  523. """Find existing folder structure - Generic utility method"""
  524. try:
  525. found_folders = {}
  526. for level, (folder_name, parent_id) in enumerate(structure_components):
  527. if level == 0:
  528. # First level uses root_folder_id as parent
  529. parent_folder_id = root_folder_id
  530. else:
  531. # Other levels use the previous folder as parent
  532. parent_folder_id = found_folders[level - 1]
  533. folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
  534. if not folders:
  535. return None # Structure not found
  536. found_folders[level] = folders[0]['id']
  537. return found_folders
  538. except Exception as e:
  539. _logger.error(f"Error finding existing folder structure: {str(e)}")
  540. return None
  541. def validate_folder_id_with_google_drive(self, folder_id):
  542. """Validate if the folder ID exists and is accessible in Google Drive - Generic utility method"""
  543. try:
  544. validation = self.validate_folder_id(folder_id)
  545. if validation.get('valid'):
  546. folder_name = validation.get('name', 'Unknown')
  547. _logger.info(f"✅ Folder ID {folder_id} validated successfully in Google Drive")
  548. return True, folder_name
  549. else:
  550. error_message = validation.get('error', 'Unknown error')
  551. _logger.warning(f"❌ Folder ID {folder_id} validation failed: {error_message}")
  552. return False, error_message
  553. except Exception as e:
  554. _logger.error(f"❌ Error validating folder ID {folder_id}: {str(e)}")
  555. return False, str(e)
  556. def check_folder_belongs_to_parent(self, folder_id, expected_parent_id, max_levels=5):
  557. """Check if a folder belongs to a specific parent in the hierarchy"""
  558. try:
  559. if not expected_parent_id:
  560. return False
  561. # Get current folder info
  562. response = self._do_request(f'/drive/v3/files/{folder_id}', {
  563. 'fields': 'parents'
  564. })
  565. parent_ids = response.get('parents', [])
  566. if not parent_ids:
  567. return False
  568. # Navigate up to find if the folder is under the expected parent
  569. current_parent_id = parent_ids[0]
  570. for _ in range(max_levels):
  571. # Check if current parent is the expected parent
  572. if current_parent_id == expected_parent_id:
  573. # Folder is under the expected parent
  574. return True
  575. parent_response = self._do_request(f'/drive/v3/files/{current_parent_id}', {
  576. 'fields': 'parents'
  577. })
  578. parent_parent_ids = parent_response.get('parents', [])
  579. if not parent_parent_ids:
  580. # Reached the top level, folder is not under the expected parent
  581. return False
  582. current_parent_id = parent_parent_ids[0]
  583. # If we reach here, folder is not under the expected parent
  584. return False
  585. except Exception as e:
  586. _logger.error(f"Error checking folder parent relationship: {str(e)}")
  587. return False