||
- # -*- coding: utf-8 -*-
- # Part of Odoo. See LICENSE file for full copyright and licensing details.
- import logging
- import pytz
- from datetime import datetime, date
- from dateutil.relativedelta import relativedelta
- from odoo import api, fields, models, _
- from odoo.exceptions import UserError
- from odoo.tools import float_round
- _logger = logging.getLogger(__name__)
class HrEfficiency(models.Model):
    """Efficiency record for one employee and one ``YYYY-MM`` month.

    Holds the hour totals, the contract figures captured at calculation
    time and the derived efficiency values. No uniqueness constraint is
    declared: several records may exist for the same employee and month so
    that earlier calculations are kept as history.
    """

    _name = 'hr.efficiency'
    _description = 'Employee Efficiency'
    _order = 'month_year desc, employee_id'
    _rec_name = 'display_name'
    _active_name = 'active'

    # ------------------------------------------------------------------
    # Identification
    # ------------------------------------------------------------------
    name = fields.Char(
        string='Name',
        compute='_compute_display_name',
        store=True,
    )
    employee_id = fields.Many2one(
        comodel_name='hr.employee',
        string='Employee',
        required=True,
        domain=[('employee_type', '=', 'employee')],
    )
    month_year = fields.Char(
        string='Month Year',
        required=True,
        help="Format: YYYY-MM (e.g., 2024-08)",
    )
    date = fields.Date(
        string='Date',
        compute='_compute_date',
        store=True,
        help="Date field for standard Odoo date filters",
    )
    company_id = fields.Many2one(
        comodel_name='res.company',
        string='Company',
        default=lambda self: self.env.company,
    )
    active = fields.Boolean(string='Active', default=True)
    calculation_date = fields.Datetime(
        string='Calculation Date',
        default=fields.Datetime.now,
        help='When this calculation was performed',
    )

    # ------------------------------------------------------------------
    # Available hours (what the employee should work)
    # ------------------------------------------------------------------
    available_hours = fields.Float(
        string='Available Hours',
        digits=(10, 2),
        help="Total hours employee should work in the month",
    )

    # ------------------------------------------------------------------
    # Contract information
    # ------------------------------------------------------------------
    wage = fields.Float(
        string='Gross Salary',
        digits=(10, 2),
        aggregator='sum',
        help="Employee's gross salary from their contract at the time of calculation",
    )
    currency_id = fields.Many2one(
        comodel_name='res.currency',
        string='Currency',
        help="Currency for the wage field",
    )

    # ------------------------------------------------------------------
    # Utilization and overhead, stored at calculation time
    # ------------------------------------------------------------------
    utilization_rate = fields.Float(
        string='Utilization Rate (%)',
        digits=(5, 2),
        default=100.0,
        aggregator='avg',
        help="Employee's utilization rate at the time of calculation",
    )
    overhead = fields.Float(
        string='Company Overhead (%)',
        digits=(5, 2),
        default=40.0,
        aggregator='avg',
        help="Company's overhead percentage at the time of calculation",
    )

    # ------------------------------------------------------------------
    # Planned hours
    # ------------------------------------------------------------------
    planned_hours = fields.Float(
        string='Planned Hours',
        digits=(10, 2),
        help="Total hours planned for the month",
    )
    planned_billable_hours = fields.Float(
        string='Planned Billable Hours',
        digits=(10, 2),
        help="Hours planned on billable projects",
    )
    planned_non_billable_hours = fields.Float(
        string='Planned Non-Billable Hours',
        digits=(10, 2),
        help="Hours planned on non-billable projects",
    )

    # ------------------------------------------------------------------
    # Actual hours
    # ------------------------------------------------------------------
    actual_billable_hours = fields.Float(
        string='Actual Billable Hours',
        digits=(10, 2),
        help="Hours actually worked on billable projects",
    )
    actual_non_billable_hours = fields.Float(
        string='Actual Non-Billable Hours',
        digits=(10, 2),
        help="Hours actually worked on non-billable projects",
    )

    # ------------------------------------------------------------------
    # Derived values, stored rather than computed on the fly
    # ------------------------------------------------------------------
    total_actual_hours = fields.Float(
        string='Total Actual Hours',
        digits=(10, 2),
        help="Total actual hours (billable + non-billable)",
    )
    expected_hours_to_date = fields.Float(
        string='Expected Hours to Date',
        digits=(10, 2),
        help='Hours that should be registered based on planning until current date',
    )

    # Hourly price charged to the customer (stored field).
    precio_por_hora = fields.Float(
        string='Precio por Hora',
        digits=(10, 2),
        aggregator='avg',
        help='Precio que cobramos al cliente por hora (costo + overhead + 30% rentabilidad)',
    )

    # Indicator fields are not declared here: they are created dynamically
    # from the hr.efficiency.indicator records.

    # ------------------------------------------------------------------
    # Overall efficiency (always present), stored and filled on save
    # ------------------------------------------------------------------
    overall_efficiency = fields.Float(
        string='Overall Efficiency (%)',
        digits=(5, 2),
        help='Overall efficiency based on configured indicators',
    )
    overall_efficiency_display = fields.Char(
        string='Overall Efficiency Display',
        help='Overall efficiency formatted for display with badge widget',
    )

    display_name = fields.Char(
        string='Display Name',
        compute='_compute_display_name',
        store=True,
    )

    # Note: there is deliberately no unique constraint on (employee, month),
    # so multiple records can coexist for historical tracking.
@api.depends('month_year')
def _compute_date(self):
    """Derive ``date`` (first day of the month) from ``month_year``.

    ``month_year`` is expected as ``YYYY-MM``. When it is empty or cannot
    be parsed, ``date`` is set to ``False``.
    """
    for rec in self:
        first_day = False
        if rec.month_year:
            try:
                year_txt, month_txt = rec.month_year.split('-')
                first_day = date(int(year_txt), int(month_txt), 1)
            except (ValueError, AttributeError):
                # Malformed value: leave the date empty.
                first_day = False
        rec.date = first_day
- def _calculate_expected_hours_to_date(self, record):
- """
- Calculate expected hours to date based on available hours and working days
- """
- if not record.month_year or not record.available_hours or not record.employee_id.contract_id:
- return 0.0
-
- try:
- # Parse month_year (format: YYYY-MM)
- year, month = record.month_year.split('-')
- start_date = date(int(year), int(month), 1)
- end_date = (start_date + relativedelta(months=1)) - relativedelta(days=1)
-
- # Get current date
- current_date = date.today()
-
- # If current date is outside the month, use end of month
- if current_date > end_date:
- calculation_date = end_date
- elif current_date < start_date:
- calculation_date = start_date
- else:
- calculation_date = current_date
-
- # Calculate working days in the month
- total_working_days = self._count_working_days(start_date, end_date, record.employee_id)
-
- # Calculate working days until current date
- working_days_until_date = self._count_working_days(start_date, calculation_date, record.employee_id)
-
- if total_working_days > 0:
- # Calculate expected hours based on available hours (what employee should work)
- expected_hours = (record.available_hours / total_working_days) * working_days_until_date
-
- # Ensure we don't exceed available hours for the month
- expected_hours = min(expected_hours, record.available_hours)
-
- return float_round(expected_hours, 2)
- else:
- return 0.0
-
- except (ValueError, AttributeError) as e:
- return 0.0
- def _calculate_precio_por_hora(self, record):
- """
- Calculate precio por hora: (wage / available_hours) * (1 + overhead/100) * 1.30
- """
- try:
- wage = getattr(record, 'wage', 0.0) or 0.0
- available_hours = getattr(record, 'available_hours', 0.0) or 0.0
- overhead = getattr(record, 'overhead', 40.0) or 40.0
-
- if available_hours > 0:
- # Calculate cost per hour
- cost_per_hour = wage / available_hours
- # Apply overhead and 30% profit margin
- precio_por_hora = cost_per_hour * (1 + (overhead / 100)) * 1.30
- return float_round(precio_por_hora, 2)
- else:
- return 0.0
-
- except (ValueError, AttributeError, ZeroDivisionError) as e:
- return 0.0
def _calculate_all_indicators(self):
    """Recalculate every stored derived value for the records in ``self``.

    For each record this computes ``total_actual_hours``,
    ``expected_hours_to_date``, ``precio_por_hora``, one value per active
    indicator that has a matching field, ``overall_efficiency`` and its
    display string. Values are written with a direct SQL ``UPDATE`` so the
    overridden ``write`` is not re-entered. Pending ORM changes are flushed
    before the SQL and the touched fields are invalidated afterwards, so
    later reads (including ``_update_aggregation_fields``) see the new
    values instead of stale cache entries.
    """
    # Pre-fetch the fields read below in one query.
    fields_to_load = [
        'available_hours', 'planned_hours', 'planned_billable_hours',
        'planned_non_billable_hours', 'actual_billable_hours',
        'actual_non_billable_hours', 'total_actual_hours',
        'expected_hours_to_date', 'wage', 'utilization_rate', 'overhead',
        'precio_por_hora', 'employee_id', 'company_id', 'month_year', 'active',
    ]
    try:
        self.read(fields_to_load)
    except Exception as e:
        # Best effort: fields are still loaded lazily on access below.
        _logger.warning("Error loading fields: %s", e)

    # The indicator list and field names do not depend on the record, so
    # resolve them once instead of once per record.
    active_indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    indicator_fields = []
    for indicator in active_indicators:
        field_name = self._get_indicator_field_name(indicator.name)
        if field_name in self._fields:
            indicator_fields.append((indicator, field_name))

    values_to_update = {}
    for record in self:
        efficiency_data = {
            'available_hours': getattr(record, 'available_hours', 0.0) or 0.0,
            'planned_hours': getattr(record, 'planned_hours', 0.0) or 0.0,
            'planned_billable_hours': getattr(record, 'planned_billable_hours', 0.0) or 0.0,
            'planned_non_billable_hours': getattr(record, 'planned_non_billable_hours', 0.0) or 0.0,
            'actual_billable_hours': getattr(record, 'actual_billable_hours', 0.0) or 0.0,
            'actual_non_billable_hours': getattr(record, 'actual_non_billable_hours', 0.0) or 0.0,
            'wage': getattr(record, 'wage', 0.0) or 0.0,
            'utilization_rate': getattr(record, 'utilization_rate', 100.0) or 100.0,
            'overhead': getattr(record, 'overhead', 40.0) or 40.0,
        }

        # Base values first: the indicator formulas may depend on them.
        total_actual_hours = (
            efficiency_data['actual_billable_hours']
            + efficiency_data['actual_non_billable_hours']
        )
        expected_hours = self._calculate_expected_hours_to_date(record)
        precio_por_hora = self._calculate_precio_por_hora(record)
        efficiency_data.update({
            'total_actual_hours': total_actual_hours,
            'expected_hours_to_date': expected_hours,
            'precio_por_hora': precio_por_hora,
        })

        record_values = {
            'total_actual_hours': float_round(total_actual_hours, 2),
            'expected_hours_to_date': expected_hours,
            'precio_por_hora': precio_por_hora,
        }

        # Indicators, in sequence order.
        for indicator, field_name in indicator_fields:
            indicator_value = indicator.evaluate_formula(efficiency_data)
            # _calculate_overall_efficiency treats None as a possible
            # result; store it as 0.0 instead of crashing in float_round.
            record_values[field_name] = float_round(indicator_value or 0.0, 2)

        overall_efficiency = self._calculate_overall_efficiency(record)
        record_values['overall_efficiency'] = overall_efficiency
        record_values['overall_efficiency_display'] = (
            f"{overall_efficiency:.2f}" if overall_efficiency else '0.00'
        )

        values_to_update[record.id] = record_values

    if values_to_update:
        # Push pending ORM writes first so a later flush cannot overwrite
        # what the raw SQL below stores.
        self.flush_recordset()
        cr = self.env.cr
        updated_fields = set()
        for record_id, values in values_to_update.items():
            # Column names are model field names; quoted defensively.
            assignments = ", ".join(f'"{column}" = %s' for column in values)
            cr.execute(
                f"UPDATE hr_efficiency SET {assignments} WHERE id = %s",
                [*values.values(), record_id],
            )
            updated_fields.update(values)
        # The ORM cache does not know about the raw SQL; drop stale entries.
        self.invalidate_recordset(list(updated_fields))

    # Mirror indicator values into their aggregation fields.
    self._update_aggregation_fields()
-
def _update_aggregation_fields(self):
    """Copy each dynamic indicator value into its ``<field>_agg`` field.

    For every active indicator whose aggregation field exists on the
    model, the indicator value read from the record is stored in the
    aggregation column with a direct SQL ``UPDATE`` (one statement per
    record). Errors are logged as warnings and not propagated.
    """
    try:
        # Resolve the (indicator field, aggregation field) pairs once;
        # they are the same for every record.
        active_indicators = self.env['hr.efficiency.indicator'].search(
            [('active', '=', True)], order='sequence'
        )
        field_pairs = []
        for indicator in active_indicators:
            field_name = self._get_indicator_field_name(indicator.name)
            agg_field_name = f"{field_name}_agg"
            if agg_field_name in self._fields:
                field_pairs.append((field_name, agg_field_name))

        if not field_pairs:
            return

        # Column names are model field names; quoted defensively.
        assignments = ", ".join(f'"{agg_name}" = %s' for _name, agg_name in field_pairs)
        query = f"UPDATE hr_efficiency SET {assignments} WHERE id = %s"

        cr = self.env.cr
        for record in self:
            agg_values = [
                getattr(record, field_name, 0.0) or 0.0
                for field_name, _agg_name in field_pairs
            ]
            cr.execute(query, agg_values + [record.id])

        # Raw SQL bypasses the ORM cache; drop stale aggregation values.
        self.invalidate_recordset([agg_name for _name, agg_name in field_pairs])

    except Exception as e:
        # NOTE(review): if the failure is a SQL error the transaction may be
        # left aborted in PostgreSQL — consider a savepoint here.
        _logger.warning("Error updating aggregation fields: %s", e)
-
def _update_stored_manual_fields(self):
    """Recompute the stored manual float fields from their indicators.

    Each stored manual float field of ``hr.efficiency`` is paired with the
    first indicator whose name contains the field's description (``ilike``
    match). For every record, the indicator formula is evaluated on the
    record's current values and the rounded result is assigned to the
    field.
    """
    # Neither the field list nor the field -> indicator pairing depends on
    # the record, so resolve both once up front.
    manual_fields = self.env['ir.model.fields'].search([
        ('model', '=', 'hr.efficiency'),
        ('state', '=', 'manual'),
        ('ttype', '=', 'float'),
        ('store', '=', True),
    ], order='id')

    Indicator = self.env['hr.efficiency.indicator']
    field_indicators = []
    for manual_field in manual_fields:
        if manual_field.name not in self._fields:
            continue
        # NOTE(review): 'ilike' is a substring match, so a short description
        # can pair with an unrelated indicator — confirm this is intended.
        indicator = Indicator.search(
            [('name', 'ilike', manual_field.field_description)], limit=1
        )
        if indicator:
            field_indicators.append((manual_field.name, indicator))

    for record in self:
        efficiency_data = {
            'available_hours': record.available_hours or 0.0,
            'planned_hours': record.planned_hours or 0.0,
            'planned_billable_hours': record.planned_billable_hours or 0.0,
            'planned_non_billable_hours': record.planned_non_billable_hours or 0.0,
            'actual_billable_hours': record.actual_billable_hours or 0.0,
            'actual_non_billable_hours': record.actual_non_billable_hours or 0.0,
            'total_actual_hours': record.total_actual_hours or 0.0,
            'expected_hours_to_date': record.expected_hours_to_date or 0.0,
            'wage': record.wage or 0.0,
            'utilization_rate': record.utilization_rate or 100.0,
            'overhead': record.overhead or 40.0,
            # Kept in line with the data built by _calculate_all_indicators.
            'precio_por_hora': record.precio_por_hora or 0.0,
        }

        for field_name, indicator in field_indicators:
            indicator_value = indicator.evaluate_formula(efficiency_data)
            record[field_name] = float_round(indicator_value, 2)
-
@api.model
def _recompute_all_indicators(self):
    """Run the indicator calculation on every record found by ``search([])``."""
    all_records = self.search([])
    if not all_records:
        return
    all_records._calculate_all_indicators()
-
- def _get_indicator_field_name(self, indicator_name):
- """
- Convert indicator name to valid field name
- """
- import re
-
- # Remove special characters and convert to lowercase
- field_name = indicator_name.lower()
-
- # Replace spaces, hyphens, and other special characters with underscores
- field_name = re.sub(r'[^a-z0-9_]', '_', field_name)
-
- # Remove multiple consecutive underscores
- field_name = re.sub(r'_+', '_', field_name)
-
- # Remove leading and trailing underscores
- field_name = field_name.strip('_')
-
- # Ensure it starts with x_ for manual fields
- if not field_name.startswith('x_'):
- field_name = 'x_' + field_name
-
- # Ensure it doesn't exceed 63 characters (PostgreSQL limit)
- if len(field_name) > 63:
- field_name = field_name[:63]
-
- return field_name
-
-
@api.model
def _create_default_dynamic_fields(self):
    """Create a stored manual float field for each active indicator.

    For every active indicator (in sequence order) the matching field name
    is derived with ``_get_indicator_field_name``; when no
    ``ir.model.fields`` record with that name exists for ``hr.efficiency``
    one is created. The fields are plain stored fields: their values are
    written by ``_calculate_all_indicators``, so no ``compute`` is set
    (on a manual field ``compute`` holds Python code to execute, not a
    method name).
    """
    indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    if not indicators:
        return

    IrModelFields = self.env['ir.model.fields']
    # The model record is the same for every field: look it up once.
    model_record = self.env['ir.model'].search(
        [('model', '=', 'hr.efficiency')], limit=1
    )

    for indicator in indicators:
        field_name = self._get_indicator_field_name(indicator.name)

        existing_field = IrModelFields.search([
            ('model', '=', 'hr.efficiency'),
            ('name', '=', field_name),
        ], limit=1)
        if existing_field:
            continue

        # Same approach as Studio: a record in ir.model.fields with
        # state='manual' defines a custom field.
        IrModelFields.create({
            'name': field_name,
            'model': 'hr.efficiency',
            'model_id': model_record.id,
            'ttype': 'float',
            'field_description': indicator.name,
            'state': 'manual',
            'store': True,
        })
-
def _calculate_overall_efficiency(self, record):
    """Return the weighted overall efficiency of ``record`` as a percentage.

    Every active indicator with a positive weight is evaluated on the
    record's data; the result is the weight-averaged indicator value
    multiplied by 100 and rounded to 2 decimals. Indicators whose formula
    returns ``None`` are ignored.

    The derived inputs (``total_actual_hours``, ``expected_hours_to_date``,
    ``precio_por_hora``) are recomputed here rather than read from the
    record, so the overall score uses the same inputs as the individual
    indicators even when the stored columns have not been refreshed yet
    (e.g. right after creation).

    :param record: the efficiency record to score
    :return: overall efficiency in percent, or ``0.0`` when there are no
        active indicators, no hour data at all, or no usable weight
    """
    indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    if not indicators:
        return 0.0

    # Nothing to measure when every hour figure is zero.
    if (record.available_hours == 0 and
            record.planned_hours == 0 and
            record.actual_billable_hours == 0 and
            record.actual_non_billable_hours == 0):
        return 0.0

    actual_billable = getattr(record, 'actual_billable_hours', 0.0) or 0.0
    actual_non_billable = getattr(record, 'actual_non_billable_hours', 0.0) or 0.0

    efficiency_data = {
        'available_hours': getattr(record, 'available_hours', 0.0) or 0.0,
        'planned_hours': getattr(record, 'planned_hours', 0.0) or 0.0,
        'planned_billable_hours': getattr(record, 'planned_billable_hours', 0.0) or 0.0,
        'planned_non_billable_hours': getattr(record, 'planned_non_billable_hours', 0.0) or 0.0,
        'actual_billable_hours': actual_billable,
        'actual_non_billable_hours': actual_non_billable,
        'wage': getattr(record, 'wage', 0.0) or 0.0,
        'utilization_rate': getattr(record, 'utilization_rate', 100.0) or 100.0,
        'overhead': getattr(record, 'overhead', 40.0) or 40.0,
        # Derived values, computed the same way as in
        # _calculate_all_indicators instead of trusting stored columns.
        'total_actual_hours': actual_billable + actual_non_billable,
        'expected_hours_to_date': self._calculate_expected_hours_to_date(record),
        'precio_por_hora': self._calculate_precio_por_hora(record),
    }

    total_weight = 0
    weighted_sum = 0
    for indicator in indicators:
        if indicator.weight <= 0:
            continue
        indicator_value = indicator.evaluate_formula(efficiency_data)
        # 0 is a valid result; only None means "no value".
        if indicator_value is None:
            continue
        weighted_sum += indicator_value * indicator.weight
        total_weight += indicator.weight

    if total_weight <= 0:
        return 0.0

    # Multiply by 100 to express the result as a percentage.
    return float_round((weighted_sum / total_weight) * 100, 2)
@api.depends('employee_id', 'month_year')
def _compute_display_name(self):
    """Compute ``display_name`` and ``name`` as "<employee> - <YYYY-MM>".

    Both stored fields declare this method as their compute, so both are
    assigned here; a compute method that leaves one of its fields
    unassigned makes the ORM fail when that field is read. Records without
    an employee or month get the placeholder label.
    """
    for record in self:
        if record.employee_id and record.month_year:
            label = f"{record.employee_id.name} - {record.month_year}"
        else:
            label = "New Efficiency Record"
        record.display_name = label
        record.name = label
@api.model
def _calculate_employee_efficiency(self, employee, month_year):
    """Build the values dict of an efficiency record for one employee/month.

    :param employee: the ``hr.employee`` record to evaluate
    :param month_year: month as a ``YYYY-MM`` string
    :return: dict with the month, employee, company, available / planned /
        actual hours, wage, currency, utilization rate and overhead
    :raises UserError: when ``month_year`` is not a valid ``YYYY-MM`` value
    """
    try:
        year, month = month_year.split('-')
        start_date = date(int(year), int(month), 1)
        end_date = (start_date + relativedelta(months=1)) - relativedelta(days=1)
    except (ValueError, AttributeError, TypeError) as err:
        # AttributeError/TypeError cover a missing or non-string value, so
        # callers always get the same UserError for bad input.
        raise UserError(_("Invalid month_year format. Expected: YYYY-MM")) from err

    # Available hours, net of holidays and time off.
    available_hours = self._calculate_available_hours(employee, start_date, end_date)

    planned_hours, planned_billable_hours, planned_non_billable_hours = (
        self._calculate_planned_hours(employee, start_date, end_date)
    )

    actual_billable_hours, actual_non_billable_hours = (
        self._calculate_actual_hours(employee, start_date, end_date)
    )

    # Wage and currency are taken as of today (the calculation date), not
    # as of the evaluated month.
    wage, currency_id = self._get_employee_wage_and_currency(employee, date.today())

    # Snapshot of the utilization rate and overhead at calculation time.
    utilization_rate = employee.utilization_rate or 100.0
    overhead = employee.company_id.overhead or 40.0

    # Billable hours are scaled by the utilization rate.
    adjusted_actual_billable_hours = actual_billable_hours * (utilization_rate / 100)

    return {
        'month_year': month_year,
        'employee_id': employee.id,
        'company_id': employee.company_id.id,
        'available_hours': available_hours,
        'planned_hours': planned_hours,
        'planned_billable_hours': planned_billable_hours,
        'planned_non_billable_hours': planned_non_billable_hours,
        'actual_billable_hours': adjusted_actual_billable_hours,
        'actual_non_billable_hours': actual_non_billable_hours,
        'wage': wage,
        'currency_id': currency_id,
        'utilization_rate': utilization_rate,
        'overhead': overhead,
    }
@api.model
def _get_employee_wage_and_currency(self, employee, target_date):
    """Return ``(wage, currency_id)`` from the contract covering a date.

    Only contracts in state ``open`` are considered, newest start date
    first. The first one whose start is on or before ``target_date`` and
    whose end (if any) is on or after it wins.

    :param employee: ``hr.employee`` record (may be empty)
    :param target_date: date the contract must cover
    :return: tuple of wage and currency id; ``0.0`` and the current
        company's currency when there is no employee or no covering
        contract
    """
    company_currency_id = self.env.company.currency_id.id
    no_contract = (0.0, company_currency_id)

    if not employee:
        return no_contract

    open_contracts = self.env['hr.contract'].search(
        [('employee_id', '=', employee.id), ('state', '=', 'open')],
        order='date_start desc',
    )

    for contract in open_contracts:
        has_started = contract.date_start <= target_date
        # An open-ended contract covers every date after its start.
        not_finished = not contract.date_end or target_date <= contract.date_end
        if has_started and not_finished:
            currency = contract.currency_id
            return (
                contract.wage or 0.0,
                currency.id if currency else company_currency_id,
            )

    return no_contract
- def _calculate_available_hours(self, employee, start_date, end_date):
- """
- Calculate available hours considering holidays and time off
- """
- if not employee.resource_calendar_id:
- return 0.0
-
- # Convert dates to datetime for the method
- start_datetime = datetime.combine(start_date, datetime.min.time())
- end_datetime = datetime.combine(end_date, datetime.max.time())
-
- # Get working hours from calendar
- work_hours_data = employee._list_work_time_per_day(start_datetime, end_datetime)
- total_work_hours = sum(hours for _, hours in work_hours_data[employee.id])
-
- # Subtract hours from approved time off
- time_off_hours = self._get_time_off_hours(employee, start_date, end_date)
-
- return max(0.0, total_work_hours - time_off_hours)
def _get_time_off_hours(self, employee, start_date, end_date):
    """Return the work hours covered by validated leaves in a period.

    Every ``hr.leave`` of the employee in state ``validate`` that overlaps
    the period is clipped to it, and the per-day work time reported by
    ``employee._list_work_time_per_day`` over the clipped range is summed.

    :param employee: ``hr.employee`` record
    :param start_date: first day of the period (inclusive)
    :param end_date: last day of the period (inclusive)
    :return: total hours as a float
    """
    approved_leaves = self.env['hr.leave'].search([
        ('employee_id', '=', employee.id),
        ('state', '=', 'validate'),
        ('date_from', '<=', end_date),
        ('date_to', '>=', start_date),
    ])

    day_begin = datetime.min.time()
    day_finish = datetime.max.time()

    hours_off = 0.0
    for leave in approved_leaves:
        # Clip the leave to the requested period.
        first_day = max(leave.date_from.date(), start_date)
        last_day = min(leave.date_to.date(), end_date)
        if first_day > last_day:
            continue

        per_day = employee._list_work_time_per_day(
            datetime.combine(first_day, day_begin),
            datetime.combine(last_day, day_finish),
        )[employee.id]
        hours_off += sum(day_hours for _day, day_hours in per_day)

    return hours_off
def _calculate_planned_hours(self, employee, start_date, end_date):
    """Return planned hours for the employee from ``planning.slot`` records.

    All slots of the employee overlapping the period are considered,
    whatever their state. Each slot contributes the duration reported by
    ``slot._get_duration_over_period`` for the period; the hours count as
    billable when the slot's project has ``allow_billable`` set, otherwise
    as non-billable.

    :param employee: ``hr.employee`` record
    :param start_date: first day of the period (inclusive)
    :param end_date: last day of the period (inclusive)
    :return: tuple ``(total, billable, non_billable)`` of hours
    """
    period_start = datetime.combine(start_date, datetime.min.time())
    period_end = datetime.combine(end_date, datetime.max.time())

    # Period bounds as UTC-aware datetimes for the interval helpers.
    period_start_utc = pytz.utc.localize(period_start)
    period_end_utc = pytz.utc.localize(period_end)

    # Slots that overlap the period (no state restriction).
    slots = self.env['planning.slot'].search([
        ('employee_id', '=', employee.id),
        ('start_datetime', '<=', period_end),
        ('end_datetime', '>=', period_start),
    ])

    company_calendar = employee.company_id.resource_calendar_id
    if employee.resource_id:
        resource_intervals, calendar_intervals = employee.resource_id._get_valid_work_intervals(
            period_start_utc, period_end_utc, calendars=company_calendar
        )
    else:
        # No resource: fall back to an empty company-calendar entry.
        calendar_intervals = {company_calendar.id: []}
        resource_intervals = {}

    planned_total = 0.0
    billable_total = 0.0
    non_billable_total = 0.0

    for slot in slots:
        # Only the part of the slot inside the period is counted.
        slot_hours = slot._get_duration_over_period(
            period_start_utc, period_end_utc,
            resource_intervals, calendar_intervals, has_allocated_hours=False
        )

        project = slot.project_id
        if project and project.allow_billable:
            billable_total += slot_hours
        else:
            non_billable_total += slot_hours

        planned_total += slot_hours

    return planned_total, billable_total, non_billable_total
def _calculate_actual_hours(self, employee, start_date, end_date):
    """Return actual hours for the employee from timesheet lines.

    Reads ``account.analytic.line`` records of the employee in the date
    range that have a project and no ``holiday_id``. Lines whose task name
    contains "time off" or whose description contains "tiempo personal"
    (case-insensitive) are skipped. Remaining hours count as billable when
    the project has ``allow_billable`` set, otherwise as non-billable.

    :param employee: ``hr.employee`` record
    :param start_date: first day of the period (inclusive)
    :param end_date: last day of the period (inclusive)
    :return: tuple ``(billable, non_billable)`` of hours
    """
    timesheet_lines = self.env['account.analytic.line'].search([
        ('employee_id', '=', employee.id),
        ('date', '>=', start_date),
        ('date', '<=', end_date),
        ('project_id', '!=', False),  # project timesheets only
        ('holiday_id', '=', False),  # no time-off timesheets
    ])

    billable_total = 0.0
    non_billable_total = 0.0

    for line in timesheet_lines:
        line_hours = line.unit_amount or 0.0

        # Extra filter: tasks named like time off.
        task = line.task_id
        if task and task.name and 'time off' in task.name.lower():
            continue

        # Extra filter: personal-time entries, recognised by description.
        if line.name and 'tiempo personal' in line.name.lower():
            continue

        project = line.project_id
        if project and project.allow_billable:
            billable_total += line_hours
        else:
            non_billable_total += line_hours

    return billable_total, non_billable_total
@api.model
def calculate_efficiency_for_period(self, start_month=None, end_month=None):
    """Calculate efficiency for all active employees over a range of months.

    For each employee of type ``employee`` and each month, fresh figures
    are computed and compared with the latest existing record. When they
    differ (or no record exists), the existing records for that employee,
    month and company are archived and a new record is created, so earlier
    calculations are kept as history.

    :param start_month: first month as ``YYYY-MM``; when omitted the range
        defaults to 3 months back through 6 months ahead of today
    :param end_month: last month as ``YYYY-MM``; when omitted together with
        an explicit ``start_month``, only ``start_month`` is processed
    :return: dict with ``created``, ``updated`` (always 0) and
        ``total_processed`` counters
    """
    if not start_month:
        # Default window: last 3 months and next 6 months.
        today = date.today()
        start_month = (today - relativedelta(months=3)).strftime('%Y-%m')
        end_month = (today + relativedelta(months=6)).strftime('%Y-%m')
    elif not end_month:
        # Avoid handing None to _generate_month_list.
        end_month = start_month

    months = self._generate_month_list(start_month, end_month)

    employees = self.env['hr.employee'].search([
        ('active', '=', True),
        ('employee_type', '=', 'employee'),
    ])

    # Indicators and their field names are the same for every employee and
    # month: resolve them once. Indicators without a field are skipped, as
    # in _calculate_all_indicators, since there is nothing to compare to.
    indicator_fields = []
    active_indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    for indicator in active_indicators:
        field_name = self._get_indicator_field_name(indicator.name)
        if field_name in self._fields:
            indicator_fields.append((indicator, field_name))

    created_records = []

    for employee in employees:
        for month in months:
            efficiency_data = self._calculate_employee_efficiency(employee, month)

            record_domain = [
                ('employee_id', '=', employee.id),
                ('month_year', '=', month),
                ('company_id', '=', employee.company_id.id),
            ]
            latest_record = self.search(
                record_domain, order='calculation_date desc', limit=1
            )

            if latest_record and not self._efficiency_data_changed(
                    latest_record, efficiency_data, indicator_fields):
                continue

            # Archive what exists for this employee and month, then store
            # the new snapshot.
            existing_records = self.search(record_domain)
            if existing_records:
                existing_records.write({'active': False})

            created_records.append(self.create(efficiency_data))

    if created_records:
        created_recordset = self.browse([record.id for record in created_records])
        created_recordset._calculate_all_indicators()

    return {
        'created': len(created_records),
        'updated': 0,  # existing records are archived, never updated
        'total_processed': len(created_records),
    }

def _efficiency_data_changed(self, latest_record, efficiency_data, indicator_fields):
    """Tell whether freshly computed data differs from a stored record.

    Hour fields and indicator values are compared with a 0.01 tolerance so
    that the 2-decimal rounding of stored values does not count as a change.

    :param latest_record: the most recent stored efficiency record
    :param efficiency_data: values dict from ``_calculate_employee_efficiency``
    :param indicator_fields: list of ``(indicator, field_name)`` pairs
    :return: ``True`` when any compared value moved by more than 0.01
    """
    tolerance = 0.01
    compared_fields = (
        'available_hours', 'planned_hours', 'planned_billable_hours',
        'planned_non_billable_hours', 'actual_billable_hours',
        'actual_non_billable_hours',
    )
    if any(abs(efficiency_data[fname] - latest_record[fname]) > tolerance
           for fname in compared_fields):
        return True

    # NOTE(review): the stored indicators are also fed expected_hours_to_date
    # and precio_por_hora, which are not available here before the record
    # exists — formulas using them may compare against different inputs.
    formula_data = dict(
        efficiency_data,
        total_actual_hours=(
            efficiency_data['actual_billable_hours']
            + efficiency_data['actual_non_billable_hours']
        ),
    )
    for indicator, field_name in indicator_fields:
        current_value = indicator.evaluate_formula(formula_data) or 0.0
        previous_value = latest_record[field_name] or 0.0
        if abs(current_value - previous_value) > tolerance:
            return True

    return False
-
@api.model
def _init_dynamic_system(self):
    """Initialize the dynamic indicator fields and views.

    Creates the default dynamic fields, refreshes the views and
    invalidates the cache of all existing records. Any error is logged
    with its traceback and re-raised.
    """
    try:
        _logger.info("Starting dynamic system initialization...")

        # Step 1: field records for the existing indicators.
        self._create_default_dynamic_fields()
        _logger.info("Default dynamic fields created successfully")

        # Step 2: nothing to do here; see the log message.
        _logger.info("Dynamic fields creation handled by hr.efficiency.dynamic.field model")

        # Step 3: views.
        self._update_views_with_dynamic_fields()
        _logger.info("Views updated successfully")

        # Step 4: drop cached values of existing records.
        records = self.search([])
        if records:
            records._invalidate_cache()
            _logger.info("Invalidated cache for %s records", len(records))

        _logger.info("Dynamic system initialization completed successfully")

    except Exception as e:
        # logger.exception keeps the traceback, unlike a plain error().
        _logger.exception("Error during dynamic system initialization: %s", e)
        raise
-
@api.model
def _post_init_hook(self):
    """Post-install step: make sure every active indicator has its field.

    Calls ``_create_dynamic_field`` on the indicator model for each active
    indicator, refreshes the views and applies default values to existing
    records. Any error is logged with its traceback and re-raised.
    """
    try:
        _logger.info("Running post-install hook for hr_efficiency module")

        Indicator = self.env['hr.efficiency.indicator']
        active_indicators = Indicator.search([('active', '=', True)], order='sequence')
        for indicator in active_indicators:
            Indicator._create_dynamic_field(indicator)

        self._update_views_with_dynamic_fields()

        self._apply_default_values_to_existing_records()

        _logger.info(
            "Post-install hook completed. Processed %s indicators",
            len(active_indicators),
        )

    except Exception as e:
        # logger.exception keeps the traceback, unlike a plain error().
        _logger.exception("Error in post-install hook: %s", e)
        raise
@api.model_create_multi
def create(self, vals_list):
    """Create records and calculate their indicators right away.

    Declared with ``model_create_multi`` so the override receives a list
    of value dicts, as its ``vals_list`` parameter implies; callers may
    still pass a single dict.

    :param vals_list: list of value dicts (or a single dict)
    :return: the created records
    """
    records = super().create(vals_list)

    if records:
        records._calculate_all_indicators()

    return records
-
def write(self, vals):
    """Write values, then recalculate the indicators of the records.

    A write that only touches ``active`` (archiving or restoring) does
    not recalculate: archived records are kept as historical snapshots,
    and recomputing them would overwrite the stored figures with values
    derived from today's date and the current indicator formulas.

    :param vals: dict of field values to write
    :return: result of the parent ``write``
    """
    result = super().write(vals)

    # Recalculate only when something other than the archive flag changed.
    if set(vals) - {'active'}:
        self._calculate_all_indicators()

    return result
@api.model
def _register_hook(self):
    """
    Registry-load hook.

    After the parent hook has run, makes sure the aggregation fields exist
    and rewrites the inherited views with the current dynamic fields.
    Failures are only logged as warnings so that loading is never blocked.
    """
    super()._register_hook()
    try:
        # Aggregation fields first, then the views that reference them
        self._ensure_aggregation_fields_exist()
        self._update_views_with_dynamic_fields()
    except Exception as e:
        # Deliberately swallowed: a view refresh problem must not stop loading
        _logger.warning(f"Could not update dynamic views on module load: {str(e)}")
@api.model
def _ensure_aggregation_fields_exist(self):
    """
    Ensure an aggregation field exists for every active indicator.

    For each active indicator (ordered by sequence) the companion field
    ``<indicator field name>_agg`` is looked up in ``ir.model.fields`` and,
    when missing, created as a stored manual float field on
    ``hr.efficiency``.

    Best effort: any error is logged as a warning and swallowed.
    """
    try:
        fields_model = self.env['ir.model.fields']
        active_indicators = self.env['hr.efficiency.indicator'].search(
            [('active', '=', True)], order='sequence')

        # The owning model is the same for every indicator: look it up once
        model_record = self.env['ir.model'].search([('model', '=', 'hr.efficiency')], limit=1)

        for indicator in active_indicators:
            field_name = self._get_indicator_field_name(indicator.name)
            agg_field_name = f"{field_name}_agg"

            # Nothing to do when the aggregation field is already registered
            agg_field_record = fields_model.search([
                ('model', '=', 'hr.efficiency'),
                ('name', '=', agg_field_name),
            ], limit=1)
            if agg_field_record:
                continue

            if not model_record:
                _logger.warning(f"Model hr.efficiency not found for aggregation field: {agg_field_name}")
                continue

            # Create the aggregation field
            fields_model.create({
                'name': agg_field_name,
                'model_id': model_record.id,
                'field_description': f'{indicator.name} (Agregación)',
                'ttype': 'float',
                'state': 'manual',
                'store': True,
                'help': f'Campo de agregación para {indicator.name}'
            })
            # Logged only when a field was really created
            _logger.info(f"Created aggregation field: {agg_field_name} for indicator: {indicator.name}")

    except Exception as e:
        _logger.warning(f"Error creating aggregation fields: {str(e)}")
@api.model
def _update_views_with_dynamic_fields(self):
    """
    Update inherited views to include dynamic fields after module is loaded.

    For every active indicator (ordered by sequence) the matching manual
    float field is looked up and, when missing, created through
    ``hr.efficiency.indicator._create_dynamic_field``. The inherited list
    view (``hr_efficiency.view_hr_efficiency_list_inherited``) and form view
    (``hr_efficiency.view_hr_efficiency_form_inherited``) are then rewritten
    with one ``<field>`` node per indicator; the list view also gets one
    ``<indicator field>_agg`` node per indicator.

    The XML is built with ElementTree so that attribute values (indicator
    names, descriptions and the ``<`` comparisons of the decoration
    expressions) are escaped and the resulting arch is well-formed.

    Raises:
        Exception: any error is logged and re-raised.
    """
    # Function-scope import keeps this block self-contained
    from xml.etree import ElementTree as ET

    # Widget per indicator type, for the list and the form view respectively
    list_widgets = {
        'percentage': 'percentage',
        'hours': 'float_time',
        'currency': 'monetary',
        'number': 'float',
    }
    form_widgets = {
        'percentage': 'badge',
        'hours': 'float_time',
        'currency': 'monetary',
        'number': 'float',
    }

    def add_threshold_decorations(node, field_name):
        """Add the standard efficiency colour ranges to a field node."""
        # >= 0.9 success, >= 0.8 warning, >= 0.7 info, < 0.7 or zero danger
        node.set('decoration-success', f"{field_name} >= 0.9")
        node.set('decoration-warning', f"{field_name} >= 0.8 and {field_name} < 0.9")
        node.set('decoration-info', f"{field_name} >= 0.7 and {field_name} < 0.8")
        node.set('decoration-danger', f"{field_name} < 0.7 or {field_name} == 0")

    def render_xpath(anchor_field, position, nodes):
        """Serialise an ``<xpath>`` wrapper around ``nodes`` (or a placeholder comment)."""
        xpath = ET.Element('xpath', {
            'expr': f"//field[@name='{anchor_field}']",
            'position': position,
        })
        if nodes:
            xpath.extend(nodes)
        else:
            # Writing an arch without fields drops previously injected ones
            xpath.append(ET.Comment(' No dynamic fields to display '))
        return ET.tostring(xpath, encoding='unicode')

    try:
        # Get active indicators ordered by sequence (consistent with other parts)
        indicator_model = self.env['hr.efficiency.indicator']
        active_indicators = indicator_model.search([('active', '=', True)], order='sequence')

        fields_to_display = []
        for indicator in active_indicators:
            field_name = self._get_indicator_field_name(indicator.name)

            # Check if this indicator has a manual field, create it otherwise
            manual_field = self.env['ir.model.fields'].search([
                ('model', '=', 'hr.efficiency'),
                ('state', '=', 'manual'),
                ('ttype', '=', 'float'),
                ('name', '=', field_name),
            ], limit=1)
            if not manual_field:
                _logger.info(f"Creating manual field for indicator '{indicator.name}'")
                indicator_model._create_dynamic_field(indicator)

            fields_to_display.append((field_name, indicator))

        _logger.info(f"Found {len(fields_to_display)} fields to add to views")

        # Build dynamic field nodes for the list view
        list_nodes = []
        for field_name, indicator in fields_to_display:
            node = ET.Element('field', {
                'name': field_name,
                'widget': list_widgets.get(indicator.indicator_type, 'float'),
                'optional': 'show',
                # Column title; str() also turns Markup-like values into plain text
                'string': str(indicator.name),
            })
            add_threshold_decorations(node, field_name)
            list_nodes.append(node)

            # Hidden aggregation column: average for percentages, sum otherwise
            agg_type = 'avg' if indicator.indicator_type == 'percentage' else 'sum'
            agg_label = f"Total {indicator.name}" if agg_type == 'sum' else f"Average {indicator.name}"
            list_nodes.append(ET.Element('field', {
                'name': f"{field_name}_agg",
                agg_type: agg_label,
                'optional': 'hide',
                'invisible': '1',
            }))

        # Update inherited list view
        inherited_list_view = self.env.ref('hr_efficiency.view_hr_efficiency_list_inherited', raise_if_not_found=False)
        if inherited_list_view:
            inherited_list_view.write({
                'arch': render_xpath('expected_hours_to_date', 'after', list_nodes),
            })
            _logger.info(f"Updated inherited list view with {len(fields_to_display)} dynamic fields")

        # Build dynamic field nodes for the form view
        form_nodes = []
        for field_name, indicator in fields_to_display:
            node = ET.Element('field', {
                'name': field_name,
                'widget': form_widgets.get(indicator.indicator_type, 'badge'),
            })
            # Help text with the indicator description (valid in form views)
            if indicator.description:
                node.set('help', str(indicator.description))
            add_threshold_decorations(node, field_name)
            form_nodes.append(node)

        # Update inherited form view
        inherited_form_view = self.env.ref('hr_efficiency.view_hr_efficiency_form_inherited', raise_if_not_found=False)
        if inherited_form_view:
            inherited_form_view.write({
                'arch': render_xpath('overall_efficiency', 'before', form_nodes),
            })
            _logger.info("Updated inherited form view with dynamic fields")

    except Exception as e:
        _logger.error(f"Error updating views with dynamic fields: {str(e)}")
        raise
- def _generate_month_list(self, start_month, end_month):
- """
- Generate list of months between start_month and end_month (inclusive)
- """
- months = []
-
- # Convert date objects to datetime if needed
- if isinstance(start_month, date):
- start_month = start_month.strftime('%Y-%m')
- if isinstance(end_month, date):
- end_month = end_month.strftime('%Y-%m')
-
- current = datetime.strptime(start_month, '%Y-%m')
- end = datetime.strptime(end_month, '%Y-%m')
-
- while current <= end:
- months.append(current.strftime('%Y-%m'))
- current = current + relativedelta(months=1)
-
- return months
@api.model
def _cron_calculate_efficiency(self):
    """
    Scheduled-action entry point.

    Runs ``calculate_efficiency_for_period`` with its default arguments and
    then refreshes the dynamic month filter labels.
    """
    # Calculation first, labels afterwards
    self.calculate_efficiency_for_period()
    self._update_dynamic_filter_labels()
@api.model
def _update_dynamic_filter_labels(self):
    """
    Rename the month filters with the labels computed for the current date.

    Filters that cannot be found are skipped; an error on one filter is
    logged as a warning and does not stop the others.
    """
    labels = self._get_dynamic_month_labels()

    # (filter XML id inside hr_efficiency, label it must carry)
    filter_labels = (
        ('filter_two_months_ago', labels['two_months_ago']),
        ('filter_last_month', labels['last_month']),
        ('filter_current_month', labels['current_month']),
        ('filter_next_month', labels['next_month']),
        ('filter_two_months_ahead', labels['two_months_ahead']),
    )

    for filter_name, label in filter_labels:
        try:
            filter_record = self.env.ref(f'hr_efficiency.{filter_name}', raise_if_not_found=False)
            if not filter_record:
                continue
            filter_record.write({'name': label})
        except Exception as e:
            _logger.warning(f"Could not update filter {filter_name}: {str(e)}")
@api.model
def _get_month_filter_options(self):
    """
    Build the month options offered by the search view.

    Returns a list of (month_year, display_name) tuples covering the two
    previous months, the current month and the two following months, in
    chronological order. month_year is 'YYYY-MM'; display_name comes from
    strftime('%B %Y'), e.g. "August 2024".
    """
    today = date.today()
    options = []
    # Offsets -2..2 relative to the current month
    for offset in range(-2, 3):
        month_date = today + relativedelta(months=offset)
        options.append((month_date.strftime('%Y-%m'), month_date.strftime('%B %Y')))
    return options
@api.model
def _get_dynamic_month_labels(self):
    """
    Compute the labels of the month filters for the current date.

    Returns a dictionary with the keys 'two_months_ago', 'last_month',
    'current_month', 'next_month' and 'two_months_ahead'. Each value is
    "Mes " followed by the two-digit month number (e.g. "Mes 08"); the
    current month additionally ends with " << actual".
    """
    today = date.today()

    # (dictionary key, month offset from today)
    offsets = (
        ('two_months_ago', -2),
        ('last_month', -1),
        ('current_month', 0),
        ('next_month', 1),
        ('two_months_ahead', 2),
    )

    labels = {}
    for key, offset in offsets:
        month_number = (today + relativedelta(months=offset)).strftime('%m')
        if offset == 0:
            labels[key] = f"Mes {month_number} << actual"
        else:
            labels[key] = f"Mes {month_number}"
    return labels
- def _count_working_days(self, start_date, end_date, employee=None):
- """
- Count working days between two dates considering employee calendar
- """
- working_days = 0
- current_date = start_date
-
- while current_date <= end_date:
- # Check if it's a working day according to employee calendar
- if self._is_working_day(current_date, employee):
- working_days += 1
- current_date += relativedelta(days=1)
-
- return working_days
- def _is_working_day(self, check_date, employee=None):
- """
- Check if a date is a working day considering employee calendar
- """
- if not employee or not employee.resource_calendar_id:
- # Fallback to basic weekday check
- return check_date.weekday() < 5
-
- try:
- # Convert date to datetime for calendar check
- check_datetime = datetime.combine(check_date, datetime.min.time())
-
- # Check if the day is a working day according to employee calendar
- working_hours = employee.resource_calendar_id._list_work_time_per_day(
- check_datetime, check_datetime, compute_leaves=True
- )
-
- # If there are working hours, it's a working day
- return bool(working_hours)
-
- except Exception:
- # Fallback to basic weekday check if there's any error
- return check_date.weekday() < 5
@api.model
def _apply_default_values_to_existing_records(self):
    """
    Apply default values to existing employee and company records.

    Employees whose utilization_rate equals 0.0 are set to 100.0 and
    companies whose overhead equals 0.0 are set to 40.0. Errors are logged
    and swallowed so that the installation is not interrupted.
    """
    try:
        # Employees: utilization_rate 0.0 is treated as "not set"
        employees_to_update = self.env['hr.employee'].search([('utilization_rate', '=', 0.0)])
        if employees_to_update:
            employees_to_update.write({'utilization_rate': 100.0})
            _logger.info(f"Updated {len(employees_to_update)} employees with default utilization_rate = 100.0%")

        # Companies: overhead 0.0 is treated as "not set"
        companies_to_update = self.env['res.company'].search([('overhead', '=', 0.0)])
        if companies_to_update:
            companies_to_update.write({'overhead': 40.0})
            _logger.info(f"Updated {len(companies_to_update)} companies with default overhead = 40.0%")

        _logger.info("Default values applied successfully to existing records")
    except Exception as e:
        # Not re-raised on purpose: a failure here must not break the installation
        _logger.error(f"Error applying default values to existing records: {str(e)}")
|