# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import pytz
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import float_round
_logger = logging.getLogger(__name__)
class HrEfficiency(models.Model):
_name = 'hr.efficiency'
_description = 'Employee Efficiency'
_order = 'month_year desc, employee_id'
_rec_name = 'display_name'
_active_name = 'active'
# Basic fields
name = fields.Char('Name', compute='_compute_display_name', store=True)
employee_id = fields.Many2one('hr.employee', 'Employee', required=True, domain=[('employee_type', '=', 'employee')])
month_year = fields.Char('Month Year', required=True, help="Format: YYYY-MM (e.g., 2024-08)")
date = fields.Date('Date', compute='_compute_date', store=True, help="Date field for standard Odoo date filters")
company_id = fields.Many2one('res.company', 'Company', default=lambda self: self.env.company)
active = fields.Boolean('Active', default=True)
calculation_date = fields.Datetime('Calculation Date', default=fields.Datetime.now, help='When this calculation was performed')
# Available hours (what employee should work)
available_hours = fields.Float('Available Hours', digits=(10, 2), help="Total hours employee should work in the month")
# Employee contract information
wage = fields.Float('Gross Salary', digits=(10, 2), aggregator='sum', help="Employee's gross salary from their contract at the time of calculation")
wage_overhead = fields.Float('Wage Overhead', digits=(10, 2), aggregator='sum', help="Hourly cost multiplied by available hours for the month, including company overhead")
currency_id = fields.Many2one('res.currency', 'Currency', help="Currency for the wage field")
# Employee utilization and company overhead (stored at calculation time)
utilization_rate = fields.Float('Utilization Rate (%)', digits=(5, 2), default=100.0, aggregator='avg', help="Employee's utilization rate at the time of calculation")
overhead = fields.Float('Company Overhead (%)', digits=(5, 2), default=40.0, aggregator='avg', help="Company's overhead percentage at the time of calculation")
expected_profitability = fields.Float('Expected Profitability (%)', digits=(5, 2), default=30.0, aggregator='avg', help="Company's expected profitability percentage at the time of calculation")
efficiency_factor = fields.Float('Efficiency Factor (%)', digits=(5, 2), default=85.0, aggregator='avg', help="Company's efficiency factor percentage at the time of calculation")
# Planned hours (what was planned)
planned_hours = fields.Float('Planned Hours', digits=(10, 2), help="Total hours planned for the month")
planned_billable_hours = fields.Float('Planned Billable Hours', digits=(10, 2), help="Hours planned on billable projects")
planned_non_billable_hours = fields.Float('Planned Non-Billable Hours', digits=(10, 2), help="Hours planned on non-billable projects")
# Actual hours
actual_billable_hours = fields.Float('Actual Billable Hours', digits=(10, 2), help="Hours actually worked on billable projects")
actual_non_billable_hours = fields.Float('Actual Non-Billable Hours', digits=(10, 2), help="Hours actually worked on non-billable projects")
# Calculated fields (stored for performance)
total_actual_hours = fields.Float('Total Actual Hours', digits=(10, 2), help="Total actual hours (billable + non-billable)")
expected_hours_to_date = fields.Float('Expected Hours to Date', digits=(10, 2), help='Hours that should be registered based on planning until current date')
# Precio por Hora (stored field)
precio_por_hora = fields.Float('Precio por Hora', digits=(10, 2), help='Precio que cobramos al cliente por hora (wage_overhead/available_hours + rentabilidad_esperada)', aggregator='avg')
# Dynamic indicator fields (will be created automatically)
# These fields are managed dynamically based on hr.efficiency.indicator records
# Overall efficiency (always present) - Now stored fields calculated on save
overall_efficiency = fields.Float('Overall Efficiency (%)', digits=(5, 2), help='Overall efficiency based on configured indicators')
overall_efficiency_display = fields.Char('Overall Efficiency Display', help='Overall efficiency formatted for display with badge widget')
display_name = fields.Char('Display Name', compute='_compute_display_name', store=True)
# Note: Removed unique constraint to allow historical tracking
# Multiple records can exist for the same employee and month
@api.depends('month_year')
def _compute_date(self):
"""
Compute date field from month_year for standard Odoo date filters
"""
for record in self:
if record.month_year:
try:
year, month = record.month_year.split('-')
record.date = date(int(year), int(month), 1)
except (ValueError, AttributeError):
record.date = False
else:
record.date = False
def _calculate_expected_hours_to_date(self, record):
"""
Calculate expected hours to date based on planned hours and working days
"""
if not record.month_year or not record.planned_hours or not record.employee_id.contract_id:
return 0.0
try:
# Parse month_year (format: YYYY-MM)
year, month = record.month_year.split('-')
start_date = date(int(year), int(month), 1)
end_date = (start_date + relativedelta(months=1)) - relativedelta(days=1)
# Get current date
current_date = date.today()
# If current date is outside the month, use end of month
if current_date > end_date:
calculation_date = end_date
elif current_date < start_date:
calculation_date = start_date
else:
calculation_date = current_date
# Calculate working days in the month
total_working_days = self._count_working_days(start_date, end_date, record.employee_id)
# Calculate working days until current date
working_days_until_date = self._count_working_days(start_date, calculation_date, record.employee_id)
if total_working_days > 0:
# Calculate expected hours based on planned hours (what was planned to work)
expected_hours = (record.planned_hours / total_working_days) * working_days_until_date
# Ensure we don't exceed planned hours for the month
expected_hours = min(expected_hours, record.planned_hours)
return float_round(expected_hours, 2)
else:
return 0.0
except (ValueError, AttributeError) as e:
return 0.0
def _calculate_precio_por_hora(self, record):
"""
Calculate precio por hora: (wage_overhead / available_hours) * (1 + expected_profitability/100)
Since wage_overhead already includes overhead, we only need to add expected profitability
"""
try:
# Get wage_overhead and available_hours from record
wage_overhead = getattr(record, 'wage_overhead', 0.0) or 0.0
available_hours = getattr(record, 'available_hours', 0.0) or 0.0
expected_profitability = getattr(record, 'expected_profitability', 30.0) or 30.0
if wage_overhead > 0 and available_hours > 0:
# Calculate hourly cost from wage_overhead (which already includes overhead)
hourly_cost_with_overhead = wage_overhead / available_hours
# Apply only expected profitability margin (overhead is already included)
precio_por_hora = hourly_cost_with_overhead * (1 + (expected_profitability / 100))
return float_round(precio_por_hora, 2)
else:
return 0.0
except (ValueError, AttributeError, ZeroDivisionError) as e:
return 0.0
def _calculate_all_indicators(self):
"""
Calculate all indicators and overall efficiency for stored fields
This method is called on create/write instead of using computed fields
"""
# Prepare values to update without triggering write recursively
values_to_update = {}
# Pre-fetch necessary fields to avoid CacheMiss during iteration
fields_to_load = [
'available_hours', 'planned_hours', 'planned_billable_hours',
'planned_non_billable_hours', 'actual_billable_hours',
'actual_non_billable_hours', 'total_actual_hours',
'expected_hours_to_date', 'wage', 'wage_overhead', 'utilization_rate', 'overhead',
'precio_por_hora', 'employee_id', 'company_id', 'month_year', 'active',
]
# Load fields for all records in 'self' with error handling
try:
self.read(fields_to_load)
except Exception as e:
import logging
_logger = logging.getLogger(__name__)
_logger.warning(f"Error loading fields: {e}")
# Continue with empty cache if needed
for record in self:
# Get all manual fields for this model
manual_fields = self.env['ir.model.fields'].search([
('model', '=', 'hr.efficiency'),
('state', '=', 'manual'),
('ttype', '=', 'float')
])
# Prepare efficiency data with safe field access
efficiency_data = {
'available_hours': getattr(record, 'available_hours', 0.0) or 0.0,
'planned_hours': getattr(record, 'planned_hours', 0.0) or 0.0,
'planned_billable_hours': getattr(record, 'planned_billable_hours', 0.0) or 0.0,
'planned_non_billable_hours': getattr(record, 'planned_non_billable_hours', 0.0) or 0.0,
'actual_billable_hours': getattr(record, 'actual_billable_hours', 0.0) or 0.0,
'actual_non_billable_hours': getattr(record, 'actual_non_billable_hours', 0.0) or 0.0,
'total_actual_hours': getattr(record, 'total_actual_hours', 0.0) or 0.0,
'expected_hours_to_date': getattr(record, 'expected_hours_to_date', 0.0) or 0.0,
'wage': getattr(record, 'wage', 0.0) or 0.0,
'wage_overhead': getattr(record, 'wage_overhead', 0.0) or 0.0,
'utilization_rate': getattr(record, 'utilization_rate', 100.0) or 100.0,
'overhead': getattr(record, 'overhead', 40.0) or 40.0,
'precio_por_hora': getattr(record, 'precio_por_hora', 0.0) or 0.0,
}
# STEP 1: Calculate total_actual_hours FIRST (needed for indicators)
try:
total_actual_hours = getattr(record, 'actual_billable_hours', 0.0) + getattr(record, 'actual_non_billable_hours', 0.0)
except Exception as e:
import logging
_logger = logging.getLogger(__name__)
_logger.error(f"Error calculating total_actual_hours for record {record.id}: {e}")
total_actual_hours = 0.0
record_values = {'total_actual_hours': float_round(total_actual_hours, 2)}
# STEP 2: Calculate expected_hours_to_date
expected_hours = self._calculate_expected_hours_to_date(record)
record_values['expected_hours_to_date'] = expected_hours
# STEP 3: Calculate precio_por_hora (needed for profitability indicators)
precio_por_hora = self._calculate_precio_por_hora(record)
record_values['precio_por_hora'] = precio_por_hora
# STEP 4: Update efficiency_data with ALL calculated base fields
efficiency_data.update({
'total_actual_hours': total_actual_hours,
'expected_hours_to_date': expected_hours,
'precio_por_hora': precio_por_hora,
})
# STEP 5: Calculate all indicators dynamically (in sequence order)
active_indicators = self.env['hr.efficiency.indicator'].search([('active', '=', True)], order='sequence')
for indicator in active_indicators:
field_name = self._get_indicator_field_name(indicator.name)
# Check if the field exists in the record
if field_name in record._fields:
# Calculate indicator value using the indicator formula
indicator_value = indicator.evaluate_formula(efficiency_data)
# Store the value to update later
record_values[field_name] = float_round(indicator_value, 2)
# Calculate overall efficiency
overall_efficiency = self._calculate_overall_efficiency(record)
record_values['overall_efficiency'] = overall_efficiency
# Calculate overall efficiency display
if overall_efficiency == 0:
record_values['overall_efficiency_display'] = '0.00'
else:
record_values['overall_efficiency_display'] = f"{overall_efficiency:.2f}"
# Store values for this record
values_to_update[record.id] = record_values
# Update all records at once to avoid recursion
for record_id, values in values_to_update.items():
record = self.browse(record_id)
# Use direct SQL update to avoid triggering write method
if values:
record.env.cr.execute(
"UPDATE hr_efficiency SET " +
", ".join([f"{key} = %s" for key in values.keys()]) +
" WHERE id = %s",
list(values.values()) + [record_id]
)
def _update_stored_manual_fields(self):
"""Update stored manual fields with computed values"""
for record in self:
# Get all manual fields for this model
manual_fields = self.env['ir.model.fields'].search([
('model', '=', 'hr.efficiency'),
('state', '=', 'manual'),
('ttype', '=', 'float'),
('store', '=', True),
], order='id')
# Prepare efficiency data with safe field access
efficiency_data = {
'available_hours': record.available_hours or 0.0,
'planned_hours': record.planned_hours or 0.0,
'planned_billable_hours': record.planned_billable_hours or 0.0,
'planned_non_billable_hours': record.planned_non_billable_hours or 0.0,
'actual_billable_hours': record.actual_billable_hours or 0.0,
'actual_non_billable_hours': record.actual_non_billable_hours or 0.0,
'total_actual_hours': record.total_actual_hours or 0.0,
'expected_hours_to_date': record.expected_hours_to_date or 0.0,
'wage': record.wage or 0.0,
'wage_overhead': record.wage_overhead or 0.0,
'utilization_rate': record.utilization_rate or 100.0,
'overhead': record.overhead or 40.0,
}
# Calculate and update stored manual fields
for field in manual_fields:
# Find the corresponding indicator
indicator = self.env['hr.efficiency.indicator'].search([
('name', 'ilike', field.field_description)
], limit=1)
if indicator and field.name in record._fields:
# Calculate indicator value using the indicator formula
indicator_value = indicator.evaluate_formula(efficiency_data)
# Update the stored field value
record[field.name] = float_round(indicator_value, 2)
@api.model
def _recompute_all_indicators(self):
"""Recompute all indicator fields for all records"""
records = self.search([])
if records:
records._calculate_all_indicators()
def _get_indicator_field_name(self, indicator_name):
"""
Convert indicator name to valid field name
"""
import re
# Remove special characters and convert to lowercase
field_name = indicator_name.lower()
# Replace spaces, hyphens, and other special characters with underscores
field_name = re.sub(r'[^a-z0-9_]', '_', field_name)
# Remove multiple consecutive underscores
field_name = re.sub(r'_+', '_', field_name)
# Remove leading and trailing underscores
field_name = field_name.strip('_')
# Ensure it starts with x_ for manual fields
if not field_name.startswith('x_'):
field_name = 'x_' + field_name
# Ensure it doesn't exceed 63 characters (PostgreSQL limit)
if len(field_name) > 63:
field_name = field_name[:63]
return field_name
@api.model
def _create_default_dynamic_fields(self):
"""
Create default dynamic field records for existing indicators
"""
# Get all active indicators
indicators = self.env['hr.efficiency.indicator'].search([('active', '=', True)], order='sequence')
for indicator in indicators:
# Check if field already exists in ir.model.fields
field_name = self._get_indicator_field_name(indicator.name)
existing_field = self.env['ir.model.fields'].search([
('model', '=', 'hr.efficiency'),
('name', '=', field_name)
], limit=1)
if not existing_field:
# Create field in ir.model.fields (like Studio does)
self.env['ir.model.fields'].create({
'name': field_name,
'model': 'hr.efficiency',
'model_id': self.env['ir.model'].search([('model', '=', 'hr.efficiency')], limit=1).id,
'ttype': 'float',
'field_description': indicator.name,
'state': 'manual', # This is the key - it makes it a custom field
'store': True,
'compute': '_compute_indicators',
})
def _calculate_overall_efficiency(self, record):
"""
Calculate overall efficiency based on configured indicators
"""
indicators = self.env['hr.efficiency.indicator'].search([('active', '=', True)], order='sequence')
if not indicators:
# Default calculation if no indicators configured
return 0.0
# Check if there's any data to calculate
if (record.available_hours == 0 and
record.planned_hours == 0 and
record.actual_billable_hours == 0 and
record.actual_non_billable_hours == 0):
return 0.0
total_weight = 0
weighted_sum = 0
valid_indicators = 0
efficiency_data = {
'available_hours': getattr(record, 'available_hours', 0.0) or 0.0,
'planned_hours': getattr(record, 'planned_hours', 0.0) or 0.0,
'planned_billable_hours': getattr(record, 'planned_billable_hours', 0.0) or 0.0,
'planned_non_billable_hours': getattr(record, 'planned_non_billable_hours', 0.0) or 0.0,
'actual_billable_hours': getattr(record, 'actual_billable_hours', 0.0) or 0.0,
'actual_non_billable_hours': getattr(record, 'actual_non_billable_hours', 0.0) or 0.0,
'total_actual_hours': getattr(record, 'total_actual_hours', 0.0) or 0.0,
'expected_hours_to_date': getattr(record, 'expected_hours_to_date', 0.0) or 0.0,
'wage': getattr(record, 'wage', 0.0) or 0.0,
'wage_overhead': getattr(record, 'wage_overhead', 0.0) or 0.0,
'utilization_rate': getattr(record, 'utilization_rate', 100.0) or 100.0,
'overhead': getattr(record, 'overhead', 40.0) or 40.0,
}
for indicator in indicators:
if indicator.weight > 0:
indicator_value = indicator.evaluate_formula(efficiency_data)
# Count indicators with valid values (including 0 when it's a valid result)
if indicator_value is not None:
weighted_sum += indicator_value * indicator.weight
total_weight += indicator.weight
valid_indicators += 1
# If no valid indicators or no total weight, return 0
if total_weight <= 0 or valid_indicators == 0:
return 0.0
# Multiply by 100 to show as percentage
return float_round((weighted_sum / total_weight) * 100, 2)
@api.depends('employee_id', 'month_year')
def _compute_display_name(self):
for record in self:
if record.employee_id and record.month_year:
record.display_name = f"{record.employee_id.name} - {record.month_year}"
else:
record.display_name = "New Efficiency Record"
@api.model
def _calculate_employee_efficiency(self, employee, month_year):
"""
Calculate efficiency for a specific employee and month
"""
# Parse month_year (format: YYYY-MM)
try:
year, month = month_year.split('-')
start_date = date(int(year), int(month), 1)
end_date = (start_date + relativedelta(months=1)) - relativedelta(days=1)
except ValueError:
raise UserError(_("Invalid month_year format. Expected: YYYY-MM"))
# Calculate available hours (considering holidays and time off)
available_hours = self._calculate_available_hours(employee, start_date, end_date)
# Calculate planned hours
planned_hours, planned_billable_hours, planned_non_billable_hours = self._calculate_planned_hours(employee, start_date, end_date)
# Calculate actual hours
actual_billable_hours, actual_non_billable_hours = self._calculate_actual_hours(employee, start_date, end_date)
# Calculate wage and currency from employee's contract
# Use current date for wage calculation (when the record is being created/calculated)
wage_date = date.today()
wage, currency_id = self._get_employee_wage_and_currency(employee, wage_date)
# Calculate wage_overhead: hourly_cost * available_hours * (1 + overhead/100)
hourly_cost = employee.hourly_cost or 0.0
overhead = employee.company_id.overhead or 40.0
wage_overhead = hourly_cost * available_hours
# Get employee's utilization rate and company's overhead at calculation time
utilization_rate = employee.utilization_rate or 100.0
overhead = employee.company_id.overhead or 40.0
expected_profitability = employee.company_id.expected_profitability or 30.0
efficiency_factor = employee.company_id.efficiency_factor or 85.0
# Apply utilization_rate to actual_billable_hours to reflect real billable capacity
adjusted_actual_billable_hours = actual_billable_hours * (utilization_rate / 100)
return {
'month_year': month_year,
'employee_id': employee.id,
'company_id': employee.company_id.id,
'available_hours': available_hours,
'planned_hours': planned_hours,
'planned_billable_hours': planned_billable_hours,
'planned_non_billable_hours': planned_non_billable_hours,
'actual_billable_hours': adjusted_actual_billable_hours,
'actual_non_billable_hours': actual_non_billable_hours,
'wage': wage,
'wage_overhead': wage_overhead,
'currency_id': currency_id,
'utilization_rate': utilization_rate,
'overhead': overhead,
'expected_profitability': expected_profitability,
'efficiency_factor': efficiency_factor,
}
@api.model
def _get_employee_wage_and_currency(self, employee, target_date):
"""
Get employee's wage and currency from their contract for a specific date
Always take the contract that is active on the target date
If no active contract on that date, return 0 with company currency
"""
if not employee:
return 0.0, self.env.company.currency_id.id
# Get all contracts for the employee
contracts = self.env['hr.contract'].search([
('employee_id', '=', employee.id),
('state', '=', 'open')
], order='date_start desc')
if not contracts:
return 0.0, self.env.company.currency_id.id
# Find the contract that is active on the target date
for contract in contracts:
contract_start = contract.date_start
# Check if contract is active on target date
if contract.date_end:
# Contract has end date
if contract_start <= target_date <= contract.date_end:
wage = contract.wage or 0.0
currency_id = contract.currency_id.id if contract.currency_id else self.env.company.currency_id.id
return wage, currency_id
else:
# Contract has no end date, it's active for any date after start
if contract_start <= target_date:
wage = contract.wage or 0.0
currency_id = contract.currency_id.id if contract.currency_id else self.env.company.currency_id.id
return wage, currency_id
# If no contract is active on the target date, return defaults
return 0.0, self.env.company.currency_id.id
def _calculate_available_hours(self, employee, start_date, end_date):
"""
Calculate available hours considering holidays and time off
"""
if not employee.resource_calendar_id:
return 0.0
# Convert dates to datetime for the method
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
# Get working hours from calendar
work_hours_data = employee._list_work_time_per_day(start_datetime, end_datetime)
total_work_hours = sum(hours for _, hours in work_hours_data[employee.id])
# Subtract hours from approved time off
time_off_hours = self._get_time_off_hours(employee, start_date, end_date)
return max(0.0, total_work_hours - time_off_hours)
def _get_time_off_hours(self, employee, start_date, end_date):
"""
Get hours from approved time off requests
"""
# Get approved time off requests
leaves = self.env['hr.leave'].search([
('employee_id', '=', employee.id),
('state', '=', 'validate'),
('date_from', '<=', end_date),
('date_to', '>=', start_date),
])
total_hours = 0.0
for leave in leaves:
# Calculate overlap with the month
overlap_start = max(leave.date_from.date(), start_date)
overlap_end = min(leave.date_to.date(), end_date)
if overlap_start <= overlap_end:
# Get hours for the overlap period
overlap_start_dt = datetime.combine(overlap_start, datetime.min.time())
overlap_end_dt = datetime.combine(overlap_end, datetime.max.time())
work_hours_data = employee._list_work_time_per_day(overlap_start_dt, overlap_end_dt)
total_hours += sum(hours for _, hours in work_hours_data[employee.id])
return total_hours
def _calculate_planned_hours(self, employee, start_date, end_date):
"""
Calculate planned hours from planning module
"""
# Get planning slots for the employee that overlap with the date range
# This is the same logic as Odoo Planning's Gantt chart
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
planning_slots = self.env['planning.slot'].search([
('employee_id', '=', employee.id),
('start_datetime', '<=', end_datetime),
('end_datetime', '>=', start_datetime),
# Removed state restriction to include all planning slots regardless of state
])
total_planned = 0.0
total_billable = 0.0
total_non_billable = 0.0
# Get working intervals for the resource and company calendar
# This is the same approach as Odoo Planning's Gantt chart
start_utc = pytz.utc.localize(start_datetime)
end_utc = pytz.utc.localize(end_datetime)
if employee.resource_id:
resource_work_intervals, calendar_work_intervals = employee.resource_id._get_valid_work_intervals(
start_utc, end_utc, calendars=employee.company_id.resource_calendar_id
)
else:
# Fallback to company calendar if no resource
calendar_work_intervals = {employee.company_id.resource_calendar_id.id: []}
resource_work_intervals = {}
for slot in planning_slots:
# Use the same logic as Odoo Planning's Gantt chart
# Calculate duration only within the specified period
hours = slot._get_duration_over_period(
start_utc, end_utc,
resource_work_intervals, calendar_work_intervals, has_allocated_hours=False
)
# Check if the slot is linked to a billable project
if slot.project_id and slot.project_id.allow_billable:
total_billable += hours
else:
total_non_billable += hours
total_planned += hours
return total_planned, total_billable, total_non_billable
def _calculate_actual_hours(self, employee, start_date, end_date):
"""
Calculate actual hours from timesheets
"""
# Get timesheets for the employee in the date range (excluding time off)
timesheets = self.env['account.analytic.line'].search([
('employee_id', '=', employee.id),
('date', '>=', start_date),
('date', '<=', end_date),
('project_id', '!=', False), # Only project timesheets
('holiday_id', '=', False), # Exclude time off timesheets
])
total_billable = 0.0
total_non_billable = 0.0
for timesheet in timesheets:
hours = timesheet.unit_amount or 0.0
# Additional filter: exclude time off tasks
if timesheet.task_id and timesheet.task_id.name and 'time off' in timesheet.task_id.name.lower():
continue # Skip time off tasks
# Additional filter: exclude time off from name
if timesheet.name and 'tiempo personal' in timesheet.name.lower():
continue # Skip personal time entries
# Check if the project is billable
if timesheet.project_id and timesheet.project_id.allow_billable:
total_billable += hours
else:
total_non_billable += hours
return total_billable, total_non_billable
@api.model
def calculate_efficiency_for_period(self, start_month=None, end_month=None):
"""
Calculate efficiency for all employees for a given period
"""
if not start_month:
# Default: last 3 months and next 6 months
current_date = date.today()
start_month = (current_date - relativedelta(months=3)).strftime('%Y-%m')
end_month = (current_date + relativedelta(months=6)).strftime('%Y-%m')
# Generate list of months
months = self._generate_month_list(start_month, end_month)
# Get all active employees of type 'employee'
employees = self.env['hr.employee'].search([
('active', '=', True),
('employee_type', '=', 'employee')
])
created_records = []
for employee in employees:
for month in months:
# Calculate efficiency data
efficiency_data = self._calculate_employee_efficiency(employee, month)
# Check if there are changes compared to the latest record
latest_record = self.search([
('employee_id', '=', employee.id),
('month_year', '=', month),
('company_id', '=', employee.company_id.id),
], order='calculation_date desc', limit=1)
has_changes = False
if latest_record:
# Compare current data with latest record
fields_to_compare = [
'available_hours', 'planned_hours', 'planned_billable_hours',
'planned_non_billable_hours', 'actual_billable_hours',
'actual_non_billable_hours'
]
# Check basic fields
for field in fields_to_compare:
if abs(efficiency_data[field] - latest_record[field]) > 0.01: # Tolerance for floating point
has_changes = True
break
# If no changes in basic fields, check dynamic indicators
if not has_changes:
# Get all active indicators
active_indicators = self.env['hr.efficiency.indicator'].search([('active', '=', True)], order='sequence')
for indicator in active_indicators:
field_name = self._get_indicator_field_name(indicator.name)
# Calculate current indicator value
current_value = indicator.evaluate_formula(efficiency_data)
# Get previous indicator value from record
previous_value = getattr(latest_record, field_name, None)
# Compare values with tolerance
if (current_value is not None and previous_value is not None and
abs(current_value - previous_value) > 0.01):
has_changes = True
break
elif current_value != previous_value: # Handle None vs value cases
has_changes = True
break
else:
# No previous record exists, so this is a change
has_changes = True
# Only create new record if there are changes
if has_changes:
# Archive existing records for this employee and month
existing_records = self.search([
('employee_id', '=', employee.id),
('month_year', '=', month),
('company_id', '=', employee.company_id.id),
])
if existing_records:
existing_records.write({'active': False})
# Create new record
new_record = self.create(efficiency_data)
created_records.append(new_record)
# Calculate indicators for all newly created records
if created_records:
# Convert list to recordset
created_recordset = self.browse([record.id for record in created_records])
created_recordset._calculate_all_indicators()
return {
'created': len(created_records),
'updated': 0, # No longer updating existing records
'total_processed': len(created_records),
}
@api.model
def _init_dynamic_system(self):
"""
Initialize dynamic fields and views when module is installed
"""
import logging
_logger = logging.getLogger(__name__)
try:
_logger.info("Starting dynamic system initialization...")
# Step 1: Create dynamic field records for existing indicators
self._create_default_dynamic_fields()
_logger.info("Default dynamic fields created successfully")
# Step 2: Dynamic fields are created via hr.efficiency.dynamic.field model
_logger.info("Dynamic fields creation handled by hr.efficiency.dynamic.field model")
# Step 3: Update views
self._update_views_with_dynamic_fields()
_logger.info("Views updated successfully")
# Step 4: Force recompute of existing records
records = self.search([])
if records:
records._invalidate_cache()
_logger.info(f"Invalidated cache for {len(records)} records")
_logger.info("Dynamic system initialization completed successfully")
except Exception as e:
_logger.error(f"Error during dynamic system initialization: {str(e)}")
raise
@api.model
def _post_init_hook(self):
"""
Post-install hook mejorado que sincroniza indicadores desde XML
Se ejecuta en cada carga del módulo para mantener sincronización
"""
import logging
_logger = logging.getLogger(__name__)
try:
_logger.info("🚀 Ejecutando _post_init_hook mejorado para hr_efficiency")
# 1. Sincronizar indicadores desde XML con campos dinámicos
sync_result = self.env['hr.efficiency.indicator'].sync_indicators_from_xml()
# 2. Aplicar valores por defecto a registros existentes
self._apply_default_values_to_existing_records()
# 3. Log del resultado
_logger.info("✅ _post_init_hook completado exitosamente")
_logger.info(f"📊 Resultado: {sync_result}")
except Exception as e:
_logger.error(f"❌ Error en _post_init_hook: {str(e)}")
# No hacer raise para evitar que falle la carga del módulo
_logger.warning("⚠️ Continuando carga del módulo a pesar del error en _post_init_hook")
@api.model
def create(self, vals_list):
"""
Override create to calculate indicators when records are created
"""
records = super().create(vals_list)
# Calculate indicators for newly created records
if records:
records._calculate_all_indicators()
return records
def write(self, vals):
"""
Override write to recalculate indicators when records are updated
"""
result = super().write(vals)
# Recalculate indicators for updated records
self._calculate_all_indicators()
return result
@api.model
def _register_hook(self):
"""
Called when the registry is loaded.
Update views with dynamic fields on every module load/restart.
"""
super()._register_hook()
try:
# Update views with current dynamic fields on every module load
self._update_views_with_dynamic_fields()
except Exception as e:
# Log error but don't prevent module loading
import logging
_logger = logging.getLogger(__name__)
_logger.warning(f"Could not update dynamic views on module load: {str(e)}")
@api.model
def _update_views_with_dynamic_fields(self):
"""
Update inherited views to include dynamic fields after module is loaded
"""
import logging
_logger = logging.getLogger(__name__)
try:
# Get active indicators ordered by sequence (consistent with other parts)
active_indicators = self.env['hr.efficiency.indicator'].search([('active', '=', True)], order='sequence')
fields_to_display = []
for indicator in active_indicators:
field_name = self._get_indicator_field_name(indicator.name)
# Check if this indicator has a manual field
manual_field = self.env['ir.model.fields'].search([
('model', '=', 'hr.efficiency'),
('state', '=', 'manual'),
('ttype', '=', 'float'),
('name', '=', field_name),
], limit=1)
if manual_field:
# Indicator with manual field
fields_to_display.append({
'name': field_name,
'field_description': indicator.name,
'indicator': indicator
})
else:
# Create manual field for this indicator
_logger.info(f"Creating manual field for indicator '{indicator.name}'")
self.env['hr.efficiency.indicator']._create_dynamic_field(indicator)
# Add to display list after creation
fields_to_display.append({
'name': field_name,
'field_description': indicator.name,
'indicator': indicator
})
_logger.info(f"Found {len(fields_to_display)} fields to add to views")
# Build dynamic fields XML for list view
dynamic_fields_xml = ''
for field_info in fields_to_display:
# Determine widget based on indicator type
indicator = field_info['indicator']
widget_map = {
'percentage': 'percentage', # Changed back to 'percentage' to show % symbol
'hours': 'float_time',
'currency': 'monetary',
'number': 'float'
}
widget = widget_map.get(indicator.indicator_type, 'float') if indicator else 'float'
field_xml = f''
dynamic_fields_xml += field_xml
# Update inherited list view
inherited_list_view = self.env.ref('hr_efficiency.view_hr_efficiency_list_inherited', raise_if_not_found=False)
if inherited_list_view:
if dynamic_fields_xml:
new_arch = f"""
{dynamic_fields_xml}
"""
else:
# If no dynamic fields, remove any existing dynamic fields from the view
new_arch = """
"""
inherited_list_view.write({'arch': new_arch})
_logger.info(f"Updated inherited list view with {len(fields_to_display)} dynamic fields")
# Build dynamic fields XML for form view
form_dynamic_fields_xml = ''
for field_info in fields_to_display:
# Determine widget based on indicator type
indicator = field_info['indicator']
widget_map = {
'percentage': 'badge',
'hours': 'float_time',
'currency': 'monetary',
'number': 'float'
}
widget = widget_map.get(indicator.indicator_type, 'badge') if indicator else 'badge'
field_xml = f''
form_dynamic_fields_xml += field_xml
# Update inherited form view
inherited_form_view = self.env.ref('hr_efficiency.view_hr_efficiency_form_inherited', raise_if_not_found=False)
if inherited_form_view:
if form_dynamic_fields_xml:
new_form_arch = f"""
{form_dynamic_fields_xml}
"""
else:
# If no dynamic fields, remove any existing dynamic fields from the view
new_form_arch = """
"""
inherited_form_view.write({'arch': new_form_arch})
_logger.info("Updated inherited form view with dynamic fields")
except Exception as e:
_logger.error(f"Error updating views with dynamic fields: {str(e)}")
raise
def _generate_month_list(self, start_month, end_month):
"""
Generate list of months between start_month and end_month (inclusive)
"""
months = []
# Convert date objects to datetime if needed
if isinstance(start_month, date):
start_month = start_month.strftime('%Y-%m')
if isinstance(end_month, date):
end_month = end_month.strftime('%Y-%m')
current = datetime.strptime(start_month, '%Y-%m')
end = datetime.strptime(end_month, '%Y-%m')
while current <= end:
months.append(current.strftime('%Y-%m'))
current = current + relativedelta(months=1)
return months
@api.model
def _cron_calculate_efficiency(self):
"""
Cron job to automatically calculate efficiency
"""
self.calculate_efficiency_for_period()
self._update_dynamic_filter_labels()
@api.model
def _update_dynamic_filter_labels(self):
"""
Update dynamic filter labels based on current date
"""
labels = self._get_dynamic_month_labels()
# Update filter labels
filter_mapping = {
'filter_two_months_ago': labels['two_months_ago'],
'filter_last_month': labels['last_month'],
'filter_current_month': labels['current_month'],
'filter_next_month': labels['next_month'],
'filter_two_months_ahead': labels['two_months_ahead'],
}
for filter_name, label in filter_mapping.items():
try:
filter_record = self.env.ref(f'hr_efficiency.{filter_name}', raise_if_not_found=False)
if filter_record:
filter_record.write({'name': label})
except Exception as e:
_logger.warning(f"Could not update filter {filter_name}: {str(e)}")
@api.model
def _get_month_filter_options(self):
"""
Get dynamic month filter options for search view
Returns a list of tuples (month_year, display_name) for the last 2, current, and next 2 months
"""
current_date = date.today()
months = []
# Last 2 months
for i in range(2, 0, -1):
month_date = current_date - relativedelta(months=i)
month_year = month_date.strftime('%Y-%m')
month_name = month_date.strftime('%B %Y') # e.g., "August 2024"
months.append((month_year, month_name))
# Current month
current_month_year = current_date.strftime('%Y-%m')
current_month_name = current_date.strftime('%B %Y')
months.append((current_month_year, current_month_name))
# Next 2 months
for i in range(1, 3):
month_date = current_date + relativedelta(months=i)
month_year = month_date.strftime('%Y-%m')
month_name = month_date.strftime('%B %Y')
months.append((month_year, month_name))
return months
@api.model
def _get_dynamic_month_labels(self):
"""
Get dynamic month labels for filters
Returns a dictionary with month labels like "Mes 1", "Mes 2", "Mes 3 << actual", etc.
"""
current_date = date.today()
labels = {}
# 2 months ago
month_2_ago = current_date - relativedelta(months=2)
labels['two_months_ago'] = f"Mes {month_2_ago.strftime('%m')}"
# 1 month ago
month_1_ago = current_date - relativedelta(months=1)
labels['last_month'] = f"Mes {month_1_ago.strftime('%m')}"
# Current month
labels['current_month'] = f"Mes {current_date.strftime('%m')} << actual"
# Next month
month_1_ahead = current_date + relativedelta(months=1)
labels['next_month'] = f"Mes {month_1_ahead.strftime('%m')}"
# 2 months ahead
month_2_ahead = current_date + relativedelta(months=2)
labels['two_months_ahead'] = f"Mes {month_2_ahead.strftime('%m')}"
return labels
def _count_working_days(self, start_date, end_date, employee=None):
"""
Count working days between two dates considering employee calendar
"""
working_days = 0
current_date = start_date
while current_date <= end_date:
# Check if it's a working day according to employee calendar
if self._is_working_day(current_date, employee):
working_days += 1
current_date += relativedelta(days=1)
return working_days
def _is_working_day(self, check_date, employee=None):
"""
Check if a date is a working day considering employee calendar
"""
if not employee or not employee.resource_calendar_id:
# Fallback to basic weekday check
return check_date.weekday() < 5
try:
# Convert date to datetime for calendar check
check_datetime = datetime.combine(check_date, datetime.min.time())
# Check if the day is a working day according to employee calendar
working_hours = employee.resource_calendar_id._list_work_time_per_day(
check_datetime, check_datetime, compute_leaves=True
)
# If there are working hours, it's a working day
return bool(working_hours)
except Exception:
# Fallback to basic weekday check if there's any error
return check_date.weekday() < 5
@api.model
def _apply_default_values_to_existing_records(self):
"""Apply default values to existing employee and company records"""
import logging
_logger = logging.getLogger(__name__)
try:
# Update employees with utilization_rate = 100.0 if not set
employees_to_update = self.env['hr.employee'].search([
('utilization_rate', '=', 0.0)
])
if employees_to_update:
employees_to_update.write({'utilization_rate': 100.0})
_logger.info(f"Updated {len(employees_to_update)} employees with default utilization_rate = 100.0%")
# Update companies with overhead = 40.0 if not set
companies_to_update = self.env['res.company'].search([
('overhead', '=', 0.0)
])
if companies_to_update:
companies_to_update.write({'overhead': 40.0})
_logger.info(f"Updated {len(companies_to_update)} companies with default overhead = 40.0%")
_logger.info("Default values applied successfully to existing records")
except Exception as e:
_logger.error(f"Error applying default values to existing records: {str(e)}")
# Don't raise the exception to avoid breaking the installation