"""Write API табеля: time-entries, absences.""" from __future__ import annotations from datetime import date from typing import Annotated, Any import pymysql from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from app.labor import _column_lookup, _prefixed_col, _resolve_table from app.labor_cache import invalidate_time_cache from app.labor_day import get_work_hours_by_date from app.labor_identity import ( ensure_emp_exists, parse_acting_emp_id, resolve_target_emp_id, ) from app.labor_permissions import can_write_absence, can_write_time_entry from app.labor_time_logic import ( apply_daily_limits, normalize_time_value, project_day_info, sum_day_totals, ) from app.main import _quote_ident, _table_columns, get_conn, require_api_key from app.merakomis_schema import TIME_ABSENCE_TABLE, TIME_TABLE router = APIRouter() class TimeEntryWrite(BaseModel): emp_id: int | None = Field(None, ge=1) project_id: int = Field(..., ge=1) date: str time: float = Field(..., ge=0, le=24) over: int = Field(0, ge=0, le=1) class AbsenceWrite(BaseModel): emp_id: int | None = Field(None, ge=1) project_id: int | None = Field(None, ge=0) type: int = Field(0, ge=0) dates: list[str] = Field(..., min_length=1) class AbsenceRangeWrite(BaseModel): emp_id: int = Field(..., ge=1) absence_id: int = Field(0, ge=0) begin: str end: str def _parse_iso_date(value: str, name: str = "date") -> date: try: return date.fromisoformat(value.strip()) except ValueError as e: raise HTTPException( status_code=400, detail={"code": "invalid_date", "message": f"{name}: ожидается YYYY-MM-DD"}, ) from e def _time_cols(cur, db: str) -> dict[str, str]: table = _resolve_table(cur, db, TIME_TABLE) cols = _table_columns(cur, db, table) lut = _column_lookup(cols) mapping = {} for f in ("id", "emp", "project", "date", "duration", "is_over", "portal", "account"): c = _prefixed_col(lut, TIME_TABLE, f) if c: mapping[f] = c return {"table": table, **mapping} def _fetch_day_sums(cur, db: str, emp_id: int, day: date) -> tuple[float, float]: cols = _time_cols(cur, db) tq = _quote_ident(cols["table"]) cur.execute( f""" SELECT {_quote_ident(cols['duration'])} AS cc, {_quote_ident(cols['is_over'])} AS is_over FROM {tq} WHERE {_quote_ident(cols['emp'])} = %s AND {_quote_ident(cols['date'])} = %s """, (emp_id, day.isoformat()), ) return sum_day_totals(cur.fetchall()) def _fetch_project_day_rows( cur, db: str, emp_id: int, project_id: int, day: date ) -> list[dict]: cols = _time_cols(cur, db) tq = _quote_ident(cols["table"]) cur.execute( f""" SELECT {_quote_ident(cols['duration'])} AS cc, {_quote_ident(cols['is_over'])} AS is_over FROM {tq} WHERE {_quote_ident(cols['emp'])} = %s AND {_quote_ident(cols['project'])} = %s AND {_quote_ident(cols['date'])} = %s """, (emp_id, project_id, day.isoformat()), ) return cur.fetchall() def _upsert_time_entry( cur, db: str, emp_id: int, project_id: int, day: date, duration: float, is_over: int, ) -> int | None: cols = _time_cols(cur, db) tq = _quote_ident(cols["table"]) if duration <= 0: cur.execute( f""" DELETE FROM {tq} WHERE {_quote_ident(cols['emp'])} = %s AND {_quote_ident(cols['project'])} = %s AND {_quote_ident(cols['date'])} = %s AND {_quote_ident(cols['is_over'])} = %s """, (emp_id, project_id, day.isoformat(), is_over), ) return None portal_col = cols.get("portal") account_col = cols.get("account") ins_cols = [cols["emp"], cols["project"], cols["date"], cols["duration"], cols["is_over"]] vals = [emp_id, project_id, day.isoformat(), duration, is_over] if portal_col: ins_cols.append(portal_col) vals.append(0) if account_col: ins_cols.append(account_col) vals.append(0) placeholders = ", ".join(["%s"] * len(vals)) col_sql = ", ".join(_quote_ident(c) for c in ins_cols) cur.execute( f""" INSERT INTO {tq} ({col_sql}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {_quote_ident(cols['duration'])} = VALUES({_quote_ident(cols['duration'])}) """, tuple(vals), ) cur.execute( f""" SELECT {_quote_ident(cols['id'])} AS id FROM {tq} WHERE {_quote_ident(cols['emp'])} = %s AND {_quote_ident(cols['project'])} = %s AND {_quote_ident(cols['date'])} = %s AND {_quote_ident(cols['is_over'])} = %s LIMIT 1 """, (emp_id, project_id, day.isoformat(), is_over), ) row = cur.fetchone() return int(row["id"]) if row and row.get("id") else None def _absence_cols(cur, db: str) -> dict[str, str]: table = _resolve_table(cur, db, TIME_ABSENCE_TABLE) cols = _table_columns(cur, db, table) lut = _column_lookup(cols) out = {"table": table} for f in ("id", "emp", "date", "absence", "portal", "account"): c = _prefixed_col(lut, TIME_ABSENCE_TABLE, f) if c: out[f] = c return out @router.put("/api/time-entries") def put_time_entries( body: TimeEntryWrite, _auth: Annotated[None, Depends(require_api_key)], acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)], ) -> dict[str, Any]: target_emp_id = resolve_target_emp_id(acting_emp_id, body.emp_id) day = _parse_iso_date(body.date) is_over = int(body.over) try: with get_conn() as c: with c.cursor() as cur: from app.main import _db_name db = _db_name(cur) ensure_emp_exists(cur, db, acting_emp_id, kind="acting") ensure_emp_exists(cur, db, target_emp_id, kind="target") if not can_write_time_entry( cur, db, acting_emp_id, target_emp_id, body.project_id ): raise HTTPException( status_code=403, detail={ "code": "forbidden", "message": "Нет прав на запись часов", }, ) time_val = normalize_time_value(float(body.time)) total_work, total_over = _fetch_day_sums(cur, db, target_emp_id, day) proj_rows = _fetch_project_day_rows( cur, db, target_emp_id, body.project_id, day ) info = project_day_info(proj_rows) max_work = get_work_hours_by_date(cur, db, day) max_over = 24.0 - max_work time_val, limits = apply_daily_limits( time_val, bool(is_over), total_work, total_over, info["hours"], info["over"], max_work, max_over, ) row_id = _upsert_time_entry( cur, db, target_emp_id, body.project_id, day, time_val, is_over, ) cache_deleted = invalidate_time_cache( cur, db, day, target_emp_id, body.project_id ) info = project_day_info( _fetch_project_day_rows( cur, db, target_emp_id, body.project_id, day ) ) c.commit() return { "ok": True, "acting_emp_id": acting_emp_id, "target_emp_id": target_emp_id, "id": row_id, "emp_id": target_emp_id, "project_id": body.project_id, "date": day.isoformat(), "duration": time_val, "is_over": is_over, "info": info, "limits": limits, "cache_rows_deleted": cache_deleted, } except HTTPException: raise except pymysql.Error as e: raise HTTPException(status_code=500, detail={"code": "db_error", "message": str(e)}) from e @router.put("/api/absences") def put_absences( body: AbsenceWrite, _auth: Annotated[None, Depends(require_api_key)], acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)], ) -> dict[str, Any]: target_emp_id = resolve_target_emp_id(acting_emp_id, body.emp_id) results: list[dict[str, Any]] = [] try: with get_conn() as c: with c.cursor() as cur: from app.main import _db_name db = _db_name(cur) ensure_emp_exists(cur, db, acting_emp_id, kind="acting") ensure_emp_exists(cur, db, target_emp_id, kind="target") if not can_write_absence(cur, db, acting_emp_id, target_emp_id): raise HTTPException( status_code=403, detail={"code": "forbidden", "message": "Нет прав"}, ) acols = _absence_cols(cur, db) tcols = _time_cols(cur, db) aq = _quote_ident(acols["table"]) tq = _quote_ident(tcols["table"]) absence_type = int(body.type) for ds in body.dates: day = _parse_iso_date(ds, "dates") if absence_type == 0: cur.execute( f""" DELETE FROM {aq} WHERE {_quote_ident(acols['emp'])} = %s AND {_quote_ident(acols['date'])} = %s """, (target_emp_id, day.isoformat()), ) else: ins = [acols["emp"], acols["date"], acols["absence"]] vals: list[Any] = [target_emp_id, day.isoformat(), absence_type] if acols.get("portal"): ins.append(acols["portal"]) vals.append(0) if acols.get("account"): ins.append(acols["account"]) vals.append(0) col_sql = ", ".join(_quote_ident(x) for x in ins) ph = ", ".join(["%s"] * len(vals)) cur.execute( f""" INSERT INTO {aq} ({col_sql}) VALUES ({ph}) ON DUPLICATE KEY UPDATE {_quote_ident(acols['absence'])} = VALUES({_quote_ident(acols['absence'])}) """, tuple(vals), ) cur.execute( f""" DELETE FROM {tq} WHERE {_quote_ident(tcols['emp'])} = %s AND {_quote_ident(tcols['date'])} = %s AND {_quote_ident(tcols['is_over'])} = 0 """, (target_emp_id, day.isoformat()), ) deleted = invalidate_time_cache(cur, db, day, target_emp_id, None) results.append( { "date": day.isoformat(), "absence_type_id": absence_type, "cache_rows_deleted": deleted, } ) c.commit() return { "ok": True, "acting_emp_id": acting_emp_id, "target_emp_id": target_emp_id, "emp_id": target_emp_id, "results": results, } except HTTPException: raise except pymysql.Error as e: raise HTTPException(status_code=500, detail={"code": "db_error", "message": str(e)}) from e @router.put("/api/absences/range") def put_absences_range( body: AbsenceRangeWrite, _auth: Annotated[None, Depends(require_api_key)], acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)], ) -> dict[str, Any]: begin = _parse_iso_date(body.begin, "begin") end = _parse_iso_date(body.end, "end") if end < begin: raise HTTPException( status_code=400, detail={ "code": "date_range_invalid", "message": "Дата окончания должна быть больше, чем дата начала", }, ) from app.labor_day import iter_dates dates = list(iter_dates(begin, end)) try: with get_conn() as c: with c.cursor() as cur: from app.main import _db_name db = _db_name(cur) ensure_emp_exists(cur, db, acting_emp_id, kind="acting") ensure_emp_exists(cur, db, body.emp_id, kind="target") if not can_write_absence(cur, db, acting_emp_id, body.emp_id): raise HTTPException( status_code=403, detail={"code": "forbidden", "message": "Нет прав"}, ) acols = _absence_cols(cur, db) tcols = _time_cols(cur, db) aq = _quote_ident(acols["table"]) tq = _quote_ident(tcols["table"]) iso_dates = [d.isoformat() for d in dates] ph = ", ".join(["%s"] * len(iso_dates)) cur.execute( f""" DELETE FROM {aq} WHERE {_quote_ident(acols['emp'])} = %s AND {_quote_ident(acols['date'])} IN ({ph}) """, (body.emp_id, *iso_dates), ) cur.execute( f""" DELETE FROM {tq} WHERE {_quote_ident(tcols['emp'])} = %s AND {_quote_ident(tcols['date'])} IN ({ph}) AND {_quote_ident(tcols['is_over'])} = 0 """, (body.emp_id, *iso_dates), ) if body.absence_id: for d in dates: ins = [acols["emp"], acols["date"], acols["absence"]] vals: list[Any] = [body.emp_id, d.isoformat(), body.absence_id] if acols.get("portal"): ins.append(acols["portal"]) vals.append(0) if acols.get("account"): ins.append(acols["account"]) vals.append(0) col_sql = ", ".join(_quote_ident(x) for x in ins) ph2 = ", ".join(["%s"] * len(vals)) cur.execute( f"INSERT INTO {aq} ({col_sql}) VALUES ({ph2})", tuple(vals), ) for d in dates: invalidate_time_cache(cur, db, d, body.emp_id, None) c.commit() return { "ok": True, "acting_emp_id": acting_emp_id, "target_emp_id": body.emp_id, "e": 0, "m": "Успешно сохранено", "dates_count": len(dates), } except HTTPException: raise except pymysql.Error as e: raise HTTPException(status_code=500, detail={"code": "db_error", "message": str(e)}) from e