# -*- coding: utf-8 -*- import logging import requests from odoo import models, api from odoo.exceptions import UserError from odoo.tools.translate import _ _logger = logging.getLogger(__name__) class GoogleAuthService(models.AbstractModel): _name = 'google.auth.service' _description = 'Google Authentication Service' def test_connection(self): """Test Google Drive API connection""" try: access_token = self._get_access_token() if not access_token: return False headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } response = requests.get( 'https://www.googleapis.com/drive/v3/about', headers=headers, params={'fields': 'user'}, timeout=10 ) if response.status_code == 200: return True elif response.status_code == 401: _logger.warning("Google Drive access token is invalid (401)") # Try to refresh the token if self.refresh_access_token(): return self.test_connection() return False else: _logger.warning(f"Google Drive API test failed with status {response.status_code}") return False except requests.exceptions.Timeout: _logger.error("Google Drive API test timeout") return False except requests.exceptions.ConnectionError: _logger.error("Google Drive API connection error") return False except Exception as e: _logger.error(f"Google Drive API test error: {str(e)}") return False def refresh_access_token(self): """Refresh the access token using the refresh token""" try: refresh_token = self.env['ir.config_parameter'].sudo().get_param('google_api.refresh_token') client_id = self.env['ir.config_parameter'].sudo().get_param('google_api.client_id') client_secret = self.env['ir.config_parameter'].sudo().get_param('google_api.client_secret') if not all([refresh_token, client_id, client_secret]): _logger.error("Missing OAuth2 credentials") return False # Exchange refresh token for new access token token_url = 'https://oauth2.googleapis.com/token' data = { 'client_id': client_id, 'client_secret': client_secret, 'refresh_token': refresh_token, 'grant_type': 'refresh_token' } response = requests.post(token_url, data=data, timeout=30) if response.status_code == 200: token_data = response.json() new_access_token = token_data.get('access_token') if new_access_token: # Store the new access token self.env['ir.config_parameter'].sudo().set_param('google_api.access_token', new_access_token) _logger.info("Access token refreshed successfully") return True else: _logger.error("No access token in response") return False else: _logger.error(f"Failed to refresh token: {response.status_code} - {response.text}") return False except Exception as e: _logger.error(f"Failed to refresh access token: {str(e)}") return False def get_access_token(self, user=None): """Get a valid access token for the specified user, refreshing if necessary""" if not user: user = self.env.user try: # Get user's Google Drive settings user_settings = user.res_users_settings_id if not user_settings: return None # Get access token from user settings return user_settings._get_google_access_token() except Exception as e: _logger.error(f"Error getting access token for user {user.name}: {str(e)}") return None def _test_token_validity(self, access_token): """Test if an access token is still valid""" try: headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } response = requests.get( 'https://www.googleapis.com/drive/v3/about', headers=headers, params={'fields': 'user'}, timeout=5 ) return response.status_code == 200 except Exception as e: _logger.error(f"Error testing token validity: {str(e)}") return False def validate_credentials(self): """Validate that all required OAuth2 credentials are configured""" try: client_id = self.env['ir.config_parameter'].sudo().get_param('google_api.client_id') client_secret = self.env['ir.config_parameter'].sudo().get_param('google_api.client_secret') refresh_token = self.env['ir.config_parameter'].sudo().get_param('google_api.refresh_token') missing_credentials = [] if not client_id: missing_credentials.append('Client ID') if not client_secret: missing_credentials.append('Client Secret') if not refresh_token: missing_credentials.append('Refresh Token') if missing_credentials: raise UserError(_('Missing Google API credentials: %s') % ', '.join(missing_credentials)) return True except Exception as e: _logger.error(f"Error validating credentials: {str(e)}") raise def get_auth_headers(self, user=None): """Get authentication headers for API requests""" try: access_token = self.get_access_token(user) if not access_token: raise UserError(_('Could not obtain valid access token for user %s') % (user.name if user else self.env.user.name)) return { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } except Exception as e: _logger.error(f"Error getting auth headers: {str(e)}") raise