| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- # -*- coding: utf-8 -*-
- import logging
- from odoo import models, api
- from odoo.exceptions import UserError
- from odoo.tools.translate import _
- _logger = logging.getLogger(__name__)
- class GoogleWorkspaceService(models.AbstractModel):
- _name = 'google.workspace.service'
- _description = 'Google Workspace Service'
- def list_shared_drives(self):
- """List all accessible Shared Drives"""
- try:
- drive_service = self.env['google.drive.service']
-
- # Use the drive service to make the request
- response = drive_service._do_request('/drive/v3/drives', {
- 'pageSize': 100,
- 'fields': 'drives(id,name,createdTime,capabilities)'
- })
-
- shared_drives = response.get('drives', [])
-
- return {
- 'success': True,
- 'shared_drives': shared_drives
- }
-
- except Exception as e:
- _logger.error(f"Error listing shared drives: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
- def get_shared_drive_info(self, drive_id):
- """Get information about a specific Shared Drive"""
- try:
- drive_service = self.env['google.drive.service']
-
- response = drive_service._do_request(f'/drive/v3/drives/{drive_id}', {
- 'fields': 'id,name,createdTime,capabilities,restrictions'
- })
-
- return {
- 'success': True,
- 'drive_info': response
- }
-
- except Exception as e:
- _logger.error(f"Error getting shared drive info: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
- def list_files_in_shared_drive(self, drive_id, folder_id=None):
- """List files in a Shared Drive or specific folder within it"""
- try:
- drive_service = self.env['google.drive.service']
-
- # Build query
- query = f"'{drive_id}' in parents and trashed=false"
- if folder_id:
- query = f"'{folder_id}' in parents and trashed=false"
-
- params = {
- 'q': query,
- 'fields': 'files(id,name,mimeType,createdTime,modifiedTime,parents)',
- 'pageSize': 100,
- 'orderBy': 'name'
- }
-
- # Add Shared Drive parameters
- params.update(drive_service._build_shared_drive_params())
-
- response = drive_service._do_request('/drive/v3/files', params)
-
- return {
- 'success': True,
- 'files': response.get('files', [])
- }
-
- except Exception as e:
- _logger.error(f"Error listing files in shared drive: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
- def check_shared_drive_permissions(self, drive_id):
- """Check current user's permissions on a Shared Drive"""
- try:
- drive_service = self.env['google.drive.service']
-
- # Get drive info with capabilities
- response = drive_service._do_request(f'/drive/v3/drives/{drive_id}', {
- 'fields': 'capabilities'
- })
-
- capabilities = response.get('capabilities', {})
-
- permissions = {
- 'can_add_children': capabilities.get('canAddChildren', False),
- 'can_comment': capabilities.get('canComment', False),
- 'can_copy': capabilities.get('canCopy', False),
- 'can_delete': capabilities.get('canDelete', False),
- 'can_download': capabilities.get('canDownload', False),
- 'can_edit': capabilities.get('canEdit', False),
- 'can_list_children': capabilities.get('canListChildren', False),
- 'can_move_item_into_team_drive': capabilities.get('canMoveItemIntoTeamDrive', False),
- 'can_move_item_out_of_team_drive': capabilities.get('canMoveItemOutOfTeamDrive', False),
- 'can_move_item_within_team_drive': capabilities.get('canMoveItemWithinTeamDrive', False),
- 'can_read': capabilities.get('canRead', False),
- 'can_read_revisions': capabilities.get('canReadRevisions', False),
- 'can_remove_children': capabilities.get('canRemoveChildren', False),
- 'can_rename': capabilities.get('canRename', False),
- 'can_share': capabilities.get('canShare', False),
- 'can_trash': capabilities.get('canTrash', False),
- 'can_trash_children': capabilities.get('canTrashChildren', False),
- }
-
- return {
- 'success': True,
- 'permissions': permissions
- }
-
- except Exception as e:
- _logger.error(f"Error checking shared drive permissions: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
- def create_folder_in_shared_drive(self, name, drive_id, parent_folder_id=None, description=None):
- """Create a folder within a Shared Drive"""
- try:
- drive_service = self.env['google.drive.service']
-
- folder_metadata = {
- 'name': name,
- 'mimeType': 'application/vnd.google-apps.folder'
- }
-
- if parent_folder_id:
- folder_metadata['parents'] = [parent_folder_id]
-
- if description:
- folder_metadata['description'] = description
-
- # Add Shared Drive parameters
- params = drive_service._build_shared_drive_params()
-
- # Build URL with parameters
- url = '/drive/v3/files'
- if params:
- param_string = '&'.join([f"{k}={v}" for k, v in params.items()])
- url = f"{url}?{param_string}"
-
- response = drive_service._do_request(url, folder_metadata, method='POST')
-
- return {
- 'success': True,
- 'folder_id': response.get('id'),
- 'folder_name': response.get('name'),
- 'folder_url': f"https://drive.google.com/drive/folders/{response.get('id')}"
- }
-
- except Exception as e:
- _logger.error(f"Error creating folder in shared drive: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
- def validate_shared_drive_access(self, drive_id):
- """Validate that the current user has access to a Shared Drive"""
- try:
- # Try to get drive info
- drive_info = self.get_shared_drive_info(drive_id)
-
- if not drive_info.get('success'):
- return {
- 'valid': False,
- 'error': drive_info.get('error', 'Unknown error')
- }
-
- # Check permissions
- permissions = self.check_shared_drive_permissions(drive_id)
-
- if not permissions.get('success'):
- return {
- 'valid': False,
- 'error': permissions.get('error', 'Cannot check permissions')
- }
-
- # Check if user has at least read access
- user_permissions = permissions.get('permissions', {})
- if not user_permissions.get('can_read', False):
- return {
- 'valid': False,
- 'error': 'No read access to this Shared Drive'
- }
-
- return {
- 'valid': True,
- 'drive_name': drive_info.get('drive_info', {}).get('name', 'Unknown'),
- 'permissions': user_permissions
- }
-
- except Exception as e:
- _logger.error(f"Error validating shared drive access: {str(e)}")
- return {
- 'valid': False,
- 'error': str(e)
- }
- def search_shared_drives_by_name(self, name_pattern):
- """Search for Shared Drives by name pattern"""
- try:
- import re
-
- # Get all shared drives
- shared_drives_result = self.list_shared_drives()
-
- if not shared_drives_result.get('success'):
- return {
- 'success': False,
- 'error': shared_drives_result.get('error', 'Cannot list shared drives')
- }
-
- shared_drives = shared_drives_result.get('shared_drives', [])
-
- # Filter by name pattern
- pattern = re.compile(name_pattern, re.IGNORECASE)
- matching_drives = [drive for drive in shared_drives if pattern.search(drive.get('name', ''))]
-
- return {
- 'success': True,
- 'matching_drives': matching_drives
- }
-
- except Exception as e:
- _logger.error(f"Error searching shared drives: {str(e)}")
- return {
- 'success': False,
- 'error': str(e)
- }
|