|
@@ -0,0 +1,713 @@
|
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
|
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
|
+
|
|
|
|
|
+import logging
|
|
|
|
|
+import json
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from odoo import api, fields, models, _
|
|
|
|
|
+from odoo.exceptions import UserError
|
|
|
|
|
+
|
|
|
|
|
+_logger = logging.getLogger(__name__)
|
|
|
|
|
+
|
|
|
|
|
+TIMEOUT = 30
|
|
|
|
|
+GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class GoogleDriveService(models.AbstractModel):
|
|
|
|
|
+ """Servicio para Google Drive API siguiendo las mejores prácticas de Odoo"""
|
|
|
|
|
+ _name = 'google.drive.service'
|
|
|
|
|
+ _description = 'Google Drive Service'
|
|
|
|
|
+
|
|
|
|
|
+ def _get_credentials(self, user=None):
|
|
|
|
|
+ """Obtener credenciales de configuración para el usuario especificado"""
|
|
|
|
|
+ if not user:
|
|
|
|
|
+ user = self.env.user
|
|
|
|
|
+
|
|
|
|
|
+ config = self.env['ir.config_parameter'].sudo()
|
|
|
|
|
+
|
|
|
|
|
+ enabled = config.get_param('google_api.enabled', 'False')
|
|
|
|
|
+ if not enabled or enabled == 'False':
|
|
|
|
|
+ raise UserError(_('Google API Integration is not enabled'))
|
|
|
|
|
+
|
|
|
|
|
+ client_id = config.get_param('google_api.client_id', '')
|
|
|
|
|
+ client_secret = config.get_param('google_api.client_secret', '')
|
|
|
|
|
+
|
|
|
|
|
+ if not client_id or not client_secret:
|
|
|
|
|
+ raise UserError(_('Google API credentials not configured'))
|
|
|
|
|
+
|
|
|
|
|
+ # Get access token from user settings
|
|
|
|
|
+ user_settings = user.res_users_settings_id
|
|
|
|
|
+ if not user_settings or not user_settings._google_authenticated():
|
|
|
|
|
+ raise UserError(_('User %s is not authenticated with Google. Please connect your account first.') % user.name)
|
|
|
|
|
+
|
|
|
|
|
+ access_token = user_settings._get_google_access_token()
|
|
|
|
|
+ if not access_token:
|
|
|
|
|
+ raise UserError(_('Could not obtain valid access token for user %s. Please reconnect your Google account.') % user.name)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'client_id': client_id,
|
|
|
|
|
+ 'client_secret': client_secret,
|
|
|
|
|
+ 'access_token': access_token
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def _refresh_access_token(self):
|
|
|
|
|
+ """Refrescar el token de acceso"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ config = self.env['ir.config_parameter'].sudo()
|
|
|
|
|
+ refresh_token = config.get_param('google_api.refresh_token')
|
|
|
|
|
+ client_id = config.get_param('google_api.client_id')
|
|
|
|
|
+ client_secret = config.get_param('google_api.client_secret')
|
|
|
|
|
+
|
|
|
|
|
+ if not all([refresh_token, client_id, client_secret]):
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ token_url = 'https://oauth2.googleapis.com/token'
|
|
|
|
|
+ data = {
|
|
|
|
|
+ 'client_id': client_id,
|
|
|
|
|
+ 'client_secret': client_secret,
|
|
|
|
|
+ 'refresh_token': refresh_token,
|
|
|
|
|
+ 'grant_type': 'refresh_token'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ import requests
|
|
|
|
|
+ response = requests.post(token_url, data=data, timeout=TIMEOUT)
|
|
|
|
|
+
|
|
|
|
|
+ if response.status_code == 200:
|
|
|
|
|
+ token_data = response.json()
|
|
|
|
|
+ new_access_token = token_data.get('access_token')
|
|
|
|
|
+
|
|
|
|
|
+ if new_access_token:
|
|
|
|
|
+ config.set_param('google_api.access_token', new_access_token)
|
|
|
|
|
+ _logger.info("Google API access token refreshed successfully")
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Failed to refresh access token: {str(e)}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ def _do_request(self, uri, params=None, headers=None, method='GET', timeout=TIMEOUT, json_data=None):
|
|
|
|
|
+ """Realizar petición HTTP a Google API con manejo de errores y reintentos"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ credentials = self._get_credentials()
|
|
|
|
|
+
|
|
|
|
|
+ # Headers por defecto
|
|
|
|
|
+ default_headers = {
|
|
|
|
|
+ 'Authorization': f'Bearer {credentials["access_token"]}',
|
|
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if headers:
|
|
|
|
|
+ default_headers.update(headers)
|
|
|
|
|
+
|
|
|
|
|
+ url = f"{GOOGLE_API_BASE_URL}{uri}"
|
|
|
|
|
+
|
|
|
|
|
+ import requests
|
|
|
|
|
+
|
|
|
|
|
+ # Realizar petición
|
|
|
|
|
+ if method.upper() == 'GET':
|
|
|
|
|
+ response = requests.get(url, params=params, headers=default_headers, timeout=timeout)
|
|
|
|
|
+ elif method.upper() == 'POST':
|
|
|
|
|
+ response = requests.post(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
|
|
|
|
|
+ elif method.upper() == 'PUT':
|
|
|
|
|
+ response = requests.put(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
|
|
|
|
|
+ elif method.upper() == 'PATCH':
|
|
|
|
|
+ response = requests.patch(url, params=params, json=json_data or params, headers=default_headers, timeout=timeout)
|
|
|
|
|
+ elif method.upper() == 'DELETE':
|
|
|
|
|
+ response = requests.delete(url, params=params, headers=default_headers, timeout=timeout)
|
|
|
|
|
+ else:
|
|
|
|
|
+ raise UserError(_('Unsupported HTTP method: %s') % method)
|
|
|
|
|
+
|
|
|
|
|
+ # Manejar códigos de respuesta
|
|
|
|
|
+ if response.status_code == 200:
|
|
|
|
|
+ return response.json() if response.content else {}
|
|
|
|
|
+ elif response.status_code == 201:
|
|
|
|
|
+ return response.json() if response.content else {}
|
|
|
|
|
+ elif response.status_code == 204:
|
|
|
|
|
+ return {}
|
|
|
|
|
+ elif response.status_code == 401:
|
|
|
|
|
+ # Intentar refrescar token
|
|
|
|
|
+ if self._refresh_access_token():
|
|
|
|
|
+ # Reintentar la petición
|
|
|
|
|
+ return self._do_request(uri, params, headers, method, timeout)
|
|
|
|
|
+ else:
|
|
|
|
|
+ raise UserError(_('Authentication failed. Please reconnect your Google account.'))
|
|
|
|
|
+ elif response.status_code == 429:
|
|
|
|
|
+ raise UserError(_('Rate limit exceeded. Please try again later.'))
|
|
|
|
|
+ elif response.status_code >= 400:
|
|
|
|
|
+ error_msg = self._parse_error_response(response)
|
|
|
|
|
+ raise UserError(error_msg)
|
|
|
|
|
+
|
|
|
|
|
+ return response.json() if response.content else {}
|
|
|
|
|
+
|
|
|
|
|
+ except requests.exceptions.Timeout:
|
|
|
|
|
+ raise UserError(_('Request timeout. Please try again.'))
|
|
|
|
|
+ except requests.exceptions.ConnectionError:
|
|
|
|
|
+ raise UserError(_('Connection error. Please check your internet connection.'))
|
|
|
|
|
+ except UserError:
|
|
|
|
|
+ raise
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ raise UserError(_('Unexpected error: %s') % str(e))
|
|
|
|
|
+
|
|
|
|
|
+ def _parse_error_response(self, response):
|
|
|
|
|
+ """Parsear respuesta de error de Google API"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ error_data = response.json()
|
|
|
|
|
+ error_info = error_data.get('error', {})
|
|
|
|
|
+
|
|
|
|
|
+ message = error_info.get('message', 'Unknown error')
|
|
|
|
|
+ code = error_info.get('code', response.status_code)
|
|
|
|
|
+
|
|
|
|
|
+ return f"Google API Error {code}: {message}"
|
|
|
|
|
+ except:
|
|
|
|
|
+ return f"Google API Error {response.status_code}: {response.text}"
|
|
|
|
|
+
|
|
|
|
|
+ def _build_shared_drive_params(self, include_shared_drives=True):
|
|
|
|
|
+ """Construir parámetros para Shared Drives"""
|
|
|
|
|
+ params = {}
|
|
|
|
|
+ if include_shared_drives:
|
|
|
|
|
+ params.update({
|
|
|
|
|
+ 'supportsAllDrives': 'true',
|
|
|
|
|
+ 'includeItemsFromAllDrives': 'true'
|
|
|
|
|
+ })
|
|
|
|
|
+ return params
|
|
|
|
|
+
|
|
|
|
|
+ def validate_folder_id(self, folder_id):
|
|
|
|
|
+ """Validar que un folder ID existe y es accesible"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ if not folder_id:
|
|
|
|
|
+ raise UserError(_('Folder ID is required'))
|
|
|
|
|
+
|
|
|
|
|
+ if not isinstance(folder_id, str) or len(folder_id) < 10:
|
|
|
|
|
+ raise UserError(_('Invalid folder ID format'))
|
|
|
|
|
+
|
|
|
|
|
+ # Paso 1: Intentar como carpeta regular
|
|
|
|
|
+ try:
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}', {
|
|
|
|
|
+ 'fields': 'id,name,mimeType'
|
|
|
|
|
+ })
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'valid': True,
|
|
|
|
|
+ 'type': 'regular_folder',
|
|
|
|
|
+ 'name': response.get('name', 'Unknown'),
|
|
|
|
|
+ 'id': response.get('id')
|
|
|
|
|
+ }
|
|
|
|
|
+ except UserError as e:
|
|
|
|
|
+ if '404' in str(e):
|
|
|
|
|
+ # Paso 2: Intentar como Shared Drive
|
|
|
|
|
+ try:
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/drives/{folder_id}', {
|
|
|
|
|
+ 'fields': 'id,name'
|
|
|
|
|
+ })
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'valid': True,
|
|
|
|
|
+ 'type': 'shared_drive',
|
|
|
|
|
+ 'name': response.get('name', 'Unknown'),
|
|
|
|
|
+ 'id': response.get('id')
|
|
|
|
|
+ }
|
|
|
|
|
+ except UserError:
|
|
|
|
|
+ # Paso 3: Intentar como carpeta dentro de Shared Drive
|
|
|
|
|
+ try:
|
|
|
|
|
+ params = self._build_shared_drive_params()
|
|
|
|
|
+ params['fields'] = 'id,name,mimeType'
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}', params)
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'valid': True,
|
|
|
|
|
+ 'type': 'folder_in_shared_drive',
|
|
|
|
|
+ 'name': response.get('name', 'Unknown'),
|
|
|
|
|
+ 'id': response.get('id')
|
|
|
|
|
+ }
|
|
|
|
|
+ except UserError:
|
|
|
|
|
+ raise UserError(_('Folder ID not found or not accessible'))
|
|
|
|
|
+ else:
|
|
|
|
|
+ raise e
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'valid': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def create_folder(self, name, parent_folder_id=None, description=None):
|
|
|
|
|
+ """Crear una carpeta en Google Drive"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ folder_metadata = {
|
|
|
|
|
+ 'name': name,
|
|
|
|
|
+ 'mimeType': 'application/vnd.google-apps.folder'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if parent_folder_id:
|
|
|
|
|
+ folder_metadata['parents'] = [parent_folder_id]
|
|
|
|
|
+
|
|
|
|
|
+ if description:
|
|
|
|
|
+ folder_metadata['description'] = description
|
|
|
|
|
+
|
|
|
|
|
+ # Usar parámetros de Shared Drive si hay parent folder
|
|
|
|
|
+ params = {}
|
|
|
|
|
+ if parent_folder_id:
|
|
|
|
|
+ params = self._build_shared_drive_params()
|
|
|
|
|
+
|
|
|
|
|
+ # Construir URL con parámetros
|
|
|
|
|
+ url = '/drive/v3/files'
|
|
|
|
|
+ if params:
|
|
|
|
|
+ param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
|
|
|
|
|
+ url = f"{url}?{param_string}"
|
|
|
|
|
+
|
|
|
|
|
+ response = self._do_request(url, folder_metadata, method='POST')
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'folder_id': response.get('id'),
|
|
|
|
|
+ 'folder_name': response.get('name'),
|
|
|
|
|
+ 'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}"
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def list_folders(self, parent_folder_id=None, include_shared_drives=True):
|
|
|
|
|
+ """Listar carpetas en Google Drive"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ query = "mimeType='application/vnd.google-apps.folder'"
|
|
|
|
|
+ if parent_folder_id:
|
|
|
|
|
+ query += f" and '{parent_folder_id}' in parents"
|
|
|
|
|
+
|
|
|
|
|
+ params = {
|
|
|
|
|
+ 'q': query,
|
|
|
|
|
+ 'fields': 'files(id,name,parents,createdTime,modifiedTime)',
|
|
|
|
|
+ 'orderBy': 'name'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Agregar parámetros de Shared Drive
|
|
|
|
|
+ if include_shared_drives:
|
|
|
|
|
+ params.update(self._build_shared_drive_params())
|
|
|
|
|
+
|
|
|
|
|
+ response = self._do_request('/drive/v3/files', params)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'folders': response.get('files', [])
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def get_folder_url(self, folder_id):
|
|
|
|
|
+ """Obtener URL para abrir una carpeta"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ if not folder_id:
|
|
|
|
|
+ raise UserError(_('Folder ID is required'))
|
|
|
|
|
+
|
|
|
|
|
+ # Validar que existe
|
|
|
|
|
+ validation = self.validate_folder_id(folder_id)
|
|
|
|
|
+ if not validation.get('valid'):
|
|
|
|
|
+ raise UserError(validation.get('error', 'Invalid folder'))
|
|
|
|
|
+
|
|
|
|
|
+ folder_type = validation.get('type', 'regular_folder')
|
|
|
|
|
+
|
|
|
|
|
+ if folder_type == 'shared_drive':
|
|
|
|
|
+ return f"https://drive.google.com/drive/u/0/folders/{folder_id}"
|
|
|
|
|
+ else:
|
|
|
|
|
+ return f"https://drive.google.com/drive/folders/{folder_id}"
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def test_connection(self):
|
|
|
|
|
+ """Probar conexión al servicio"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ credentials = self._get_credentials()
|
|
|
|
|
+
|
|
|
|
|
+ # Test básico de autenticación
|
|
|
|
|
+ import requests
|
|
|
|
|
+ response = requests.get(
|
|
|
|
|
+ f"{GOOGLE_API_BASE_URL}/oauth2/v1/userinfo",
|
|
|
|
|
+ headers={'Authorization': f'Bearer {credentials["access_token"]}'},
|
|
|
|
|
+ timeout=10
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if response.status_code == 200:
|
|
|
|
|
+ user_info = response.json()
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'user_email': user_info.get('email', 'Unknown'),
|
|
|
|
|
+ 'user_name': user_info.get('name', 'Unknown')
|
|
|
|
|
+ }
|
|
|
|
|
+ else:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': f"Authentication failed: {response.status_code}"
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+ # MÉTODOS DE NAVEGACIÓN Y ESTRUCTURA
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+
|
|
|
|
|
+ def navigate_folder_hierarchy(self, folder_id, max_levels=5):
|
|
|
|
|
+ """Navigate up the folder hierarchy and return the complete path"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ hierarchy = []
|
|
|
|
|
+ current_id = folder_id
|
|
|
|
|
+
|
|
|
|
|
+ for level in range(max_levels):
|
|
|
|
|
+ try:
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{current_id}', {
|
|
|
|
|
+ 'fields': 'id,name,parents'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ folder_info = {
|
|
|
|
|
+ 'id': response.get('id'),
|
|
|
|
|
+ 'name': response.get('name', ''),
|
|
|
|
|
+ 'level': level
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ hierarchy.append(folder_info)
|
|
|
|
|
+
|
|
|
|
|
+ parent_ids = response.get('parents', [])
|
|
|
|
|
+ if not parent_ids:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ current_id = parent_ids[0]
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error navigating folder hierarchy at level {level}: {str(e)}")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ return hierarchy
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error in navigate_folder_hierarchy: {str(e)}")
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ def find_folders_by_name(self, parent_id, name_pattern):
|
|
|
|
|
+ """Find folders by name pattern in a parent folder"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ import re
|
|
|
|
|
+
|
|
|
|
|
+ query = f"'{parent_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
|
|
|
|
|
+
|
|
|
|
|
+ params = {
|
|
|
|
|
+ 'q': query,
|
|
|
|
|
+ 'fields': 'files(id,name)',
|
|
|
|
|
+ 'pageSize': 100
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Add Shared Drive parameters
|
|
|
|
|
+ params.update(self._build_shared_drive_params())
|
|
|
|
|
+
|
|
|
|
|
+ response = self._do_request('/drive/v3/files', params)
|
|
|
|
|
+ folders = response.get('files', [])
|
|
|
|
|
+
|
|
|
|
|
+ # Filter by name pattern
|
|
|
|
|
+ pattern = re.compile(name_pattern)
|
|
|
|
|
+ matching_folders = [folder for folder in folders if pattern.match(folder.get('name', ''))]
|
|
|
|
|
+
|
|
|
|
|
+ return matching_folders
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error finding folders by name: {str(e)}")
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ def get_folder_info(self, folder_id):
|
|
|
|
|
+ """Get detailed information about a folder"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ validation = self.validate_folder_id(folder_id)
|
|
|
|
|
+ if not validation.get('valid'):
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}', {
|
|
|
|
|
+ 'fields': 'id,name,parents,createdTime,modifiedTime,mimeType'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ return response
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error getting folder info: {str(e)}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+ # MÉTODOS DE MANIPULACIÓN DE CARPETAS
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+
|
|
|
|
|
+ def rename_folder(self, folder_id, new_name):
|
|
|
|
|
+ """Rename a folder in Google Drive"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ # For Google Drive API v3, we need to use PATCH with the correct parameters
|
|
|
|
|
+ # The issue was that we were passing params instead of json body
|
|
|
|
|
+ update_data = {
|
|
|
|
|
+ 'name': new_name
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Use PATCH method with json body (not params)
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}',
|
|
|
|
|
+ json_data=update_data,
|
|
|
|
|
+ method='PATCH')
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'folder_id': response.get('id'),
|
|
|
|
|
+ 'new_name': response.get('name')
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def move_folder(self, folder_id, new_parent_id):
|
|
|
|
|
+ """Move a folder to a new parent"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Get current folder info
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}', {
|
|
|
|
|
+ 'fields': 'parents'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ current_parents = response.get('parents', [])
|
|
|
|
|
+
|
|
|
|
|
+ if not current_parents:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': 'Folder has no current parent'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Use PATCH to update the parents
|
|
|
|
|
+ update_data = {
|
|
|
|
|
+ 'addParents': new_parent_id,
|
|
|
|
|
+ 'removeParents': current_parents[0]
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Update the file with new parent
|
|
|
|
|
+ update_response = self._do_request(
|
|
|
|
|
+ f'/drive/v3/files/{folder_id}',
|
|
|
|
|
+ update_data,
|
|
|
|
|
+ method='PATCH',
|
|
|
|
|
+ json_data=update_data
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'folder_id': folder_id,
|
|
|
|
|
+ 'new_parent_id': new_parent_id
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def delete_folder(self, folder_id):
|
|
|
|
|
+ """Delete a folder from Google Drive"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ self._do_request(f'/drive/v3/files/{folder_id}', method='DELETE')
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': True,
|
|
|
|
|
+ 'folder_id': folder_id
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'success': False,
|
|
|
|
|
+ 'error': str(e)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+ # MÉTODOS DE VALIDACIÓN Y UTILIDADES
|
|
|
|
|
+ # ============================================================================
|
|
|
|
|
+
|
|
|
|
|
+ def extract_folder_id_from_url(self, url):
|
|
|
|
|
+ """Extract folder ID from Google Drive URL - Generic utility method"""
|
|
|
|
|
+ if not url:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # Handle different Google Drive URL formats
|
|
|
|
|
+ import re
|
|
|
|
|
+
|
|
|
|
|
+ # Format: https://drive.google.com/drive/folders/FOLDER_ID
|
|
|
|
|
+ folder_pattern = r'drive\.google\.com/drive/folders/([a-zA-Z0-9_-]+)'
|
|
|
|
|
+ match = re.search(folder_pattern, url)
|
|
|
|
|
+
|
|
|
|
|
+ if match:
|
|
|
|
|
+ return match.group(1)
|
|
|
|
|
+
|
|
|
|
|
+ # Format: https://drive.google.com/open?id=FOLDER_ID
|
|
|
|
|
+ open_pattern = r'drive\.google\.com/open\?id=([a-zA-Z0-9_-]+)'
|
|
|
|
|
+ match = re.search(open_pattern, url)
|
|
|
|
|
+
|
|
|
|
|
+ if match:
|
|
|
|
|
+ return match.group(1)
|
|
|
|
|
+
|
|
|
|
|
+ # Check if it's already a folder ID
|
|
|
|
|
+ if isinstance(url, str) and len(url) >= 10 and len(url) <= 50:
|
|
|
|
|
+ if re.match(r'^[a-zA-Z0-9_-]+$', url):
|
|
|
|
|
+ return url
|
|
|
|
|
+
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def sanitize_folder_name(self, name):
|
|
|
|
|
+ """Sanitize folder name to be Google Drive compatible - Generic utility method"""
|
|
|
|
|
+ if not name:
|
|
|
|
|
+ return 'Sin nombre'
|
|
|
|
|
+
|
|
|
|
|
+ import re
|
|
|
|
|
+
|
|
|
|
|
+ # Use regex for better performance
|
|
|
|
|
+ sanitized_name = re.sub(r'[<>:"|?*/\\]', '_', name)
|
|
|
|
|
+
|
|
|
|
|
+ # Remove leading/trailing spaces and dots
|
|
|
|
|
+ sanitized_name = sanitized_name.strip(' .')
|
|
|
|
|
+
|
|
|
|
|
+ # Ensure it's not empty after sanitization
|
|
|
|
|
+ if not sanitized_name:
|
|
|
|
|
+ return 'Sin nombre'
|
|
|
|
|
+
|
|
|
|
|
+ # Limit length to 255 characters (Google Drive limit)
|
|
|
|
|
+ if len(sanitized_name) > 255:
|
|
|
|
|
+ sanitized_name = sanitized_name[:252] + '...'
|
|
|
|
|
+
|
|
|
|
|
+ return sanitized_name
|
|
|
|
|
+
|
|
|
|
|
+ def create_or_get_folder(self, parent_folder_id, folder_name, description=None):
|
|
|
|
|
+ """Create a folder or get existing one by name - Generic utility method"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ # First, check if folder already exists
|
|
|
|
|
+ existing_folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
|
|
|
|
|
+ if existing_folders:
|
|
|
|
|
+ return existing_folders[0]['id']
|
|
|
|
|
+
|
|
|
|
|
+ # Create new folder
|
|
|
|
|
+ result = self.create_folder(folder_name, parent_folder_id, description)
|
|
|
|
|
+
|
|
|
|
|
+ if result.get('success'):
|
|
|
|
|
+ return result.get('folder_id')
|
|
|
|
|
+ else:
|
|
|
|
|
+ error_msg = result.get('error', 'Unknown error')
|
|
|
|
|
+ raise UserError(_('Failed to create Google Drive folder "%s": %s') % (folder_name, error_msg))
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error creating or getting folder {folder_name}: {str(e)}")
|
|
|
|
|
+ raise
|
|
|
|
|
+
|
|
|
|
|
+ def create_folder_structure(self, root_folder_id, structure_components):
|
|
|
|
|
+ """Create folder structure from components - Generic utility method"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ created_folders = {}
|
|
|
|
|
+
|
|
|
|
|
+ for level, (folder_name, parent_id) in enumerate(structure_components):
|
|
|
|
|
+ if level == 0:
|
|
|
|
|
+ # First level uses root_folder_id as parent
|
|
|
|
|
+ parent_folder_id = root_folder_id
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Other levels use the previous folder as parent
|
|
|
|
|
+ parent_folder_id = created_folders[level - 1]
|
|
|
|
|
+
|
|
|
|
|
+ folder_id = self.create_or_get_folder(parent_folder_id, folder_name)
|
|
|
|
|
+ created_folders[level] = folder_id
|
|
|
|
|
+
|
|
|
|
|
+ return created_folders
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error creating folder structure: {str(e)}")
|
|
|
|
|
+ raise
|
|
|
|
|
+
|
|
|
|
|
+ def find_existing_folder_structure(self, root_folder_id, structure_components):
|
|
|
|
|
+ """Find existing folder structure - Generic utility method"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ found_folders = {}
|
|
|
|
|
+
|
|
|
|
|
+ for level, (folder_name, parent_id) in enumerate(structure_components):
|
|
|
|
|
+ if level == 0:
|
|
|
|
|
+ # First level uses root_folder_id as parent
|
|
|
|
|
+ parent_folder_id = root_folder_id
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Other levels use the previous folder as parent
|
|
|
|
|
+ parent_folder_id = found_folders[level - 1]
|
|
|
|
|
+
|
|
|
|
|
+ folders = self.find_folders_by_name(parent_folder_id, f'^{folder_name}$')
|
|
|
|
|
+ if not folders:
|
|
|
|
|
+ return None # Structure not found
|
|
|
|
|
+
|
|
|
|
|
+ found_folders[level] = folders[0]['id']
|
|
|
|
|
+
|
|
|
|
|
+ return found_folders
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error finding existing folder structure: {str(e)}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def validate_folder_id_with_google_drive(self, folder_id):
|
|
|
|
|
+ """Validate if the folder ID exists and is accessible in Google Drive - Generic utility method"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ validation = self.validate_folder_id(folder_id)
|
|
|
|
|
+
|
|
|
|
|
+ if validation.get('valid'):
|
|
|
|
|
+ folder_name = validation.get('name', 'Unknown')
|
|
|
|
|
+ _logger.info(f"✅ Folder ID {folder_id} validated successfully in Google Drive")
|
|
|
|
|
+ return True, folder_name
|
|
|
|
|
+ else:
|
|
|
|
|
+ error_message = validation.get('error', 'Unknown error')
|
|
|
|
|
+ _logger.warning(f"❌ Folder ID {folder_id} validation failed: {error_message}")
|
|
|
|
|
+ return False, error_message
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"❌ Error validating folder ID {folder_id}: {str(e)}")
|
|
|
|
|
+ return False, str(e)
|
|
|
|
|
+
|
|
|
|
|
+ def check_folder_belongs_to_parent(self, folder_id, expected_parent_id, max_levels=5):
|
|
|
|
|
+ """Check if a folder belongs to a specific parent in the hierarchy"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ if not expected_parent_id:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ # Get current folder info
|
|
|
|
|
+ response = self._do_request(f'/drive/v3/files/{folder_id}', {
|
|
|
|
|
+ 'fields': 'parents'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ parent_ids = response.get('parents', [])
|
|
|
|
|
+
|
|
|
|
|
+ if not parent_ids:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ # Navigate up to find if the folder is under the expected parent
|
|
|
|
|
+ current_parent_id = parent_ids[0]
|
|
|
|
|
+
|
|
|
|
|
+ for _ in range(max_levels):
|
|
|
|
|
+ # Check if current parent is the expected parent
|
|
|
|
|
+ if current_parent_id == expected_parent_id:
|
|
|
|
|
+ # Folder is under the expected parent
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ parent_response = self._do_request(f'/drive/v3/files/{current_parent_id}', {
|
|
|
|
|
+ 'fields': 'parents'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ parent_parent_ids = parent_response.get('parents', [])
|
|
|
|
|
+
|
|
|
|
|
+ if not parent_parent_ids:
|
|
|
|
|
+ # Reached the top level, folder is not under the expected parent
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ current_parent_id = parent_parent_ids[0]
|
|
|
|
|
+
|
|
|
|
|
+ # If we reach here, folder is not under the expected parent
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ _logger.error(f"Error checking folder parent relationship: {str(e)}")
|
|
|
|
|
+ return False
|