# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import json
import logging
import re
from datetime import datetime

import requests

from odoo import api, fields, models, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

TIMEOUT = 30
GOOGLE_API_BASE_URL = 'https://www.googleapis.com'


class GoogleDriveService(models.AbstractModel):
    """Service wrapper around the Google Drive v3 REST API.

    Low-level helpers (``_get_credentials``, ``_do_request``) raise
    ``UserError``; most public methods catch every exception and report the
    failure through their return value instead (a ``{'success': False, ...}``
    dict, ``None``, ``False`` or an empty list, depending on the method).
    """

    _name = 'google.drive.service'
    _description = 'Google Drive Service'

    def _get_credentials(self, user=None):
        """Return the OAuth credentials to use for ``user``.

        :param user: user record whose Google token is used; defaults to
            ``self.env.user``.
        :return: dict with ``client_id``, ``client_secret`` and
            ``access_token`` keys.
        :raises UserError: if the integration is disabled, the client
            credentials are missing, or the user has no valid access token.
        """
        if not user:
            user = self.env.user

        config = self.env['ir.config_parameter'].sudo()

        # The flag is stored as a string, so the literal 'False' must be
        # treated as disabled as well as an empty value.
        enabled = config.get_param('google_api.enabled', 'False')
        if not enabled or enabled == 'False':
            raise UserError(_('Google API Integration is not enabled'))

        client_id = config.get_param('google_api.client_id', '')
        client_secret = config.get_param('google_api.client_secret', '')
        if not client_id or not client_secret:
            raise UserError(_('Google API credentials not configured'))

        # The access token comes from the user's settings record.
        user_settings = user.res_users_settings_id
        if not user_settings or not user_settings._google_authenticated():
            raise UserError(_('User %s is not authenticated with Google. Please connect your account first.') % user.name)

        access_token = user_settings._get_google_access_token()
        if not access_token:
            raise UserError(_('Could not obtain valid access token for user %s. Please reconnect your Google account.') % user.name)

        return {
            'client_id': client_id,
            'client_secret': client_secret,
            'access_token': access_token,
        }

    def _refresh_access_token(self):
        """Exchange the configured refresh token for a new access token.

        The new token is written to the ``google_api.access_token`` system
        parameter.

        NOTE(review): ``_get_credentials`` reads the access token from the
        user's settings record, not from this system parameter -- confirm
        that the refreshed token is actually picked up by later requests.

        :return: ``True`` if a new token was obtained and stored, ``False``
            otherwise. Never raises; failures are logged.
        """
        try:
            config = self.env['ir.config_parameter'].sudo()
            refresh_token = config.get_param('google_api.refresh_token')
            client_id = config.get_param('google_api.client_id')
            client_secret = config.get_param('google_api.client_secret')

            if not all([refresh_token, client_id, client_secret]):
                return False

            token_url = 'https://oauth2.googleapis.com/token'
            data = {
                'client_id': client_id,
                'client_secret': client_secret,
                'refresh_token': refresh_token,
                'grant_type': 'refresh_token',
            }

            response = requests.post(token_url, data=data, timeout=TIMEOUT)
            if response.status_code == 200:
                new_access_token = response.json().get('access_token')
                if new_access_token:
                    config.set_param('google_api.access_token', new_access_token)
                    _logger.info("Google API access token refreshed successfully")
                    return True

            return False
        except Exception as e:
            # Best effort: the caller only needs to know the refresh failed.
            _logger.error("Failed to refresh access token: %s", e)
            return False

    def _do_request(self, uri, params=None, headers=None, method='GET',
                    timeout=TIMEOUT, json_data=None, _retry_on_401=True):
        """Send an authenticated HTTP request to the Google API.

        :param uri: path appended to ``GOOGLE_API_BASE_URL``.
        :param params: query-string parameters. For POST/PUT/PATCH they are
            also used as the JSON body when ``json_data`` is ``None``
            (kept for backward compatibility).
        :param headers: extra headers merged over the default ones.
        :param method: HTTP verb (GET, POST, PUT, PATCH or DELETE).
        :param timeout: request timeout in seconds.
        :param json_data: JSON body for POST/PUT/PATCH. An explicit empty
            dict sends an empty JSON object.
        :param _retry_on_401: internal flag; when true, a 401 response
            triggers one token refresh followed by a single retry.
        :return: decoded JSON response, or ``{}`` when the response has no
            body.
        :raises UserError: on any HTTP error status, timeout, connection
            failure or unexpected exception.
        """
        try:
            credentials = self._get_credentials()

            request_headers = {
                'Authorization': f'Bearer {credentials["access_token"]}',
                'Content-Type': 'application/json',
            }
            if headers:
                request_headers.update(headers)

            url = f"{GOOGLE_API_BASE_URL}{uri}"
            http_method = method.upper()

            request_kwargs = {
                'params': params,
                'headers': request_headers,
                'timeout': timeout,
            }
            if http_method in ('POST', 'PUT', 'PATCH'):
                # "is not None" so that callers can send an empty body
                # without the query parameters leaking into it.
                request_kwargs['json'] = json_data if json_data is not None else params
            elif http_method not in ('GET', 'DELETE'):
                raise UserError(_('Unsupported HTTP method: %s') % method)

            response = requests.request(http_method, url, **request_kwargs)
            status = response.status_code

            if status == 401:
                # Refresh and retry exactly once; without the guard a token
                # that stays invalid would recurse without bound.
                if _retry_on_401 and self._refresh_access_token():
                    return self._do_request(
                        uri, params=params, headers=headers, method=method,
                        timeout=timeout, json_data=json_data,
                        _retry_on_401=False,
                    )
                raise UserError(_('Authentication failed. Please reconnect your Google account.'))
            if status == 429:
                raise UserError(_('Rate limit exceeded. Please try again later.'))
            if status >= 400:
                raise UserError(self._parse_error_response(response))

            if status == 204 or not response.content:
                return {}
            return response.json()

        except requests.exceptions.Timeout:
            raise UserError(_('Request timeout. Please try again.'))
        except requests.exceptions.ConnectionError:
            raise UserError(_('Connection error. Please check your internet connection.'))
        except UserError:
            raise
        except Exception as e:
            raise UserError(_('Unexpected error: %s') % str(e)) from e

    def _parse_error_response(self, response):
        """Build a readable message from a Google API error response.

        :param response: the failed ``requests`` response.
        :return: ``"Google API Error <code>: <message>"``; falls back to the
            HTTP status and raw body when the payload is not the expected
            JSON error object.
        """
        try:
            error_data = response.json()
            error_info = error_data.get('error', {})
            message = error_info.get('message', 'Unknown error')
            code = error_info.get('code', response.status_code)
            return f"Google API Error {code}: {message}"
        except (ValueError, AttributeError, TypeError):
            # Body is not JSON, or "error" is not a mapping.
            return f"Google API Error {response.status_code}: {response.text}"

    def _build_shared_drive_params(self, include_shared_drives=True):
        """Return the query parameters that enable Shared Drive support.

        :param include_shared_drives: when false, an empty dict is returned.
        :return: a new dict on every call, so callers may mutate it.
        """
        if not include_shared_drives:
            return {}
        return {
            'supportsAllDrives': 'true',
            'includeItemsFromAllDrives': 'true',
        }

    def validate_folder_id(self, folder_id):
        """Check that ``folder_id`` exists and is accessible.

        The ID is tried, in order, as a regular file/folder, as a Shared
        Drive, and as an item inside a Shared Drive.

        :param folder_id: Google Drive ID to validate.
        :return: ``{'valid': True, 'type': ..., 'name': ..., 'id': ...}`` on
            success, where ``type`` is ``regular_folder``, ``shared_drive``
            or ``folder_in_shared_drive``; ``{'valid': False, 'error': ...}``
            on any failure. Never raises.
        """
        def _found(folder_type, response):
            return {
                'valid': True,
                'type': folder_type,
                'name': response.get('name', 'Unknown'),
                'id': response.get('id'),
            }

        try:
            if not folder_id:
                raise UserError(_('Folder ID is required'))
            if not isinstance(folder_id, str) or len(folder_id) < 10:
                raise UserError(_('Invalid folder ID format'))

            # Step 1: try as a regular folder.
            try:
                response = self._do_request(f'/drive/v3/files/{folder_id}', {
                    'fields': 'id,name,mimeType'
                })
                return _found('regular_folder', response)
            except UserError as e:
                # Only a "not found" answer justifies the fallbacks below;
                # the status code is recovered from the error message.
                if '404' not in str(e):
                    raise

            # Step 2: try as a Shared Drive.
            try:
                response = self._do_request(f'/drive/v3/drives/{folder_id}', {
                    'fields': 'id,name'
                })
                return _found('shared_drive', response)
            except UserError:
                pass

            # Step 3: try as a folder inside a Shared Drive.
            try:
                params = self._build_shared_drive_params()
                params['fields'] = 'id,name,mimeType'
                response = self._do_request(f'/drive/v3/files/{folder_id}', params)
                return _found('folder_in_shared_drive', response)
            except UserError:
                raise UserError(_('Folder ID not found or not accessible'))

        except Exception as e:
            return {
                'valid': False,
                'error': str(e),
            }

    def create_folder(self, name, parent_folder_id=None, description=None):
        """Create a folder in Google Drive.

        :param name: name of the new folder.
        :param parent_folder_id: optional parent folder ID; when given, the
            Shared Drive query parameters are added to the request.
        :param description: optional folder description.
        :return: ``{'success': True, 'folder_id', 'folder_name',
            'folder_url'}`` or ``{'success': False, 'error': ...}``.
        """
        try:
            folder_metadata = {
                'name': name,
                'mimeType': 'application/vnd.google-apps.folder',
            }
            if parent_folder_id:
                folder_metadata['parents'] = [parent_folder_id]
            if description:
                folder_metadata['description'] = description

            # Query parameters go through "params" so that requests encodes
            # them; the metadata goes in the JSON body only.
            params = self._build_shared_drive_params() if parent_folder_id else None

            response = self._do_request(
                '/drive/v3/files', params=params, method='POST',
                json_data=folder_metadata,
            )

            return {
                'success': True,
                'folder_id': response.get('id'),
                'folder_name': response.get('name'),
                'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}",
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    def list_folders(self, parent_folder_id=None, include_shared_drives=True):
        """List folders in Google Drive.

        :param parent_folder_id: when given, only direct children of this
            folder are listed.
        :param include_shared_drives: add the Shared Drive query parameters.
        :return: ``{'success': True, 'folders': [...]}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            query = "mimeType='application/vnd.google-apps.folder'"
            if parent_folder_id:
                query += f" and '{parent_folder_id}' in parents"

            params = {
                'q': query,
                'fields': 'files(id,name,parents,createdTime,modifiedTime)',
                'orderBy': 'name',
            }
            if include_shared_drives:
                params.update(self._build_shared_drive_params())

            response = self._do_request('/drive/v3/files', params)

            return {
                'success': True,
                'folders': response.get('files', []),
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    def get_folder_url(self, folder_id):
        """Return the browser URL of a folder.

        :param folder_id: Google Drive folder ID.
        :return: the URL, or ``None`` if the ID is missing or fails
            validation.
        """
        try:
            if not folder_id:
                raise UserError(_('Folder ID is required'))

            # Make sure the folder exists before handing out a link.
            validation = self.validate_folder_id(folder_id)
            if not validation.get('valid'):
                raise UserError(validation.get('error', 'Invalid folder'))

            folder_type = validation.get('type', 'regular_folder')
            if folder_type == 'shared_drive':
                return f"https://drive.google.com/drive/u/0/folders/{folder_id}"
            return f"https://drive.google.com/drive/folders/{folder_id}"
        except Exception:
            return None

    def test_connection(self):
        """Check that the current user's token is accepted by Google.

        :return: ``{'success': True, 'user_email', 'user_name'}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            credentials = self._get_credentials()

            # Basic authentication test against the userinfo endpoint.
            response = requests.get(
                f"{GOOGLE_API_BASE_URL}/oauth2/v1/userinfo",
                headers={'Authorization': f'Bearer {credentials["access_token"]}'},
                timeout=10
            )

            if response.status_code == 200:
                user_info = response.json()
                return {
                    'success': True,
                    'user_email': user_info.get('email', 'Unknown'),
                    'user_name': user_info.get('name', 'Unknown'),
                }
            return {
                'success': False,
                'error': f"Authentication failed: {response.status_code}",
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    # ============================================================================
    # NAVIGATION AND STRUCTURE METHODS
    # ============================================================================

    def navigate_folder_hierarchy(self, folder_id, max_levels=5):
        """Walk up from ``folder_id`` and return the chain of ancestors.

        :param folder_id: ID of the starting folder.
        :param max_levels: maximum number of folders to visit, including the
            starting one.
        :return: list of ``{'id', 'name', 'level'}`` dicts; level 0 is the
            starting folder. The walk stops early at a folder without
            parents or on the first request error, so the list may be
            partial or empty.
        """
        try:
            hierarchy = []
            current_id = folder_id

            for level in range(max_levels):
                try:
                    response = self._do_request(f'/drive/v3/files/{current_id}', {
                        'fields': 'id,name,parents'
                    })

                    hierarchy.append({
                        'id': response.get('id'),
                        'name': response.get('name', ''),
                        'level': level,
                    })

                    parent_ids = response.get('parents', [])
                    if not parent_ids:
                        break

                    # Only the first parent is followed.
                    current_id = parent_ids[0]
                except Exception as e:
                    _logger.error("Error navigating folder hierarchy at level %s: %s", level, e)
                    break

            return hierarchy
        except Exception as e:
            _logger.error("Error in navigate_folder_hierarchy: %s", e)
            return []

    def find_folders_by_name(self, parent_id, name_pattern):
        """Find non-trashed child folders whose name matches a regex.

        :param parent_id: ID of the folder to search in.
        :param name_pattern: regular expression applied with ``re.match``
            (anchored at the start of the name). Escape literal names with
            ``re.escape`` before passing them.
        :return: list of ``{'id', 'name'}`` dicts; an empty list on any
            error. Only the first page (up to 100 folders) is examined.
        """
        try:
            query = f"'{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"

            params = {
                'q': query,
                'fields': 'files(id,name)',
                'pageSize': 100,
            }
            params.update(self._build_shared_drive_params())

            response = self._do_request('/drive/v3/files', params)
            folders = response.get('files', [])

            # The API query cannot express a regex, so filter client-side.
            pattern = re.compile(name_pattern)
            return [folder for folder in folders if pattern.match(folder.get('name', ''))]

        except Exception as e:
            _logger.error("Error finding folders by name: %s", e)
            return []

    def get_folder_info(self, folder_id):
        """Return detailed metadata for a folder.

        :param folder_id: Google Drive folder ID.
        :return: the API response dict (id, name, parents, createdTime,
            modifiedTime, mimeType), or ``None`` if validation or the
            request fails.
        """
        try:
            validation = self.validate_folder_id(folder_id)
            if not validation.get('valid'):
                return None

            return self._do_request(f'/drive/v3/files/{folder_id}', {
                'fields': 'id,name,parents,createdTime,modifiedTime,mimeType'
            })
        except Exception as e:
            _logger.error("Error getting folder info: %s", e)
            return None

    # ============================================================================
    # FOLDER MANIPULATION METHODS
    # ============================================================================

    def rename_folder(self, folder_id, new_name):
        """Rename a folder in Google Drive.

        :param folder_id: ID of the folder to rename.
        :param new_name: the new folder name.
        :return: ``{'success': True, 'folder_id', 'new_name'}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            # Drive API v3 expects the new metadata in the JSON body of a
            # PATCH request, not in the query string.
            update_data = {
                'name': new_name,
            }

            response = self._do_request(f'/drive/v3/files/{folder_id}',
                                        json_data=update_data,
                                        method='PATCH')

            return {
                'success': True,
                'folder_id': response.get('id'),
                'new_name': response.get('name'),
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    def move_folder(self, folder_id, new_parent_id):
        """Move a folder under a new parent.

        :param folder_id: ID of the folder to move.
        :param new_parent_id: ID of the destination parent folder.
        :return: ``{'success': True, 'folder_id', 'new_parent_id'}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            # Look up the current parent so it can be removed.
            response = self._do_request(f'/drive/v3/files/{folder_id}', {
                'fields': 'parents'
            })

            current_parents = response.get('parents', [])
            if not current_parents:
                return {
                    'success': False,
                    'error': 'Folder has no current parent',
                }

            # addParents/removeParents are query parameters of files.update,
            # so they are sent in the query string with an empty JSON body.
            move_params = {
                'addParents': new_parent_id,
                'removeParents': current_parents[0],
            }
            self._do_request(
                f'/drive/v3/files/{folder_id}',
                move_params,
                method='PATCH',
                json_data={}
            )

            return {
                'success': True,
                'folder_id': folder_id,
                'new_parent_id': new_parent_id,
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    def delete_folder(self, folder_id):
        """Delete a folder from Google Drive.

        :param folder_id: ID of the folder to delete.
        :return: ``{'success': True, 'folder_id'}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            self._do_request(f'/drive/v3/files/{folder_id}', method='DELETE')
            return {
                'success': True,
                'folder_id': folder_id,
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    # ============================================================================
    # VALIDATION AND UTILITY METHODS
    # ============================================================================

    def extract_folder_id_from_url(self, url):
        """Extract the folder ID from a Google Drive URL.

        Recognised inputs:

        * ``https://drive.google.com/drive/folders/FOLDER_ID``
        * ``https://drive.google.com/drive/u/N/folders/FOLDER_ID``
        * ``https://drive.google.com/open?id=FOLDER_ID``
        * a bare ID (10 to 50 characters of ``[a-zA-Z0-9_-]``)

        :param url: URL or bare folder ID.
        :return: the folder ID, or ``None`` if the input is empty, not a
            string, or not recognised.
        """
        if not url or not isinstance(url, str):
            return None

        url_patterns = (
            # The optional "u/<n>/" segment is the account selector.
            r'drive\.google\.com/drive/(?:u/\d+/)?folders/([a-zA-Z0-9_-]+)',
            r'drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)',
        )
        for url_pattern in url_patterns:
            match = re.search(url_pattern, url)
            if match:
                return match.group(1)

        # Maybe the value already is a folder ID.
        if 10 <= len(url) <= 50 and re.match(r'^[a-zA-Z0-9_-]+$', url):
            return url

        return None

    def sanitize_folder_name(self, name):
        """Return ``name`` cleaned up for use as a Drive folder name.

        Characters ``< > : " | ? * / \\`` become underscores, leading and
        trailing spaces and dots are stripped, and the result is truncated
        to 255 characters (ending in ``...``).

        :param name: the raw folder name.
        :return: the sanitized name, or ``'Sin nombre'`` when nothing usable
            remains.
        """
        if not name:
            return 'Sin nombre'

        sanitized_name = re.sub(r'[<>:"|?*/\\]', '_', name)
        sanitized_name = sanitized_name.strip(' .')

        if not sanitized_name:
            return 'Sin nombre'

        # Keep the total length at 255 characters including the ellipsis.
        if len(sanitized_name) > 255:
            sanitized_name = sanitized_name[:252] + '...'

        return sanitized_name

    def create_or_get_folder(self, parent_folder_id, folder_name, description=None):
        """Return the ID of a child folder, creating it if necessary.

        :param parent_folder_id: ID of the parent folder.
        :param folder_name: exact name of the folder to find or create.
        :param description: description used only when creating the folder.
        :return: ID of the existing or newly created folder.
        :raises UserError: if the folder could not be created.
        """
        try:
            # The name is escaped so that it is matched literally;
            # otherwise names containing regex metacharacters would never
            # match and a duplicate folder would be created on every call.
            existing_folders = self.find_folders_by_name(
                parent_folder_id, f'^{re.escape(folder_name)}$')

            if existing_folders:
                return existing_folders[0]['id']

            result = self.create_folder(folder_name, parent_folder_id, description)

            if result.get('success'):
                return result.get('folder_id')

            error_msg = result.get('error', 'Unknown error')
            raise UserError(_('Failed to create Google Drive folder "%s": %s') % (folder_name, error_msg))

        except Exception as e:
            _logger.error("Error creating or getting folder %s: %s", folder_name, e)
            raise

    def create_folder_structure(self, root_folder_id, structure_components):
        """Create (or reuse) a chain of nested folders.

        :param root_folder_id: ID of the folder holding the first level.
        :param structure_components: sequence of ``(folder_name, parent_id)``
            pairs, outermost first. The second element of each pair is not
            used: every level is created inside the previous one.
        :return: dict mapping level index (0-based) to folder ID.
        :raises Exception: any error is logged and re-raised.
        """
        try:
            created_folders = {}

            for level, (folder_name, _parent_id) in enumerate(structure_components):
                if level == 0:
                    parent_folder_id = root_folder_id
                else:
                    parent_folder_id = created_folders[level - 1]

                created_folders[level] = self.create_or_get_folder(parent_folder_id, folder_name)

            return created_folders

        except Exception as e:
            _logger.error("Error creating folder structure: %s", e)
            raise

    def find_existing_folder_structure(self, root_folder_id, structure_components):
        """Look up a chain of nested folders without creating anything.

        :param root_folder_id: ID of the folder holding the first level.
        :param structure_components: sequence of ``(folder_name, parent_id)``
            pairs, outermost first. The second element of each pair is not
            used.
        :return: dict mapping level index (0-based) to folder ID, or
            ``None`` if any level is missing or an error occurs.
        """
        try:
            found_folders = {}

            for level, (folder_name, _parent_id) in enumerate(structure_components):
                if level == 0:
                    parent_folder_id = root_folder_id
                else:
                    parent_folder_id = found_folders[level - 1]

                # Escaped so that the name is matched literally.
                folders = self.find_folders_by_name(
                    parent_folder_id, f'^{re.escape(folder_name)}$')

                if not folders:
                    return None  # Structure not found

                found_folders[level] = folders[0]['id']

            return found_folders

        except Exception as e:
            _logger.error("Error finding existing folder structure: %s", e)
            return None

    def validate_folder_id_with_google_drive(self, folder_id):
        """Validate a folder ID and log the outcome.

        :param folder_id: Google Drive folder ID.
        :return: ``(True, folder_name)`` when the folder is accessible,
            ``(False, error_message)`` otherwise.
        """
        try:
            validation = self.validate_folder_id(folder_id)

            if validation.get('valid'):
                folder_name = validation.get('name', 'Unknown')
                _logger.info("✅ Folder ID %s validated successfully in Google Drive", folder_id)
                return True, folder_name

            error_message = validation.get('error', 'Unknown error')
            _logger.warning("❌ Folder ID %s validation failed: %s", folder_id, error_message)
            return False, error_message

        except Exception as e:
            _logger.error("❌ Error validating folder ID %s: %s", folder_id, e)
            return False, str(e)

    def check_folder_belongs_to_parent(self, folder_id, expected_parent_id, max_levels=5):
        """Tell whether ``expected_parent_id`` is an ancestor of a folder.

        :param folder_id: ID of the folder to check.
        :param expected_parent_id: ID of the ancestor to look for.
        :param max_levels: maximum number of ancestors to inspect.
        :return: ``True`` if the ancestor is found within ``max_levels``
            levels; ``False`` otherwise, including on any error.
        """
        try:
            if not expected_parent_id:
                return False

            response = self._do_request(f'/drive/v3/files/{folder_id}', {
                'fields': 'parents'
            })

            parent_ids = response.get('parents', [])
            if not parent_ids:
                return False

            # Walk upwards, following the first parent at every level.
            current_parent_id = parent_ids[0]

            # The loop variable must not be "_": that name is the
            # translation function imported at module level.
            for _level in range(max_levels):
                if current_parent_id == expected_parent_id:
                    return True

                parent_response = self._do_request(f'/drive/v3/files/{current_parent_id}', {
                    'fields': 'parents'
                })

                parent_parent_ids = parent_response.get('parents', [])
                if not parent_parent_ids:
                    # Reached the top level without meeting the ancestor.
                    return False

                current_parent_id = parent_parent_ids[0]

            # Depth limit reached without finding the expected parent.
            return False

        except Exception as e:
            _logger.error("Error checking folder parent relationship: %s", e)
            return False