meraproject/services/user-reader/app/labor_write.py
keboss-m 5c21d25d45 Initial commit: Merakomis portal, Docker stack and user-reader API.
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 11:04:05 +03:00

457 lines
16 KiB
Python

"""Write API табеля: time-entries, absences."""
from __future__ import annotations
from datetime import date
from typing import Annotated, Any
import pymysql
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from app.labor import _column_lookup, _prefixed_col, _resolve_table
from app.labor_cache import invalidate_time_cache
from app.labor_day import get_work_hours_by_date
from app.labor_identity import (
ensure_emp_exists,
parse_acting_emp_id,
resolve_target_emp_id,
)
from app.labor_permissions import can_write_absence, can_write_time_entry
from app.labor_time_logic import (
apply_daily_limits,
normalize_time_value,
project_day_info,
sum_day_totals,
)
from app.main import _quote_ident, _table_columns, get_conn, require_api_key
from app.merakomis_schema import TIME_ABSENCE_TABLE, TIME_TABLE
router = APIRouter()
class TimeEntryWrite(BaseModel):
    """Request body of ``PUT /api/time-entries``."""

    # Employee the entry belongs to; when omitted the handler lets
    # ``resolve_target_emp_id`` decide based on the acting employee.
    emp_id: int | None = Field(default=None, ge=1)
    # Project the time is booked on.
    project_id: int = Field(default=..., ge=1)
    # Calendar day as text; the handler parses it with ``_parse_iso_date``.
    date: str
    # Amount of time to store, bounded to 0..24 (presumably hours — confirm).
    # A value that ends up <= 0 makes the handler delete the row instead.
    time: float = Field(default=..., ge=0, le=24)
    # 0/1 flag written to the ``is_over`` column (overtime marker).
    over: int = Field(default=0, ge=0, le=1)
class AbsenceWrite(BaseModel):
    """Request body of ``PUT /api/absences`` (explicit list of days)."""

    # Employee whose absences are edited; optional, resolved by the handler
    # through ``resolve_target_emp_id``.
    emp_id: int | None = Field(default=None, ge=1)
    # Accepted in the payload; the handler in this module does not read it.
    project_id: int | None = Field(default=None, ge=0)
    # Absence type id; 0 makes the handler delete the absence for each day.
    type: int = Field(default=0, ge=0)
    # Days to update, each parsed with ``_parse_iso_date``; at least one.
    dates: list[str] = Field(default=..., min_length=1)
class AbsenceRangeWrite(BaseModel):
    """Request body of ``PUT /api/absences/range`` (inclusive date range)."""

    # Employee whose absences are edited; required for this endpoint.
    emp_id: int = Field(default=..., ge=1)
    # Absence type id; 0 means the range is only cleared, nothing is inserted.
    absence_id: int = Field(default=0, ge=0)
    # First and last day of the range as text, parsed with ``_parse_iso_date``.
    begin: str
    end: str
def _parse_iso_date(value: str, name: str = "date") -> date:
try:
return date.fromisoformat(value.strip())
except ValueError as e:
raise HTTPException(
status_code=400,
detail={"code": "invalid_date", "message": f"{name}: ожидается YYYY-MM-DD"},
) from e
def _time_cols(cur, db: str) -> dict[str, str]:
    """Resolve the time table and the columns this module works with.

    Returns:
        A dict with the resolved table name under ``"table"`` plus one entry
        per logical field (``id``, ``emp``, ``project``, ``date``,
        ``duration``, ``is_over``, ``portal``, ``account``) for which
        ``_prefixed_col`` found a column. Fields without a match are absent.
    """
    table = _resolve_table(cur, db, TIME_TABLE)
    lut = _column_lookup(_table_columns(cur, db, table))
    resolved: dict[str, str] = {"table": table}
    for field in ("id", "emp", "project", "date", "duration", "is_over", "portal", "account"):
        column = _prefixed_col(lut, TIME_TABLE, field)
        if column:
            resolved[field] = column
    return resolved
def _fetch_day_sums(cur, db: str, emp_id: int, day: date) -> tuple[float, float]:
    """Total up everything one employee has booked on a single day.

    Selects the duration (aliased ``cc``) and the ``is_over`` flag of every
    time row for ``emp_id`` on ``day``, across all projects, and passes the
    rows to ``sum_day_totals``.

    Returns:
        The result of ``sum_day_totals``; ``put_time_entries`` unpacks it as
        ``(total_work, total_over)``.
    """
    cols = _time_cols(cur, db)
    table_sql = _quote_ident(cols["table"])
    query = f"""
        SELECT {_quote_ident(cols['duration'])} AS cc,
               {_quote_ident(cols['is_over'])} AS is_over
        FROM {table_sql}
        WHERE {_quote_ident(cols['emp'])} = %s
          AND {_quote_ident(cols['date'])} = %s
    """
    params = (emp_id, day.isoformat())
    cur.execute(query, params)
    rows = cur.fetchall()
    return sum_day_totals(rows)
def _fetch_project_day_rows(
    cur, db: str, emp_id: int, project_id: int, day: date
) -> list[dict]:
    """Fetch one employee's time rows for a single project and day.

    Returns:
        The cursor's ``fetchall()`` result; each row carries the duration as
        ``cc`` and the overtime flag as ``is_over``.
    """
    cols = _time_cols(cur, db)
    table_sql = _quote_ident(cols["table"])
    query = f"""
        SELECT {_quote_ident(cols['duration'])} AS cc,
               {_quote_ident(cols['is_over'])} AS is_over
        FROM {table_sql}
        WHERE {_quote_ident(cols['emp'])} = %s
          AND {_quote_ident(cols['project'])} = %s
          AND {_quote_ident(cols['date'])} = %s
    """
    params = (emp_id, project_id, day.isoformat())
    cur.execute(query, params)
    rows = cur.fetchall()
    return rows
def _upsert_time_entry(
    cur,
    db: str,
    emp_id: int,
    project_id: int,
    day: date,
    duration: float,
    is_over: int,
) -> int | None:
    """Store, replace or remove the time row for one entry key.

    A row is addressed by employee, project, day and overtime flag.

    * ``duration <= 0`` deletes the matching row(s) and returns ``None``.
    * Otherwise the row is inserted; on a duplicate key only the duration is
      overwritten. This relies on the table having a unique key that covers
      the entry key — presumably (emp, project, date, is_over); confirm
      against the schema.

    Returns:
        The id of the stored row, or ``None`` after a delete or when the id
        cannot be read back.
    """
    cols = _time_cols(cur, db)
    table_sql = _quote_ident(cols["table"])

    # The same four-column predicate addresses the row for DELETE and SELECT.
    key_where = " AND ".join(
        f"{_quote_ident(cols[field])} = %s"
        for field in ("emp", "project", "date", "is_over")
    )
    key_params = (emp_id, project_id, day.isoformat(), is_over)

    if duration <= 0:
        cur.execute(
            f"""
            DELETE FROM {table_sql}
            WHERE {key_where}
            """,
            key_params,
        )
        return None

    insert_cols = [
        cols[field] for field in ("emp", "project", "date", "duration", "is_over")
    ]
    params: list = [emp_id, project_id, day.isoformat(), duration, is_over]
    # Optional bookkeeping columns are written as 0 when the table has them.
    for optional in ("portal", "account"):
        optional_col = cols.get(optional)
        if optional_col:
            insert_cols.append(optional_col)
            params.append(0)

    column_list = ", ".join(_quote_ident(name) for name in insert_cols)
    markers = ", ".join(["%s"] * len(params))
    duration_sql = _quote_ident(cols["duration"])
    cur.execute(
        f"""
        INSERT INTO {table_sql} ({column_list}) VALUES ({markers})
        ON DUPLICATE KEY UPDATE {duration_sql} = VALUES({duration_sql})
        """,
        tuple(params),
    )

    # Read the id back by key: it covers both the insert and the update case.
    cur.execute(
        f"""
        SELECT {_quote_ident(cols['id'])} AS id FROM {table_sql}
        WHERE {key_where}
        LIMIT 1
        """,
        key_params,
    )
    row = cur.fetchone()
    if not row or not row.get("id"):
        return None
    return int(row["id"])
def _absence_cols(cur, db: str) -> dict[str, str]:
    """Resolve the absence table and the columns this module works with.

    Returns:
        A dict with the resolved table name under ``"table"`` plus one entry
        per logical field (``id``, ``emp``, ``date``, ``absence``,
        ``portal``, ``account``) for which ``_prefixed_col`` found a column.
        Fields without a match are absent.
    """
    table = _resolve_table(cur, db, TIME_ABSENCE_TABLE)
    lut = _column_lookup(_table_columns(cur, db, table))
    found = {
        field: column
        for field in ("id", "emp", "date", "absence", "portal", "account")
        if (column := _prefixed_col(lut, TIME_ABSENCE_TABLE, field))
    }
    return {"table": table, **found}
@router.put("/api/time-entries")
def put_time_entries(
    body: TimeEntryWrite,
    _auth: Annotated[None, Depends(require_api_key)],
    acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)],
) -> dict[str, Any]:
    """Write (or remove) one time entry for an employee, project and day.

    Steps, all on one connection and committed at the end:

    1. Check that the acting and target employees exist and that the acting
       employee may write time for the target on this project.
    2. Normalize the requested time and pass it through
       ``apply_daily_limits`` together with the day's current totals and the
       day's working-hours quota.
    3. Upsert the entry (a resulting value <= 0 deletes it), invalidate the
       time cache and re-read the project/day summary.

    Raises:
        HTTPException: 400 for a malformed date, 403 when writing is not
            permitted, 500 (``db_error``) for any ``pymysql.Error``.
    """
    target_emp_id = resolve_target_emp_id(acting_emp_id, body.emp_id)
    day = _parse_iso_date(body.date)
    is_over = int(body.over)
    project_id = body.project_id
    try:
        with get_conn() as conn:
            with conn.cursor() as cur:
                from app.main import _db_name

                db = _db_name(cur)
                ensure_emp_exists(cur, db, acting_emp_id, kind="acting")
                ensure_emp_exists(cur, db, target_emp_id, kind="target")

                allowed = can_write_time_entry(
                    cur, db, acting_emp_id, target_emp_id, project_id
                )
                if not allowed:
                    raise HTTPException(
                        status_code=403,
                        detail={
                            "code": "forbidden",
                            "message": "Нет прав на запись часов",
                        },
                    )

                requested = normalize_time_value(float(body.time))
                total_work, total_over = _fetch_day_sums(cur, db, target_emp_id, day)
                before = project_day_info(
                    _fetch_project_day_rows(cur, db, target_emp_id, project_id, day)
                )
                max_work = get_work_hours_by_date(cur, db, day)
                # Overtime quota is whatever remains of 24 after the work quota.
                max_over = 24.0 - max_work
                time_val, limits = apply_daily_limits(
                    requested,
                    bool(is_over),
                    total_work,
                    total_over,
                    before["hours"],
                    before["over"],
                    max_work,
                    max_over,
                )

                row_id = _upsert_time_entry(
                    cur, db, target_emp_id, project_id, day, time_val, is_over
                )
                cache_deleted = invalidate_time_cache(
                    cur, db, day, target_emp_id, project_id
                )
                # Re-read so the response reflects the state after the write.
                info = project_day_info(
                    _fetch_project_day_rows(cur, db, target_emp_id, project_id, day)
                )
            conn.commit()
        return {
            "ok": True,
            "acting_emp_id": acting_emp_id,
            "target_emp_id": target_emp_id,
            "id": row_id,
            "emp_id": target_emp_id,
            "project_id": project_id,
            "date": day.isoformat(),
            "duration": time_val,
            "is_over": is_over,
            "info": info,
            "limits": limits,
            "cache_rows_deleted": cache_deleted,
        }
    except HTTPException:
        # Already a well-formed API error; pass it through unchanged.
        raise
    except pymysql.Error as e:
        raise HTTPException(
            status_code=500, detail={"code": "db_error", "message": str(e)}
        ) from e
@router.put("/api/absences")
def put_absences(
    body: AbsenceWrite,
    _auth: Annotated[None, Depends(require_api_key)],
    acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)],
) -> dict[str, Any]:
    """Set or clear an absence for an explicit list of days.

    For every day in ``body.dates``:

    * ``type == 0`` deletes the employee's absence row for that day;
    * any other type upserts the absence and removes the employee's
      non-overtime time rows for that day.

    The time cache is invalidated for each day and everything is committed
    once at the end.

    All dates are validated before the database is touched, so a malformed
    entry anywhere in the list rejects the whole request without any partial
    write having been issued.

    Raises:
        HTTPException: 400 for a malformed date, 403 when the acting employee
            may not edit the target's absences, 500 (``db_error``) for any
            ``pymysql.Error``.
    """
    target_emp_id = resolve_target_emp_id(acting_emp_id, body.emp_id)
    absence_type = int(body.type)
    # Parse up front: failing on the N-th date after N-1 days were already
    # written would depend on connection rollback behaviour to stay atomic.
    days = [_parse_iso_date(ds, "dates") for ds in body.dates]
    results: list[dict[str, Any]] = []
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                from app.main import _db_name

                db = _db_name(cur)
                ensure_emp_exists(cur, db, acting_emp_id, kind="acting")
                ensure_emp_exists(cur, db, target_emp_id, kind="target")
                if not can_write_absence(cur, db, acting_emp_id, target_emp_id):
                    raise HTTPException(
                        status_code=403,
                        detail={"code": "forbidden", "message": "Нет прав"},
                    )
                acols = _absence_cols(cur, db)
                tcols = _time_cols(cur, db)
                aq = _quote_ident(acols["table"])
                tq = _quote_ident(tcols["table"])

                # The statements do not depend on the day, so build them once.
                if absence_type == 0:
                    clear_absence_sql = f"""
                        DELETE FROM {aq}
                        WHERE {_quote_ident(acols['emp'])} = %s
                          AND {_quote_ident(acols['date'])} = %s
                    """
                else:
                    ins = [acols["emp"], acols["date"], acols["absence"]]
                    # Optional bookkeeping columns are written as 0.
                    zero_fill: list[Any] = []
                    for optional in ("portal", "account"):
                        if acols.get(optional):
                            ins.append(acols[optional])
                            zero_fill.append(0)
                    col_sql = ", ".join(_quote_ident(x) for x in ins)
                    ph = ", ".join(["%s"] * len(ins))
                    absence_sql = _quote_ident(acols["absence"])
                    upsert_absence_sql = f"""
                        INSERT INTO {aq} ({col_sql}) VALUES ({ph})
                        ON DUPLICATE KEY UPDATE
                        {absence_sql} = VALUES({absence_sql})
                    """
                    clear_hours_sql = f"""
                        DELETE FROM {tq}
                        WHERE {_quote_ident(tcols['emp'])} = %s
                          AND {_quote_ident(tcols['date'])} = %s
                          AND {_quote_ident(tcols['is_over'])} = 0
                    """

                for day in days:
                    iso_day = day.isoformat()
                    if absence_type == 0:
                        cur.execute(clear_absence_sql, (target_emp_id, iso_day))
                    else:
                        cur.execute(
                            upsert_absence_sql,
                            (target_emp_id, iso_day, absence_type, *zero_fill),
                        )
                        # NOTE(review): regular hours are cleared only when an
                        # absence is set; the range endpoint clears them even
                        # when it only removes absences — confirm which is
                        # intended.
                        cur.execute(clear_hours_sql, (target_emp_id, iso_day))
                    deleted = invalidate_time_cache(cur, db, day, target_emp_id, None)
                    results.append(
                        {
                            "date": iso_day,
                            "absence_type_id": absence_type,
                            "cache_rows_deleted": deleted,
                        }
                    )
            c.commit()
        return {
            "ok": True,
            "acting_emp_id": acting_emp_id,
            "target_emp_id": target_emp_id,
            "emp_id": target_emp_id,
            "results": results,
        }
    except HTTPException:
        # Already a well-formed API error; pass it through unchanged.
        raise
    except pymysql.Error as e:
        raise HTTPException(
            status_code=500, detail={"code": "db_error", "message": str(e)}
        ) from e
@router.put("/api/absences/range")
def put_absences_range(
    body: AbsenceRangeWrite,
    _auth: Annotated[None, Depends(require_api_key)],
    acting_emp_id: Annotated[int, Depends(parse_acting_emp_id)],
) -> dict[str, Any]:
    """Replace an employee's absences over a date range.

    For the days produced by ``iter_dates(begin, end)`` the handler deletes
    the employee's absence rows and non-overtime time rows, then — when
    ``absence_id`` is non-zero — inserts one absence row per day. The time
    cache is invalidated for every day and the work is committed once.

    The absence rows are inserted with a single ``executemany`` call, using a
    statement that is built once instead of once per day.

    Raises:
        HTTPException: 400 for a malformed date or when ``end`` precedes
            ``begin``, 403 when the acting employee may not edit the target's
            absences, 500 (``db_error``) for any ``pymysql.Error``.
    """
    begin = _parse_iso_date(body.begin, "begin")
    end = _parse_iso_date(body.end, "end")
    if end < begin:
        raise HTTPException(
            status_code=400,
            detail={
                "code": "date_range_invalid",
                "message": "Дата окончания должна быть больше, чем дата начала",
            },
        )
    from app.labor_day import iter_dates

    # NOTE(review): the range length is not bounded here; a very wide range
    # produces an equally long IN (...) list and insert batch.
    dates = list(iter_dates(begin, end))
    try:
        with get_conn() as c:
            with c.cursor() as cur:
                from app.main import _db_name

                db = _db_name(cur)
                ensure_emp_exists(cur, db, acting_emp_id, kind="acting")
                ensure_emp_exists(cur, db, body.emp_id, kind="target")
                if not can_write_absence(cur, db, acting_emp_id, body.emp_id):
                    raise HTTPException(
                        status_code=403,
                        detail={"code": "forbidden", "message": "Нет прав"},
                    )
                acols = _absence_cols(cur, db)
                tcols = _time_cols(cur, db)
                aq = _quote_ident(acols["table"])
                tq = _quote_ident(tcols["table"])
                iso_dates = [d.isoformat() for d in dates]
                ph = ", ".join(["%s"] * len(iso_dates))

                # Clear existing absences and regular (non-overtime) hours.
                cur.execute(
                    f"""
                    DELETE FROM {aq}
                    WHERE {_quote_ident(acols['emp'])} = %s
                      AND {_quote_ident(acols['date'])} IN ({ph})
                    """,
                    (body.emp_id, *iso_dates),
                )
                cur.execute(
                    f"""
                    DELETE FROM {tq}
                    WHERE {_quote_ident(tcols['emp'])} = %s
                      AND {_quote_ident(tcols['date'])} IN ({ph})
                      AND {_quote_ident(tcols['is_over'])} = 0
                    """,
                    (body.emp_id, *iso_dates),
                )

                if body.absence_id:
                    ins = [acols["emp"], acols["date"], acols["absence"]]
                    # Optional bookkeeping columns are written as 0.
                    zero_fill: list[Any] = []
                    for optional in ("portal", "account"):
                        if acols.get(optional):
                            ins.append(acols[optional])
                            zero_fill.append(0)
                    col_sql = ", ".join(_quote_ident(x) for x in ins)
                    ph2 = ", ".join(["%s"] * len(ins))
                    cur.executemany(
                        f"INSERT INTO {aq} ({col_sql}) VALUES ({ph2})",
                        [
                            (body.emp_id, iso_day, body.absence_id, *zero_fill)
                            for iso_day in iso_dates
                        ],
                    )

                # Rows changed for every day, with or without a new absence.
                for d in dates:
                    invalidate_time_cache(cur, db, d, body.emp_id, None)
            c.commit()
        return {
            "ok": True,
            "acting_emp_id": acting_emp_id,
            "target_emp_id": body.emp_id,
            "e": 0,
            "m": "Успешно сохранено",
            "dates_count": len(dates),
        }
    except HTTPException:
        # Already a well-formed API error; pass it through unchanged.
        raise
    except pymysql.Error as e:
        raise HTTPException(
            status_code=500, detail={"code": "db_error", "message": str(e)}
        ) from e