# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from datetime import timedelta
import logging
import urllib.parse

import requests

from odoo import api, fields, models, _
from odoo.exceptions import UserError

_logger = logging.getLogger(__name__)

# Upper bound (in seconds) for the token-exchange HTTP call, so that a stalled
# connection to the token endpoint cannot block the calling worker forever.
GOOGLE_TOKEN_REQUEST_TIMEOUT = 10


# Extension of ``res.users.settings`` that stores per-user Google OAuth
# credentials and offers helpers/actions to connect, refresh, test and
# disconnect a Google account.
class ResUsersSettings(models.Model):
    _inherit = "res.users.settings"

    # Google API tokens and synchronization information
    google_rtoken = fields.Char('Google Refresh Token', copy=False, groups='base.group_system')
    google_token = fields.Char('Google User Token', copy=False, groups='base.group_system')
    google_token_validity = fields.Datetime('Google Token Validity', copy=False, groups='base.group_system')
    google_email = fields.Char('Google Email', copy=False, groups='base.group_system')
    google_sync_enabled = fields.Boolean('Google Sync Enabled', default=False, copy=False, groups='base.group_system')

    # CRM Meets Files Configuration
    google_crm_meets_folder_id = fields.Char(
        string='Archivos Meets CRM',
        help='ID de la carpeta en Google Drive donde se almacenan archivos de meets para sincronización con CRM',
        copy=False,
        groups='base.group_system'
    )

    @api.model
    def _get_fields_blacklist(self):
        """Return the parent blacklist extended with the Google fields.

        The Google fields declared on this model are appended so that they
        are not formatted in session_info.
        """
        google_fields_blacklist = [
            'google_rtoken',
            'google_token',
            'google_token_validity',
            'google_email',
            'google_sync_enabled',
            'google_crm_meets_folder_id',
        ]
        return super()._get_fields_blacklist() + google_fields_blacklist

    def _set_google_auth_tokens(self, access_token, refresh_token, expires_at):
        """Store the Google tokens on the records in ``self``.

        :param access_token: value written to ``google_token``
        :param refresh_token: value written to ``google_rtoken``
        :param expires_at: value written to ``google_token_validity``; any
            falsy value is stored as ``False``
        """
        # The fields are restricted to base.group_system, hence the sudo().
        self.sudo().write({
            'google_rtoken': refresh_token,
            'google_token': access_token,
            'google_token_validity': expires_at if expires_at else False,
        })

    def _google_authenticated(self):
        """Return True when a refresh token is stored for this record."""
        self.ensure_one()
        return bool(self.sudo().google_rtoken)

    def _is_google_token_valid(self):
        """Return True when the stored token stays valid for at least one more minute."""
        self.ensure_one()
        validity = self.sudo().google_token_validity
        # The one-minute margin avoids handing out a token about to expire.
        return bool(validity and validity >= fields.Datetime.now() + timedelta(minutes=1))

    def _refresh_google_token(self):
        """Refresh the access token through ``google.service``.

        On success the new token and its validity are written on the record.
        On any failure the stored tokens are cleared and that removal is
        committed.

        :return: True when the token was refreshed, False otherwise
        """
        self.ensure_one()
        try:
            access_token, ttl = self.env['google.service']._refresh_google_token(
                'drive', self.sudo().google_rtoken)
            self.sudo().write({
                'google_token': access_token,
                'google_token_validity': fields.Datetime.now() + timedelta(seconds=ttl),
            })
            _logger.info("Google token refreshed for user %s", self.user_id.name)
            return True
        except Exception as e:
            # NOTE(review): every failure, including a transient network
            # error, wipes the refresh token and forces the user to reconnect.
            # Consider restricting the wipe to "invalid grant" style errors.
            _logger.error("Failed to refresh Google token for user %s: %s", self.user_id.name, e)
            # Delete invalid tokens; rollback first so the removal is the only
            # change committed.
            self.env.cr.rollback()
            self.sudo()._set_google_auth_tokens(False, False, False)
            self.env.cr.commit()
            return False

    def _get_google_access_token(self):
        """Return a usable access token, refreshing it when needed.

        :return: the access token, or None when the account is not connected
            or the refresh failed
        """
        self.ensure_one()
        if not self._google_authenticated():
            return None
        if not self._is_google_token_valid() and not self._refresh_google_token():
            return None
        return self.sudo().google_token

    def action_connect_google(self):
        """Return a URL action that opens the Google OAuth consent page.

        :raises UserError: when the ``google_api.client_id`` or
            ``google_api.client_secret`` system parameter is empty
        """
        self.ensure_one()

        # Get Google API credentials from system settings
        config = self.env['ir.config_parameter'].sudo()
        client_id = config.get_param('google_api.client_id', '')
        client_secret = config.get_param('google_api.client_secret', '')

        if not client_id or not client_secret:
            _logger.error("Google API credentials not configured.")
            raise UserError(_('Google API credentials not configured. Please contact your administrator.'))

        # Build OAuth authorization URL
        base_url = config.get_param('web.base.url')
        redirect_uri = f"{base_url}/web/google_oauth_callback"
        scope = ' '.join((
            'https://www.googleapis.com/auth/drive',
            'https://www.googleapis.com/auth/userinfo.email',
            'https://www.googleapis.com/auth/calendar',
        ))

        # Generate the authorization URL using our own credentials.
        # NOTE(review): the state is the plain settings ID, which is
        # guessable; an unguessable value would protect the callback against
        # forged requests. Changing it requires updating the callback too.
        authorize_uri = self._get_google_authorize_uri(
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
            state=str(self.id)  # Pass user settings ID as state
        )

        return {
            'type': 'ir.actions.act_url',
            'url': authorize_uri,
            'target': 'new',
        }

    def action_disconnect_google(self):
        """Clear the stored Google credentials and return a success notification."""
        self.ensure_one()
        self.sudo()._set_google_auth_tokens(False, False, False)
        self.sudo().write({
            'google_email': False,
            'google_sync_enabled': False,
        })

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('Success'),
                'message': _('Google account disconnected successfully.'),
                'type': 'success',
                'sticky': False,
            }
        }

    def action_test_google_connection(self):
        """Check the connection by requesting the Drive ``about`` resource.

        On success the returned e-mail address is stored in ``google_email``
        and a success notification action is returned.

        :raises UserError: when the account is not connected, no valid token
            can be obtained, the request fails, or the response carries no
            ``user`` entry
        """
        self.ensure_one()

        if not self._google_authenticated():
            raise UserError(_('Google account not connected. Please connect your account first.'))

        access_token = self._get_google_access_token()
        if not access_token:
            raise UserError(_('Could not obtain valid access token. Please reconnect your Google account.'))

        # Test Google Drive API access
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }
        # Only the remote call is guarded, so that the UserError raised below
        # for an unusable response is not caught and wrapped a second time.
        try:
            response = self.env['google.drive.service']._do_request(
                '/drive/v3/about',
                params={'fields': 'user'},
                headers=headers
            )
        except Exception as e:
            _logger.error("Google connection test failed for user %s: %s", self.user_id.name, e)
            raise UserError(_('Google connection test failed: %s') % str(e)) from e

        if not response or 'user' not in response:
            raise UserError(_('Could not retrieve user information from Google.'))

        user_email = response['user'].get('emailAddress', 'Unknown')
        self.sudo().write({'google_email': user_email})

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('Success'),
                'message': _('Google connection successful! Connected as: %s') % user_email,
                'type': 'success',
                'sticky': False,
            }
        }

    def _get_google_authorize_uri(self, client_id, redirect_uri, scope, state):
        """Build the Google OAuth authorization URL from the given credentials.

        :param client_id: OAuth client ID
        :param redirect_uri: URI Google redirects to after consent
        :param scope: space-separated list of requested scopes
        :param state: opaque value sent along with the request
        :return: the authorization URL including its URL-encoded query string
        """
        params = {
            'response_type': 'code',
            'client_id': client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'state': state,
            'access_type': 'offline',
            'prompt': 'consent select_account',  # Force account selection
            # Sent empty; the original intent is to allow any hosted domain.
            # NOTE(review): confirm an empty value is handled as intended.
            'hd': '',
            'include_granted_scopes': 'true'
        }

        base_url = 'https://accounts.google.com/o/oauth2/auth'
        query_string = urllib.parse.urlencode(params)
        return f"{base_url}?{query_string}"

    def _exchange_authorization_code_for_tokens(self, code, client_id, client_secret, redirect_uri):
        """Exchange an authorization code for access and refresh tokens.

        :param code: authorization code received on the OAuth callback
        :param client_id: OAuth client ID
        :param client_secret: OAuth client secret
        :param redirect_uri: redirect URI sent with the exchange request
        :return: tuple ``(access_token, refresh_token, expires_at)`` where
            ``refresh_token`` may be None when the response has none and
            ``expires_at`` is a datetime
        :raises UserError: when the endpoint answers with a non-200 status or
            the response holds no access token
        :raises requests.RequestException: when the HTTP request itself fails
            (connection error, timeout)
        """
        token_url = 'https://oauth2.googleapis.com/token'
        data = {
            'code': code,
            'client_id': client_id,
            'client_secret': client_secret,
            'redirect_uri': redirect_uri,
            'grant_type': 'authorization_code'
        }

        # Without a timeout a stalled connection would block indefinitely.
        response = requests.post(token_url, data=data, timeout=GOOGLE_TOKEN_REQUEST_TIMEOUT)

        if response.status_code != 200:
            _logger.error("Token exchange failed: %s - %s", response.status_code, response.text)
            raise UserError(_('Failed to exchange authorization code for tokens: %s') % response.text)

        token_data = response.json()
        access_token = token_data.get('access_token')
        refresh_token = token_data.get('refresh_token')
        expires_in = token_data.get('expires_in', 3600)

        if not access_token:
            raise UserError(_('No access token received from Google'))

        # Calculate expiration time with the same clock used by
        # _is_google_token_valid and _refresh_google_token, so stored and
        # compared values share one time reference.
        expires_at = fields.Datetime.now() + timedelta(seconds=expires_in)

        return access_token, refresh_token, expires_at