||
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- import logging
- import json
- from datetime import datetime
- from odoo import api, fields, models, _
- from odoo.exceptions import UserError
- _logger = logging.getLogger(__name__)
- TIMEOUT = 30
- GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
- class GoogleDriveService(models.AbstractModel):
- """Servicio para Google Drive API siguiendo las mejores prácticas de Odoo"""
- _name = 'google.drive.service'
- _description = 'Google Drive Service'
- def _get_credentials(self, user=None):
- """Obtener credenciales de configuración para el usuario especificado"""
- if not user:
- user = self.env.user
-
- config = self.env['ir.config_parameter'].sudo()
-
- enabled = config.get_param('google_api.enabled', 'False')
- if not enabled or enabled == 'False':
- raise UserError(_('Google API Integration is not enabled'))
-
- client_id = config.get_param('google_api.client_id', '')
- client_secret = config.get_param('google_api.client_secret', '')
-
- if not client_id or not client_secret:
- raise UserError(_('Google API credentials not configured'))
-
- # Get access token from user settings
- user_settings = user.res_users_settings_id
- if not user_settings or not user_settings._google_authenticated():
- raise UserError(_('User %s is not authenticated with Google. Please connect your account first.') % user.name)
-
- access_token = user_settings._get_google_access_token()
- if not access_token:
- raise UserError(_('Could not obtain valid access token for user %s. Please reconnect your Google account.') % user.name)
-
- return {
- 'client_id': client_id,
- 'client_secret': client_secret,
- 'access_token': access_token
- }
- def _refresh_access_token(self):
- """Refrescar el token de acceso"""
- try:
- config = self.env['ir.config_parameter'].sudo()
- refresh_token = config.get_param('google_api.refresh_token')
- client_id = config.get_param('google_api.client_id')
- client_secret = config.get_param('google_api.client_secret')
-
- if not all([refresh_token, client_id, client_secret]):
- return False
-
- token_url = 'https://oauth2.googleapis.com/token'
- data = {
- 'client_id': client_id,
- 'client_secret': client_secret,
- 'refresh_token': refresh_token,
- 'grant_type': 'refresh_token'
- }
-
- import requests
- response = requests.post(token_url, data=data, timeout=TIMEOUT)
-
- if response.status_code == 200:
- token_data = response.json()
- new_access_token = token_data.get('access_token')
-
- if new_access_token:
- config.set_param('google_api.access_token', new_access_token)
- _logger.info("Google API access token refreshed successfully")
- return True
-
- return False
-
- except Exception as e:
- _logger.error(f"Failed to refresh access token: {str(e)}")
- return False
- def _do_request(self, uri, params=None, headers=None, method='GET', timeout=TIMEOUT, json_data=None):
- """Realizar petición HTTP a Google API con manejo de errores y reintentos"""
- try:
- credentials = self._get_credentials()
-
- # Headers por defecto
- default_headers = {
- 'Authorization': f'Bearer {credentials["access_token"]}',
- 'Content-Type': 'application/json'
- }
-
- if headers:
- default_headers.update(headers)
-
- url = f"{GOOGLE_API_BASE_URL}{uri}"
-
- import requests
-
- # Realizar petición
- if method.upper() == 'GET':
- response = requests.get(url, params=params, headers=default_headers, timeout=timeout)
- elif method.upper() == 'POST':
- response = requests.post(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
- elif method.upper() == 'PUT':
- response = requests.put(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
- elif method.upper() == 'PATCH':
- response = requests.patch(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
- elif method.upper() == 'DELETE':
- response = requests.delete(url, params=params, headers=default_headers, timeout=timeout)
- else:
- raise UserError(_('Unsupported HTTP method: %s') % method)
-
- # Manejar códigos de respuesta
- if response.status_code == 200:
- return response.json() if response.content else {}
- elif response.status_code == 201:
- return response.json() if response.content else {}
- elif response.status_code == 204:
- return {}
- elif response.status_code == 401:
- # Intentar refrescar token
- if self._refresh_access_token():
- # Reintentar la petición
- return self._do_request(uri, params, headers, method, timeout)
- else:
- raise UserError(_('Authentication failed. Please reconnect your Google account.'))
- elif response.status_code == 429:
- raise UserError(_('Rate limit exceeded. Please try again later.'))
- elif response.status_code >= 400:
- error_msg = self._parse_error_response(response)
- raise UserError(error_msg)
-
- return response.json() if response.content else {}
-
- except requests.exceptions.Timeout:
- raise UserError(_('Request timeout. Please try again.'))
- except requests.exceptions.ConnectionError:
- raise UserError(_('Connection error. Please check your internet connection.'))
- except UserError:
- raise
- except Exception as e:
- raise UserError(_('Unexpected error: %s') % str(e))
- def _parse_error_response(self, response):
- """Parsear respuesta de error de Google API"""
- try:
- error_data = response.json()
- error_info = error_data.get('error', {})
-
- message = error_info.get('message', 'Unknown error')
- code = error_info.get('code', response.status_code)
-
- return f"Google API Error {code}: {message}"
- except:
- return f"Google API Error {response.status_code}: {response.text}"
- def _build_shared_drive_params(self, include_shared_drives=True):
- """Construir parámetros para Shared Drives"""
- params = {}
- if include_shared_drives:
- params.update({
- 'supportsAllDrives': 'true',
- 'includeItemsFromAllDrives': 'true'
- })
- return params
- def validate_folder_id(self, folder_id):
- """Validar que un folder ID existe y es accesible"""
- try:
- if not folder_id:
- raise UserError(_('Folder ID is required'))
-
- if not isinstance(folder_id, str) or len(folder_id) < 10:
- raise UserError(_('Invalid folder ID format'))
-
- # Paso 1: Intentar como carpeta regular
- try:
- response = self._do_request(f'/drive/v3/files/{folder_id}', {
- 'fields': 'id,name,mimeType'
- })
- return {
- 'valid': True,
- 'type': 'regular_folder',
- 'name': response.get('name', 'Unknown'),
- 'id': response.get('id')
- }
- except UserError as e:
- if '404' in str(e):
- # Paso 2: Intentar como Shared Drive
- try:
- response = self._do_request(f'/drive/v3/drives/{folder_id}', {
- 'fields': 'id,name'
- })
- return {
- 'valid': True,
- 'type': 'shared_drive',
- 'name': response.get('name', 'Unknown'),
- 'id': response.get('id')
- }
- except UserError:
- # Paso 3: Intentar como carpeta dentro de Shared Drive
- try:
- params = self._build_shared_drive_params()
- params['fields'] = 'id,name,mimeType'
- response = self._do_request(f'/drive/v3/files/{folder_id}', params)
- return {
- 'valid': True,
- 'type': 'folder_in_shared_drive',
- 'name': response.get('name', 'Unknown'),
- 'id': response.get('id')
- }
- except UserError:
- raise UserError(_('Folder ID not found or not accessible'))
- else:
- raise e
-
- except Exception as e:
- return {
- 'valid': False,
- 'error': str(e)
- }
- def create_folder(self, name, parent_folder_id=None, description=None):
- """Crear una carpeta en Google Drive"""
- try:
- folder_metadata = {
- 'name': name,
- 'mimeType': 'application/vnd.google-apps.folder'
- }
-
- if parent_folder_id:
- folder_metadata['parents'] = [parent_folder_id]
-
- if description:
- folder_metadata['description'] = description
-
- # Usar parámetros de Shared Drive si hay parent folder
- params = {}
- if parent_folder_id:
- params = self._build_shared_drive_params()
-
- # Construir URL con parámetros
- url = '/drive/v3/files'
- if params:
- param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
- url = f"{url}?{param_string}"
-
- response = self._do_request(url, folder_metadata, method='POST')
-
- return {
- 'success': True,
- 'folder_id': response.get('id'),
- 'folder_name': response.get('name'),
- 'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}"
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- def list_folders(self, parent_folder_id=None, include_shared_drives=True):
- """Listar carpetas en Google Drive"""
- try:
- query = "mimeType='application/vnd.google-apps.folder'"
- if parent_folder_id:
- query += f" and '{parent_folder_id}' in parents"
-
- params = {
- 'q': query,
- 'fields': 'files(id,name,parents,createdTime,modifiedTime)',
- 'orderBy': 'name'
- }
-
- # Agregar parámetros de Shared Drive
- if include_shared_drives:
- params.update(self._build_shared_drive_params())
-
- response = self._do_request('/drive/v3/files', params)
-
- return {
- 'success': True,
- 'folders': response.get('files', [])
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- def get_folder_url(self, folder_id):
- """Obtener URL para abrir una carpeta"""
- try:
- if not folder_id:
- raise UserError(_('Folder ID is required'))
-
- # Validar que existe
- validation = self.validate_folder_id(folder_id)
- if not validation.get('valid'):
- raise UserError(validation.get('error', 'Invalid folder'))
-
- folder_type = validation.get('type', 'regular_folder')
-
- if folder_type == 'shared_drive':
- return f"https://drive.google.com/drive/u/0/folders/{folder_id}"
- else:
- return f"https://drive.google.com/drive/folders/{folder_id}"
-
- except Exception as e:
- return None
- def test_connection(self):
- """Probar conexión al servicio"""
- try:
- credentials = self._get_credentials()
-
- # Test básico de autenticación
- import requests
- response = requests.get(
- f"{GOOGLE_API_BASE_URL}/oauth2/v1/userinfo",
- headers={'Authorization': f'Bearer {credentials["access_token"]}'},
- timeout=10
- )
-
- if response.status_code == 200:
- user_info = response.json()
- return {
- 'success': True,
- 'user_email': user_info.get('email', 'Unknown'),
- 'user_name': user_info.get('name', 'Unknown')
- }
- else:
- return {
- 'success': False,
- 'error': f"Authentication failed: {response.status_code}"
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- # ============================================================================
- # MÉTODOS DE NAVEGACIÓN Y ESTRUCTURA
- # ============================================================================
- def navigate_folder_hierarchy(self, folder_id, max_levels=5):
- """Navigate up the folder hierarchy and return the complete path"""
- try:
- hierarchy = []
- current_id = folder_id
-
- for level in range(max_levels):
- try:
- # Use Shared Drive parameters to handle both regular and shared drive folders
- params = self._build_shared_drive_params()
- params['fields'] = 'id,name,parents'
-
- response = self._do_request(f'/drive/v3/files/{current_id}', params)
-
- folder_info = {
- 'id': response.get('id'),
- 'name': response.get('name', ''),
- 'level': level
- }
-
- hierarchy.append(folder_info)
-
- parent_ids = response.get('parents', [])
- if not parent_ids:
- break
-
- current_id = parent_ids[0]
-
- except UserError as e:
- # Handle 404 errors gracefully - folder might not exist or be accessible
- if '404' in str(e):
- _logger.warning(f"Folder {current_id} not found or not accessible at level {level}")
- break
- else:
- _logger.error(f"Error navigating folder hierarchy at level {level}: {str(e)}")
- break
- except Exception as e:
- _logger.error(f"Unexpected error navigating folder hierarchy at level {level}: {str(e)}")
- break
-
- return hierarchy
-
- except Exception as e:
- _logger.error(f"Error in navigate_folder_hierarchy: {str(e)}")
- return []
- def find_folders_by_name(self, parent_id, name_pattern):
- """Find folders by name pattern in a parent folder"""
- try:
- import re
-
- query = f"'{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
-
- params = {
- 'q': query,
- 'fields': 'files(id,name)',
- 'pageSize': 100
- }
-
- # Add Shared Drive parameters
- params.update(self._build_shared_drive_params())
-
- response = self._do_request('/drive/v3/files', params)
- folders = response.get('files', [])
-
- # Filter by name pattern
- pattern = re.compile(name_pattern)
- matching_folders = [folder for folder in folders if pattern.match(folder.get('name', ''))]
-
- return matching_folders
-
- except UserError as e:
- # Handle access errors gracefully
- if '404' in str(e) or '403' in str(e):
- _logger.warning(f"Cannot access parent folder {parent_id}: {str(e)}")
- return []
- else:
- _logger.error(f"Error finding folders by name: {str(e)}")
- return []
- except Exception as e:
- _logger.error(f"Unexpected error finding folders by name: {str(e)}")
- return []
- def get_folder_info(self, folder_id):
- """Get detailed information about a folder"""
- try:
- validation = self.validate_folder_id(folder_id)
- if not validation.get('valid'):
- return None
-
- # Use Shared Drive parameters to handle both regular and shared drive folders
- params = self._build_shared_drive_params()
- params['fields'] = 'id,name,parents,createdTime,modifiedTime,mimeType'
-
- response = self._do_request(f'/drive/v3/files/{folder_id}', params)
-
- return response
-
- except Exception as e:
- _logger.error(f"Error getting folder info: {str(e)}")
- return None
- # ============================================================================
- # MÉTODOS DE MANIPULACIÓN DE CARPETAS
- # ============================================================================
- def rename_folder(self, folder_id, new_name):
- """Rename a folder in Google Drive"""
- try:
- # For Google Drive API v3, we need to use PATCH with the correct parameters
- # The issue was that we were passing params instead of json body
- update_data = {
- 'name': new_name
- }
-
- # Use PATCH method with json body (not params)
- response = self._do_request(f'/drive/v3/files/{folder_id}',
- json_data=update_data,
- method='PATCH')
-
- return {
- 'success': True,
- 'folder_id': response.get('id'),
- 'new_name': response.get('name')
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- def move_folder(self, folder_id, new_parent_id):
- """Move a folder to a new parent with Shared Drive support"""
- try:
- # Build Shared Drive parameters
- params = self._build_shared_drive_params()
- params['fields'] = 'parents'
-
- # Get current folder info with Shared Drive support
- response = self._do_request(f'/drive/v3/files/{folder_id}', params)
-
- current_parents = response.get('parents', [])
-
- if not current_parents:
- return {
- 'success': False,
- 'error': 'Folder has no current parent'
- }
-
- # Use PATCH to update the parents with Shared Drive support
- update_params = self._build_shared_drive_params()
- update_params.update({
- 'addParents': new_parent_id,
- 'removeParents': current_parents[0]
- })
-
- # Update the file with new parent
- update_response = self._do_request(
- f'/drive/v3/files/{folder_id}',
- update_params,
- method='PATCH'
- )
-
- return {
- 'success': True,
- 'folder_id': folder_id,
- 'new_parent_id': new_parent_id
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- def delete_folder(self, folder_id):
- """Delete a folder from Google Drive"""
- try:
- self._do_request(f'/drive/v3/files/{folder_id}', method='DELETE')
-
- return {
- 'success': True,
- 'folder_id': folder_id
- }
-
- except Exception as e:
- return {
- 'success': False,
- 'error': str(e)
- }
- # ============================================================================
- # MÉTODOS DE VALIDACIÓN Y UTILIDADES
- # ============================================================================
- def extract_folder_id_from_url(self, url):
- """Extract folder ID from Google Drive URL - Generic utility method"""
- if not url:
- return None
-
- # Handle different Google Drive URL formats
- import re
-
- # Format: https://drive.google.com/drive/folders/FOLDER_ID
- folder_pattern = r'drive\.google\.com/drive/folders/([a-zA-Z0-9_-]+)'
- match = re.search(folder_pattern, url)
-
- if match:
- return match.group(1)
-
- # Format: https://drive.google.com/open?id=FOLDER_ID
- open_pattern = r'drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)'
- match = re.search(open_pattern, url)
-
- if match:
- return match.group(1)
-
- # Check if it's already a folder ID
- if isinstance(url, str) and len(url) >= 10 and len(url) <= 50:
- if re.match(r'^[a-zA-Z0-9_-]+$', url):
- return url
-
- return None
- def sanitize_folder_name(self, name):
- """Sanitize folder name to be Google Drive compatible - Generic utility method"""
- if not name:
- return 'Sin nombre'
-
- import re
-
- # Use regex for better performance
- sanitized_name = re.sub(r'[<>:"|?*/\\]', '_', name)
-
- # Remove leading/trailing spaces and dots
- sanitized_name = sanitized_name.strip(' .')
-
- # Ensure it's not empty after sanitization
- if not sanitized_name:
- return 'Sin nombre'
-
- # Limit length to 255 characters (Google Drive limit)
- if len(sanitized_name) > 255:
- sanitized_name = sanitized_name[:252] + '...'
-
- return sanitized_name
- def create_or_get_folder(self, parent_folder_id, folder_name, description=None):
- """Create a folder or get existing one by name - Generic utility method"""
- try:
- # First, check if folder already exists
- existing_folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
- if existing_folders:
- return existing_folders[0]['id']
-
- # Create new folder
- result = self.create_folder(folder_name, parent_folder_id, description)
-
- if result.get('success'):
- return result.get('folder_id')
- else:
- error_msg = result.get('error', 'Unknown error')
- raise UserError(_('Failed to create Google Drive folder "%s": %s') % (folder_name, error_msg))
-
- except Exception as e:
- _logger.error(f"Error creating or getting folder {folder_name}: {str(e)}")
- raise
- def create_folder_structure(self, root_folder_id, structure_components):
- """Create folder structure from components - Generic utility method"""
- try:
- created_folders = {}
-
- for level, (folder_name, parent_id) in enumerate(structure_components):
- if level == 0:
- # First level uses root_folder_id as parent
- parent_folder_id = root_folder_id
- else:
- # Other levels use the previous folder as parent
- parent_folder_id = created_folders[level - 1]
-
- folder_id = self.create_or_get_folder(parent_folder_id, folder_name)
- created_folders[level] = folder_id
-
- return created_folders
-
- except Exception as e:
- _logger.error(f"Error creating folder structure: {str(e)}")
- raise
- def find_existing_folder_structure(self, root_folder_id, structure_components):
- """Find existing folder structure - Generic utility method"""
- try:
- found_folders = {}
-
- for level, (folder_name, parent_id) in enumerate(structure_components):
- if level == 0:
- # First level uses root_folder_id as parent
- parent_folder_id = root_folder_id
- else:
- # Other levels use the previous folder as parent
- parent_folder_id = found_folders[level - 1]
-
- folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
- if not folders:
- return None # Structure not found
-
- found_folders[level] = folders[0]['id']
-
- return found_folders
-
- except Exception as e:
- _logger.error(f"Error finding existing folder structure: {str(e)}")
- return None
- def validate_folder_id_with_google_drive(self, folder_id):
- """Validate if the folder ID exists and is accessible in Google Drive - Generic utility method"""
- try:
- validation = self.validate_folder_id(folder_id)
-
- if validation.get('valid'):
- folder_name = validation.get('name', 'Unknown')
- _logger.info(f"✅ Folder ID {folder_id} validated successfully in Google Drive")
- return True, folder_name
- else:
- error_message = validation.get('error', 'Unknown error')
- _logger.warning(f"❌ Folder ID {folder_id} validation failed: {error_message}")
- return False, error_message
-
- except Exception as e:
- _logger.error(f"❌ Error validating folder ID {folder_id}: {str(e)}")
- return False, str(e)
- def check_folder_belongs_to_parent(self, folder_id, expected_parent_id, max_levels=5):
- """Check if a folder belongs to a specific parent in the hierarchy"""
- try:
- if not expected_parent_id:
- return False
-
- # Get current folder info
- response = self._do_request(f'/drive/v3/files/{folder_id}', {
- 'fields': 'parents'
- })
-
- parent_ids = response.get('parents', [])
-
- if not parent_ids:
- return False
-
- # Navigate up to find if the folder is under the expected parent
- current_parent_id = parent_ids[0]
-
- for _ in range(max_levels):
- # Check if current parent is the expected parent
- if current_parent_id == expected_parent_id:
- # Folder is under the expected parent
- return True
-
- parent_response = self._do_request(f'/drive/v3/files/{current_parent_id}', {
- 'fields': 'parents'
- })
-
- parent_parent_ids = parent_response.get('parents', [])
-
- if not parent_parent_ids:
- # Reached the top level, folder is not under the expected parent
- return False
-
- current_parent_id = parent_parent_ids[0]
-
- # If we reach here, folder is not under the expected parent
- return False
-
- except Exception as e:
- _logger.error(f"Error checking folder parent relationship: {str(e)}")
- return False
|