# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
from odoo import api, fields, models, _
from odoo.exceptions import UserError
from odoo.tools import float_round
class HrEfficiency(models.Model):
    """Efficiency snapshot of one employee for one ``YYYY-MM`` period.

    A record stores the available, planned and actual hours of an employee
    for a month, plus the values derived from them. No uniqueness constraint
    is declared on (employee, month): several records may exist for the same
    pair so that history can be kept, and ``active`` is used to archive the
    older ones.
    """

    _name = 'hr.efficiency'
    _description = 'Employee Efficiency'
    _order = 'month_year desc, employee_id'
    _rec_name = 'display_name'
    _active_name = 'active'

    # --- Identification -------------------------------------------------
    active = fields.Boolean(
        string='Active',
        default=True,
        help='Technical field to archive old records',
    )
    month_year = fields.Char(
        string='Month Year',
        required=True,
        index=True,
        help="Format: YYYY-MM",
    )
    employee_id = fields.Many2one(
        comodel_name='hr.employee',
        string='Employee',
        required=True,
        index=True,
    )
    company_id = fields.Many2one(
        comodel_name='res.company',
        string='Company',
        required=True,
        default=lambda self: self.env.company,
    )
    calculation_date = fields.Datetime(
        string='Calculation Date',
        default=fields.Datetime.now,
        help='When this calculation was performed',
    )

    # --- Available hours (working time minus holidays and time off) -----
    available_hours = fields.Float(
        string='Available Hours',
        digits=(10, 2),
        help="Total available hours considering holidays and time off",
    )

    # --- Planned hours (planning module) --------------------------------
    planned_hours = fields.Float(
        string='Planned Hours',
        digits=(10, 2),
        help="Total hours planned in planning module",
    )
    planned_billable_hours = fields.Float(
        string='Planned Billable Hours',
        digits=(10, 2),
        help="Hours planned on billable projects",
    )
    planned_non_billable_hours = fields.Float(
        string='Planned Non-Billable Hours',
        digits=(10, 2),
        help="Hours planned on non-billable projects",
    )

    # --- Actual hours (timesheets) --------------------------------------
    actual_billable_hours = fields.Float(
        string='Actual Billable Hours',
        digits=(10, 2),
        help="Hours actually worked on billable projects",
    )
    actual_non_billable_hours = fields.Float(
        string='Actual Non-Billable Hours',
        digits=(10, 2),
        help="Hours actually worked on non-billable projects",
    )

    # --- Stored computed values -----------------------------------------
    total_actual_hours = fields.Float(
        string='Total Actual Hours',
        compute='_compute_total_actual_hours',
        store=True,
    )
    # Per-indicator fields are not declared here: they are manual
    # (``state='manual'``) float fields registered in ir.model.fields, one per
    # hr.efficiency.indicator record. Only the overall figure is static.
    overall_efficiency = fields.Float(
        string='Overall Efficiency (%)',
        compute='_compute_indicators',
        store=True,
        help='Overall efficiency based on configured indicators',
    )
    display_name = fields.Char(
        string='Display Name',
        compute='_compute_display_name',
        store=True,
    )

    # Deliberately no unique(employee, month) constraint: historical tracking
    # needs several records for the same employee and month.
@api.depends('actual_billable_hours', 'actual_non_billable_hours')
def _compute_total_actual_hours(self):
    """Store the sum of billable and non-billable actual hours on each record."""
    for rec in self:
        billable = rec.actual_billable_hours
        non_billable = rec.actual_non_billable_hours
        rec.total_actual_hours = billable + non_billable
@api.depends(
    'available_hours',
    'planned_hours',
    'planned_billable_hours',
    'planned_non_billable_hours',
    'actual_billable_hours',
    'actual_non_billable_hours',
    'total_actual_hours',
)
def _compute_indicators(self):
    """Compute every dynamic indicator field and ``overall_efficiency``.

    For each manual float field registered on ``hr.efficiency`` the matching
    ``hr.efficiency.indicator`` is looked up by the field label, its formula
    is evaluated against the record's hour figures, and the result (rounded
    to 2 digits) is written on the record. ``overall_efficiency`` is then
    delegated to :meth:`_calculate_overall_efficiency`.

    The dependency list covers every hour field handed to the formulas, so a
    change to any of them refreshes the stored values.
    """
    # The set of manual fields and their indicators does not depend on the
    # record being computed, so resolve them once for the whole recordset.
    manual_fields = self.env['ir.model.fields'].search([
        ('model', '=', 'hr.efficiency'),
        ('state', '=', 'manual'),
        ('ttype', '=', 'float'),
    ])
    Indicator = self.env['hr.efficiency.indicator']
    field_indicators = []
    for field in manual_fields:
        label = field.field_description
        # Prefer an exact name match: a bare ``ilike`` is a substring match
        # and may pick an unrelated indicator whose name merely contains the
        # label. The substring lookup is kept as a fallback.
        indicator = (
            Indicator.search([('name', '=', label)], limit=1)
            or Indicator.search([('name', 'ilike', label)], limit=1)
        )
        if indicator:
            field_indicators.append((field.name, indicator))

    for record in self:
        efficiency_data = {
            'available_hours': record.available_hours,
            'planned_hours': record.planned_hours,
            'planned_billable_hours': record.planned_billable_hours,
            'planned_non_billable_hours': record.planned_non_billable_hours,
            'actual_billable_hours': record.actual_billable_hours,
            'actual_non_billable_hours': record.actual_non_billable_hours,
        }
        for field_name, indicator in field_indicators:
            # Membership in ``_fields`` checks that the field is loaded on
            # the model without reading its value.
            if field_name in record._fields:
                # evaluate_formula is defined on the indicator model; it is
                # presumably numeric since the result is rounded here.
                indicator_value = indicator.evaluate_formula(efficiency_data)
                record[field_name] = float_round(indicator_value, 2)
        record.overall_efficiency = self._calculate_overall_efficiency(record)
def _get_indicator_field_name(self, indicator_name):
    """Convert an indicator name into a technical name for a manual field.

    The name is lower-cased, accents are stripped, spaces and hyphens become
    underscores, every remaining character that is not ``a-z``, ``0-9``,
    ``_`` or ``$`` is dropped, and the ``x_`` prefix is added when missing.

    :param str indicator_name: human readable indicator name
    :return: sanitised field name starting with ``x_``
    :rtype: str
    """
    import re
    import unicodedata

    # Decompose accented characters (e.g. "ó" -> "o" + combining accent) and
    # drop the combining marks, so any accent is handled, not a fixed list.
    decomposed = unicodedata.normalize('NFKD', indicator_name)
    field_name = ''.join(
        char for char in decomposed if not unicodedata.combining(char)
    ).lower()
    # One underscore per separator (no collapsing) keeps the names that were
    # already generated for existing indicators unchanged.
    field_name = re.sub(r'[\s\-]', '_', field_name)
    # Drop anything else (parentheses, "%", "/", ".", ...). "$" is kept only
    # so that names produced before this sanitising step stay the same.
    field_name = re.sub(r'[^a-z0-9_$]', '', field_name)
    # Manual fields must be prefixed with x_
    if not field_name.startswith('x_'):
        field_name = 'x_' + field_name
    return field_name
@api.model
def _create_default_dynamic_fields(self):
    """Create one manual float field on ``hr.efficiency`` per active indicator.

    Indicators are processed by ``sequence``. A field is only created when no
    field with the same technical name already exists on the model, so the
    method can be run repeatedly.
    """
    IrModelFields = self.env['ir.model.fields']
    # Loop-invariant: the ir.model record describing hr.efficiency.
    model_id = self.env['ir.model'].search(
        [('model', '=', 'hr.efficiency')], limit=1
    ).id
    indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    for indicator in indicators:
        # Single source of truth for name sanitising, so that hyphens and
        # parentheses are handled the same way everywhere.
        field_name = self._get_indicator_field_name(indicator.name)
        already_there = IrModelFields.search([
            ('model', '=', 'hr.efficiency'),
            ('name', '=', field_name),
        ], limit=1)
        if already_there:
            continue
        # Create the field in ir.model.fields (like Studio does).
        IrModelFields.create({
            'name': field_name,
            'model': 'hr.efficiency',
            'model_id': model_id,
            'ttype': 'float',
            'field_description': indicator.name,
            'state': 'manual',  # This is the key - it makes it a custom field
            'store': True,
            # No 'compute' entry: on ir.model.fields that attribute holds
            # Python source code, not a method name, so '_compute_indicators'
            # would not be a valid compute body. The values are written by
            # _compute_indicators while it computes overall_efficiency; the
            # field is flagged read-only so users cannot edit it by hand.
            'readonly': True,
        })
def _calculate_overall_efficiency(self, record):
    """Return the weighted average of the active indicators for ``record``.

    Each active indicator with a weight greater than zero contributes its
    evaluated formula value multiplied by its weight. The result is the
    weighted sum divided by the total weight, rounded to 2 digits.

    :param record: the efficiency record whose hour figures feed the formulas
    :return: the overall efficiency, or ``0.0`` when there is no active
        indicator or no indicator with a positive weight
    """
    active_indicators = self.env['hr.efficiency.indicator'].search(
        [('active', '=', True)], order='sequence'
    )
    if not active_indicators:
        # Nothing configured: fall back to zero.
        return 0.0

    hour_fields = (
        'available_hours',
        'planned_hours',
        'planned_billable_hours',
        'planned_non_billable_hours',
        'actual_billable_hours',
        'actual_non_billable_hours',
    )
    efficiency_data = {name: getattr(record, name) for name in hour_fields}

    weight_total = 0
    weighted_total = 0
    for indicator in active_indicators:
        if not indicator.weight > 0:
            continue
        value = indicator.evaluate_formula(efficiency_data)
        weighted_total += value * indicator.weight
        weight_total += indicator.weight

    if not weight_total > 0:
        return 0.0
    return float_round(weighted_total / weight_total, 2)
@api.depends('employee_id', 'employee_id.name', 'month_year')
def _compute_display_name(self):
    """Build the stored label ``"<employee name> - <YYYY-MM>"``.

    Records missing the employee or the month get a generic placeholder.
    ``employee_id.name`` is part of the dependencies because the label is
    stored and embeds the employee name: without it, renaming an employee
    would leave stale labels behind.
    """
    for record in self:
        if record.employee_id and record.month_year:
            record.display_name = f"{record.employee_id.name} - {record.month_year}"
        else:
            record.display_name = "New Efficiency Record"
@api.model
def _calculate_employee_efficiency(self, employee, month_year):
    """Gather the hour figures of ``employee`` for one month.

    :param employee: the employee to evaluate
    :param str month_year: period in ``YYYY-MM`` format
    :return: dict of values ready to create an efficiency record (period,
        employee, company, and the available / planned / actual hours)
    :raises UserError: when ``month_year`` cannot be parsed as ``YYYY-MM``
    """
    # First and last calendar day of the requested month.
    try:
        year_text, month_text = month_year.split('-')
        first_day = date(int(year_text), int(month_text), 1)
        next_month_start = first_day + relativedelta(months=1)
        last_day = next_month_start - relativedelta(days=1)
    except ValueError:
        raise UserError(_("Invalid month_year format. Expected: YYYY-MM"))

    # Working time left once holidays and time off are taken out.
    available = self._calculate_available_hours(employee, first_day, last_day)
    # Hours scheduled in planning.
    planned = self._calculate_planned_hours(employee, first_day, last_day)
    planned_total, planned_billable, planned_non_billable = planned
    # Hours logged on timesheets.
    logged = self._calculate_actual_hours(employee, first_day, last_day)
    logged_billable, logged_non_billable = logged

    values = {
        'month_year': month_year,
        'employee_id': employee.id,
        'company_id': employee.company_id.id,
        'available_hours': available,
        'planned_hours': planned_total,
        'planned_billable_hours': planned_billable,
        'planned_non_billable_hours': planned_non_billable,
        'actual_billable_hours': logged_billable,
        'actual_non_billable_hours': logged_non_billable,
    }
    return values
def _calculate_available_hours(self, employee, start_date, end_date):
    """Return the hours ``employee`` is available between two dates.

    The working hours listed by the employee's ``_list_work_time_per_day``
    for the period are summed, the approved time off reported by
    :meth:`_get_time_off_hours` is subtracted, and the result is floored at
    zero.

    :param employee: the employee to evaluate
    :param date start_date: first day of the period (inclusive)
    :param date end_date: last day of the period (inclusive)
    :return: available hours, ``0.0`` when the employee has no working
        calendar
    """
    if not employee.resource_calendar_id:
        return 0.0

    # The calendar helper takes datetimes: cover both days entirely.
    period_start = datetime.combine(start_date, datetime.min.time())
    period_end = datetime.combine(end_date, datetime.max.time())

    per_day = employee._list_work_time_per_day(period_start, period_end)[employee.id]
    worked = sum(day_hours for _day, day_hours in per_day)

    # Approved time off is taken out of the working time.
    absent = self._get_time_off_hours(employee, start_date, end_date)
    return max(0.0, worked - absent)
def _get_time_off_hours(self, employee, start_date, end_date):
    """Return the hours covered by validated leaves within a period.

    Every ``hr.leave`` of the employee in state ``validate`` that intersects
    the period is clipped to the period, and the hours listed by the
    employee's ``_list_work_time_per_day`` for the clipped span are added up.

    :param employee: the employee to evaluate
    :param date start_date: first day of the period (inclusive)
    :param date end_date: last day of the period (inclusive)
    :return: total hours as a float
    """
    approved_leaves = self.env['hr.leave'].search([
        ('employee_id', '=', employee.id),
        ('state', '=', 'validate'),
        ('date_from', '<=', end_date),
        ('date_to', '>=', start_date),
    ])

    day_start = datetime.min.time()
    day_end = datetime.max.time()
    hours_off = 0.0
    for leave in approved_leaves:
        # Clip the leave to the requested period.
        first_day = max(leave.date_from.date(), start_date)
        last_day = min(leave.date_to.date(), end_date)
        if first_day > last_day:
            continue
        per_day = employee._list_work_time_per_day(
            datetime.combine(first_day, day_start),
            datetime.combine(last_day, day_end),
        )[employee.id]
        hours_off += sum(day_hours for _day, day_hours in per_day)
    return hours_off
def _calculate_planned_hours(self, employee, start_date, end_date):
    """Sum the planning hours of ``employee`` for a period.

    Only draft or published ``planning.slot`` records that start on or after
    the first day and end on or before the last day are considered. A slot
    counts as billable when it is linked to a project with ``allow_billable``
    set; every other slot counts as non-billable.

    :param employee: the employee to evaluate
    :param date start_date: first day of the period
    :param date end_date: last day of the period
    :return: tuple ``(total, billable, non_billable)`` of planned hours
    """
    period_start = datetime.combine(start_date, datetime.min.time())
    period_end = datetime.combine(end_date, datetime.max.time())
    slots = self.env['planning.slot'].search([
        ('employee_id', '=', employee.id),
        ('start_datetime', '>=', period_start),
        ('end_datetime', '<=', period_end),
        ('state', 'in', ['draft', 'published']),
    ])

    # Hours bucketed by "is the slot on a billable project?".
    by_billable = {True: 0.0, False: 0.0}
    planned_total = 0.0
    for slot in slots:
        slot_hours = slot.allocated_hours or 0.0
        project = slot.project_id
        by_billable[bool(project and project.allow_billable)] += slot_hours
        planned_total += slot_hours
    return planned_total, by_billable[True], by_billable[False]
def _calculate_actual_hours(self, employee, start_date, end_date):
    """Sum the timesheet hours of ``employee`` for a period.

    Only ``account.analytic.line`` records dated within the period, linked to
    a project and not linked to a leave (``holiday_id``) are read. Lines are
    also skipped when their task name contains "time off" or their own name
    contains "tiempo personal" (case-insensitive). A line counts as billable
    when its project has ``allow_billable`` set.

    :param employee: the employee to evaluate
    :param date start_date: first day of the period (inclusive)
    :param date end_date: last day of the period (inclusive)
    :return: tuple ``(billable, non_billable)`` of logged hours
    """
    entries = self.env['account.analytic.line'].search([
        ('employee_id', '=', employee.id),
        ('date', '>=', start_date),
        ('date', '<=', end_date),
        ('project_id', '!=', False),  # project timesheets only
        ('holiday_id', '=', False),  # time off timesheets are left out
    ])

    # Hours bucketed by "is the line on a billable project?".
    by_billable = {True: 0.0, False: 0.0}
    for entry in entries:
        logged = entry.unit_amount or 0.0

        # Time off may also be recognisable only through the task name ...
        task = entry.task_id
        if task and task.name and 'time off' in task.name.lower():
            continue
        # ... or through the description of the line itself.
        if entry.name and 'tiempo personal' in entry.name.lower():
            continue

        project = entry.project_id
        by_billable[bool(project and project.allow_billable)] += logged
    return by_billable[True], by_billable[False]
@api.model
def calculate_efficiency_for_period(self, start_month=None, end_month=None):
    """Calculate efficiency for all active employees over a range of months.

    For every employee and month the figures are recalculated and compared
    with the latest active record of that employee, month and company. When
    no such record exists, or any hour figure differs by more than 0.01, the
    existing active records are archived and a new record is created;
    otherwise nothing is written.

    :param str start_month: first month (``YYYY-MM``); defaults to three
        months before today
    :param str end_month: last month (``YYYY-MM``, inclusive); defaults to
        six months after today
    :return: dict with ``created`` (number of records created), ``updated``
        (always 0, records are never updated in place) and
        ``total_processed`` (same value as ``created``)
    """
    # Each bound gets its own default, so passing only one of them works:
    # previously a lone start_month crashed on the missing end_month and a
    # lone end_month was silently replaced by the default.
    today = date.today()
    if not start_month:
        start_month = (today - relativedelta(months=3)).strftime('%Y-%m')
    if not end_month:
        end_month = (today + relativedelta(months=6)).strftime('%Y-%m')

    months = self._generate_month_list(start_month, end_month)
    employees = self.env['hr.employee'].search([('active', '=', True)])

    # Figures whose change justifies a new historical record.
    compared_fields = (
        'available_hours',
        'planned_hours',
        'planned_billable_hours',
        'planned_non_billable_hours',
        'actual_billable_hours',
        'actual_non_billable_hours',
    )

    created_records = []
    for employee in employees:
        for month in months:
            efficiency_data = self._calculate_employee_efficiency(employee, month)

            record_domain = [
                ('employee_id', '=', employee.id),
                ('month_year', '=', month),
                ('company_id', '=', employee.company_id.id),
            ]
            latest_record = self.search(
                record_domain, order='calculation_date desc', limit=1
            )
            if latest_record:
                has_changes = any(
                    # Tolerance for floating point noise
                    abs(efficiency_data[name] - latest_record[name]) > 0.01
                    for name in compared_fields
                )
                if not has_changes:
                    continue
                # Archive the records being superseded. No active record
                # exists when latest_record is empty, so the search is only
                # needed in this branch.
                self.search(record_domain).write({'active': False})

            created_records.append(self.create(efficiency_data))

    return {
        'created': len(created_records),
        'updated': 0,  # No longer updating existing records
        'total_processed': len(created_records),
    }
@api.model
def _init_dynamic_system(self):
    """Set up the dynamic indicator fields and views.

    Creates the default dynamic fields, refreshes the inherited views and
    invalidates the cache of every existing record. Each step is logged;
    any exception is logged and re-raised.
    """
    import logging
    log = logging.getLogger(__name__)

    try:
        log.info("Starting dynamic system initialization...")

        # Step 1: one dynamic field per existing indicator
        self._create_default_dynamic_fields()
        log.info("Default dynamic fields created successfully")

        # Step 2: nothing to do here, only reported in the log
        log.info("Dynamic fields creation handled by hr.efficiency.dynamic.field model")

        # Step 3: expose the dynamic fields in the views
        self._update_views_with_dynamic_fields()
        log.info("Views updated successfully")

        # Step 4: drop cached values of the existing records
        # NOTE(review): this only invalidates the cache; confirm whether an
        # explicit recompute of the stored indicator values was intended.
        all_records = self.search([])
        if all_records:
            all_records._invalidate_cache()
            log.info(f"Invalidated cache for {len(all_records)} records")

        log.info("Dynamic system initialization completed successfully")
    except Exception as error:
        log.error(f"Error during dynamic system initialization: {str(error)}")
        raise
@api.model
def _update_views_with_dynamic_fields(self):
    """
    Update the inherited list and form views to include the dynamic fields.

    The manual float fields of hr.efficiency are read from ir.model.fields,
    one XML fragment is built per view, and the fragment replaces a
    placeholder inside the arch of the views
    ``hr_efficiency.view_hr_efficiency_list_inherited`` and
    ``hr_efficiency.view_hr_efficiency_form_inherited`` (each is skipped
    when the XML id does not resolve). Any exception is logged and re-raised.

    NOTE(review): both the per-field XML literal and the placeholder marker
    are empty strings in this version, so the arch is written back
    unchanged. The original literals appear to have been lost; restore them
    from version control before relying on this method.
    """
    import logging
    _logger = logging.getLogger(__name__)
    try:
        # Get all manual fields for this model (like Studio does)
        manual_fields = self.env['ir.model.fields'].search([
            ('model', '=', 'hr.efficiency'),
            ('state', '=', 'manual'),
            ('ttype', '=', 'float')
        ], order='id')
        _logger.info(f"Found {len(manual_fields)} manual fields to add to views")
        # Build dynamic fields XML for list view
        dynamic_fields_xml = ''
        for field in manual_fields:
            # NOTE(review): empty f-string, so nothing is generated for the
            # field and dynamic_fields_xml stays ''.
            field_xml = f''
            dynamic_fields_xml += field_xml
        # Update inherited list view
        inherited_list_view = self.env.ref('hr_efficiency.view_hr_efficiency_list_inherited', raise_if_not_found=False)
        if inherited_list_view:
            arch = inherited_list_view.arch
            # NOTE(review): the marker is empty; `'' in arch` is always true
            # for a string arch, so the warning branch below cannot be reached.
            comment = ''
            if comment in arch:
                arch = arch.replace(comment, dynamic_fields_xml)
                inherited_list_view.write({'arch': arch})
                _logger.info(f"Updated inherited list view with {len(manual_fields)} dynamic fields")
            else:
                _logger.warning("Comment not found in inherited list view")
        # Build dynamic fields XML for form view
        form_dynamic_fields_xml = ''
        for field in manual_fields:
            # NOTE(review): same empty literal as for the list view.
            field_xml = f''
            form_dynamic_fields_xml += field_xml
        # Update inherited form view
        inherited_form_view = self.env.ref('hr_efficiency.view_hr_efficiency_form_inherited', raise_if_not_found=False)
        if inherited_form_view:
            arch = inherited_form_view.arch
            # NOTE(review): same empty marker as for the list view.
            comment = ''
            if comment in arch:
                arch = arch.replace(comment, form_dynamic_fields_xml)
                inherited_form_view.write({'arch': arch})
                _logger.info(f"Updated inherited form view with dynamic fields")
            else:
                _logger.warning("Comment not found in inherited form view")
    except Exception as e:
        # Log for diagnosis, then let the caller see the failure.
        _logger.error(f"Error updating views with dynamic fields: {str(e)}")
        raise
def _generate_month_list(self, start_month, end_month):
    """Return every month from ``start_month`` to ``end_month`` inclusive.

    :param str start_month: first month, ``YYYY-MM``
    :param str end_month: last month, ``YYYY-MM``
    :return: list of ``YYYY-MM`` strings in chronological order; empty when
        ``start_month`` is later than ``end_month``
    """
    month_format = '%Y-%m'
    cursor = datetime.strptime(start_month, month_format)
    last = datetime.strptime(end_month, month_format)
    one_month = relativedelta(months=1)

    result = []
    while not cursor > last:
        result.append(cursor.strftime(month_format))
        cursor = cursor + one_month
    return result
@api.model
def _cron_calculate_efficiency(self):
    """
    Cron entry point: calculate efficiency with the default period.

    Calls calculate_efficiency_for_period() without arguments and discards
    its summary, so the method returns None.
    """
    self.calculate_efficiency_for_period()
@api.model
def _get_month_filter_options(self):
    """Return the month options offered by the search view.

    :return: five ``(month_year, label)`` tuples in chronological order: the
        two previous months, the current month and the next two months.
        ``month_year`` is ``YYYY-MM`` and ``label`` is the month name followed
        by the year, e.g. "August 2024".
    """
    today = date.today()
    # Offsets -2..2 cover the last two months, the current one, the next two.
    shifted_dates = [today + relativedelta(months=offset) for offset in range(-2, 3)]
    return [
        (month_date.strftime('%Y-%m'), month_date.strftime('%B %Y'))
        for month_date in shifted_dates
    ]