| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- from datetime import timedelta
- import logging
- from odoo import api, fields, models, _
- from odoo.exceptions import UserError
- _logger = logging.getLogger(__name__)
class ResUsersSettings(models.Model):
    """Per-user Google OAuth credentials and Drive/CRM synchronization settings.

    Extends ``res.users.settings`` with the tokens obtained through the Google
    OAuth flow, plus helpers to start the flow, exchange the authorization
    code, refresh the access token and test the connection.

    Every Google field is restricted to ``base.group_system``; the helpers
    below therefore read and write those fields through ``sudo()``.
    """

    _inherit = "res.users.settings"

    # Google API tokens and synchronization information
    google_rtoken = fields.Char('Google Refresh Token', copy=False, groups='base.group_system')
    google_token = fields.Char('Google User Token', copy=False, groups='base.group_system')
    google_token_validity = fields.Datetime('Google Token Validity', copy=False, groups='base.group_system')
    google_email = fields.Char('Google Email', copy=False, groups='base.group_system')
    google_sync_enabled = fields.Boolean('Google Sync Enabled', default=False, copy=False, groups='base.group_system')

    # CRM Meets Files Configuration: Google Drive folder holding the meeting
    # files that are synchronized with the CRM.
    google_crm_meets_folder_id = fields.Char(
        string='Archivos Meets CRM',
        help='ID de la carpeta en Google Drive donde se almacenan archivos de meets para sincronización con CRM',
        copy=False,
        groups='base.group_system',
    )

    @api.model
    def _get_fields_blacklist(self):
        """Get list of google drive fields that won't be formatted in session_info.

        :return: the parent blacklist extended with every Google field
            declared on this model.
        """
        google_fields_blacklist = [
            'google_rtoken',
            'google_token',
            'google_token_validity',
            'google_email',
            'google_sync_enabled',
            'google_crm_meets_folder_id',
        ]
        return super()._get_fields_blacklist() + google_fields_blacklist

    def _set_google_auth_tokens(self, access_token, refresh_token, expires_at):
        """Store (or clear) the Google authentication tokens.

        :param access_token: access token, or a falsy value to clear it
        :param refresh_token: refresh token, or a falsy value to clear it
        :param expires_at: datetime at which the access token expires; any
            falsy value is stored as ``False``
        """
        self.sudo().write({
            'google_rtoken': refresh_token,
            'google_token': access_token,
            'google_token_validity': expires_at or False,
        })

    def _google_authenticated(self):
        """Return True when a Google refresh token is stored for this record."""
        self.ensure_one()
        return bool(self.sudo().google_rtoken)

    def _is_google_token_valid(self):
        """Return True when the stored access token is valid for at least one more minute."""
        self.ensure_one()
        validity = self.sudo().google_token_validity
        if not validity:
            return False
        # One minute of margin so a token is not handed out right before it expires.
        return validity >= fields.Datetime.now() + timedelta(minutes=1)

    def _refresh_google_token(self):
        """Refresh the Google access token using the stored refresh token.

        On success the new access token and its validity are written on the
        record. On any failure the current transaction is rolled back, all
        stored tokens are cleared and that cleanup is committed immediately.

        NOTE(review): the rollback discards every pending change of the
        current transaction, and the cleanup also runs for transient failures
        (e.g. network errors) -- confirm this is the intended trade-off.

        :return: True when the token was refreshed, False otherwise
        """
        self.ensure_one()
        refresh_token = self.sudo().google_rtoken
        if not refresh_token:
            # Nothing to refresh with: skip the remote call entirely.
            return False

        # Read the name up front so the error path does not have to touch the
        # database before the rollback.
        user_name = self.user_id.name
        try:
            access_token, ttl = self.env['google.service']._refresh_google_token('drive', refresh_token)
            self.sudo().write({
                'google_token': access_token,
                'google_token_validity': fields.Datetime.now() + timedelta(seconds=ttl),
            })
        except Exception as e:
            _logger.error("Failed to refresh Google token for user %s: %s", user_name, e)
            # Delete invalid tokens and make sure the deletion is persisted
            self.env.cr.rollback()
            self.sudo()._set_google_auth_tokens(False, False, False)
            self.env.cr.commit()
            return False

        _logger.info("Google token refreshed for user %s", user_name)
        return True

    def _get_google_access_token(self):
        """Get a valid Google access token, refreshing it if necessary.

        :return: the access token, or None when the account is not connected
            or the token could not be refreshed
        """
        self.ensure_one()

        if not self._google_authenticated():
            return None

        if not self._is_google_token_valid() and not self._refresh_google_token():
            return None

        return self.sudo().google_token

    def action_connect_google(self):
        """Start the Google OAuth flow for this settings record.

        :return: an ``ir.actions.act_url`` action opening the Google consent
            page in a new window
        :raises UserError: when the Google API client id or secret is not
            configured in the system parameters
        """
        self.ensure_one()

        # Get Google API credentials from system settings
        config = self.env['ir.config_parameter'].sudo()
        client_id = config.get_param('google_api.client_id', '')
        client_secret = config.get_param('google_api.client_secret', '')
        if not client_id or not client_secret:
            _logger.error("Google API credentials not configured.")
            raise UserError(_('Google API credentials not configured. Please contact your administrator.'))

        # Build OAuth authorization URL
        base_url = config.get_param('web.base.url')
        redirect_uri = f"{base_url}/web/google_oauth_callback"

        scope = 'https://www.googleapis.com/auth/drive https://www.googleapis.com/auth/userinfo.email https://www.googleapis.com/auth/calendar'

        # Generate the authorization URL using our own credentials.
        # NOTE(review): ``state`` is the plain settings id, which is guessable;
        # confirm the callback checks it against the logged-in user.
        authorize_uri = self._get_google_authorize_uri(
            client_id=client_id,
            redirect_uri=redirect_uri,
            scope=scope,
            state=str(self.id),  # Pass user settings ID as state
        )

        return {
            'type': 'ir.actions.act_url',
            'url': authorize_uri,
            'target': 'new',
        }

    def action_disconnect_google(self):
        """Disconnect the Google account by clearing tokens, email and sync flag.

        :return: a client action displaying a success notification
        """
        self.ensure_one()

        self.sudo()._set_google_auth_tokens(False, False, False)
        self.sudo().write({
            'google_email': False,
            'google_sync_enabled': False,
        })

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('Success'),
                'message': _('Google account disconnected successfully.'),
                'type': 'success',
                'sticky': False,
            },
        }

    def action_test_google_connection(self):
        """Test the Google connection by querying the Drive ``about`` endpoint.

        On success the e-mail address reported by Google is stored in
        ``google_email``.

        :return: a client action displaying a success notification
        :raises UserError: when the account is not connected, no valid access
            token can be obtained, the request fails, or the response carries
            no user information
        """
        self.ensure_one()

        if not self._google_authenticated():
            raise UserError(_('Google account not connected. Please connect your account first.'))

        access_token = self._get_google_access_token()
        if not access_token:
            raise UserError(_('Could not obtain valid access token. Please reconnect your Google account.'))

        # Test Google Drive API access
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
        }
        # Only the remote call is guarded, so the UserError raised below for a
        # response without user data is not caught and wrapped a second time.
        try:
            response = self.env['google.drive.service']._do_request(
                '/drive/v3/about',
                params={'fields': 'user'},
                headers=headers,
            )
        except Exception as e:
            _logger.error("Google connection test failed for user %s: %s", self.user_id.name, e)
            raise UserError(_('Google connection test failed: %s') % str(e)) from e

        if not response or 'user' not in response:
            raise UserError(_('Could not retrieve user information from Google.'))

        user_email = (response['user'] or {}).get('emailAddress')
        # Store only a real address; the 'Unknown' placeholder is display-only.
        self.sudo().write({'google_email': user_email or False})

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': _('Success'),
                'message': _('Google connection successful! Connected as: %s') % (user_email or 'Unknown'),
                'type': 'success',
                'sticky': False,
            },
        }

    def _get_google_authorize_uri(self, client_id, redirect_uri, scope, state):
        """Generate the Google OAuth authorization URI using our own credentials.

        :param client_id: OAuth client id
        :param redirect_uri: URI Google redirects to after consent
        :param scope: space-separated list of requested scopes
        :param state: opaque value sent back unchanged to ``redirect_uri``
        :return: the authorization URL with URL-encoded query parameters
        """
        import urllib.parse

        params = {
            'response_type': 'code',
            'client_id': client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'state': state,
            'access_type': 'offline',
            'prompt': 'consent select_account',  # Force account selection
            'hd': '',  # Allow any hosted domain
            'include_granted_scopes': 'true',
        }

        base_url = 'https://accounts.google.com/o/oauth2/auth'
        return f"{base_url}?{urllib.parse.urlencode(params)}"

    def _exchange_authorization_code_for_tokens(self, code, client_id, client_secret, redirect_uri, timeout=20):
        """Exchange an authorization code for access and refresh tokens.

        :param code: authorization code received on the OAuth callback
        :param client_id: OAuth client id
        :param client_secret: OAuth client secret
        :param redirect_uri: redirect URI used when requesting the code
        :param timeout: seconds to wait for the token endpoint before giving
            up, so a stalled connection cannot block the worker forever
        :return: tuple ``(access_token, refresh_token, expires_at)``;
            ``refresh_token`` is None when the response does not contain one
        :raises UserError: when the request fails, the endpoint does not
            answer with HTTP 200, the body is not JSON, or no access token is
            returned
        """
        import requests

        token_url = 'https://oauth2.googleapis.com/token'

        data = {
            'code': code,
            'client_id': client_id,
            'client_secret': client_secret,
            'redirect_uri': redirect_uri,
            'grant_type': 'authorization_code',
        }

        try:
            response = requests.post(token_url, data=data, timeout=timeout)
        except requests.RequestException as e:
            # Report transport failures (timeout, DNS, TLS...) the same way as
            # the other failure modes of this method.
            _logger.error("Token exchange failed: %s", e)
            raise UserError(_('Failed to exchange authorization code for tokens: %s') % str(e)) from e

        if response.status_code != 200:
            _logger.error("Token exchange failed: %s - %s", response.status_code, response.text)
            raise UserError(_('Failed to exchange authorization code for tokens: %s') % response.text)

        try:
            token_data = response.json()
        except ValueError as e:
            _logger.error("Token exchange failed: %s - %s", response.status_code, response.text)
            raise UserError(_('Failed to exchange authorization code for tokens: %s') % response.text) from e

        access_token = token_data.get('access_token')
        refresh_token = token_data.get('refresh_token')
        expires_in = token_data.get('expires_in', 3600)

        if not access_token:
            raise UserError(_('No access token received from Google'))

        # Calculate expiration time with the same clock that
        # _is_google_token_valid and _refresh_google_token use.
        expires_at = fields.Datetime.now() + timedelta(seconds=int(expires_in))

        return access_token, refresh_token, expires_at
|