Coverage for backend \ app \ Reportes \ repositories \ reporteRepository.py: 27.87%
61 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
1from app.Inventario.repositories.inventarioRepository import InventarioRepository
2from app.Productos.models.productoModel import Producto
3from app.Productos.models.categoriaProductoModel import CategoriaProducto
4from app.Venta.models.detalleVentaModel import DetalleVenta
5from app.Venta.models.ventaModel import Venta
6from app.Caja.models.cajaHistorialModel import CajaHistorial
7from app.Clientes.modells.clienteModel import Cliente
8from sqlalchemy import func
9from datetime import datetime, timedelta
10from sqlalchemy.orm import joinedload
12class ReporteRepository:
13 def __init__(self, dbSession):
14 self.dbSession = dbSession
16 def reporte_inventario(self, idProducto=None, idCategoria=None, nombreProducto=None):
17 """Filtros opcionales: se pueden combinar. Se aplican de forma conjunta (AND) si se envían varios."""
18 invRepo = InventarioRepository(self.dbSession)
19 inventarios = invRepo.listarInventarios()
20 resultados = []
21 for inv in inventarios:
22 p = getattr(inv, 'producto', None)
23 if idProducto is not None and inv.idProducto != idProducto:
24 continue
25 if idCategoria is not None and not (p and p.idCategoriaProducto == idCategoria):
26 continue
27 if nombreProducto and not (p and nombreProducto.lower() in (p.nombreProducto or '').lower()):
28 continue
29 resultados.append(inv)
30 return resultados
32 def reporte_ventas_por_producto_categoria(self, fechaInicio, fechaFin, idProducto=None, idCategoria=None):
33 # construir rango de fechas (fechas ya validadas por el servicio)
34 q = self.dbSession.query(
35 DetalleVenta.idProducto,
36 Producto.nombreProducto,
37 Producto.idCategoriaProducto,
38 CategoriaProducto.nombreCategoria,
39 func.coalesce(func.sum(DetalleVenta.cantidadVendida),0).label('cantidadVendida'),
40 func.coalesce(func.sum(DetalleVenta.subtotalProducto - DetalleVenta.valorDescuentoProducto),0).label('ingresos')
41 ).join(Producto, Producto.idProducto == DetalleVenta.idProducto)
42 q = q.join(CategoriaProducto, CategoriaProducto.idCategoriaProducto == Producto.idCategoriaProducto)
43 q = q.join(Venta, Venta.idVenta == DetalleVenta.idVenta)
44 inicio = datetime.combine(fechaInicio, datetime.min.time())
45 fin = datetime.combine(fechaFin, datetime.max.time())
46 q = q.filter(Venta.fechaVenta >= inicio, Venta.fechaVenta <= fin)
47 if idProducto is not None:
48 q = q.filter(DetalleVenta.idProducto == idProducto)
49 if idCategoria is not None:
50 q = q.filter(Producto.idCategoriaProducto == idCategoria)
51 q = q.group_by(DetalleVenta.idProducto, Producto.nombreProducto, Producto.idCategoriaProducto, CategoriaProducto.nombreCategoria)
52 return q.all()
54 def resumen_caja_diaria(self, fecha: datetime.date, idUsuarioCaja: int):
55 from datetime import datetime
56 tz = datetime.now().astimezone().tzinfo
57 inicio = datetime.combine(fecha, datetime.min.time()).astimezone(tz)
58 fin = datetime.combine(fecha, datetime.max.time()).replace(hour=23, minute=59, second=59, microsecond=0).astimezone(tz)
59 # Buscar la(s) caja(s) del cajero en la fecha
60 query = self.dbSession.query(CajaHistorial).filter(CajaHistorial.fechaAperturaCaja >= inicio, CajaHistorial.fechaAperturaCaja <= fin, CajaHistorial.idUsuarioCaja == idUsuarioCaja)
61 cajas = query.options(joinedload(CajaHistorial.usuario)).all()
62 # Para cada caja, traer las ventas y sus detalles
63 for c in cajas:
64 ventas = self.dbSession.query(Venta).options(joinedload(Venta.detalles)).filter(Venta.idCaja == c.idCaja).all()
65 c.ventas = ventas
66 return cajas
68 def clientes_frecuentes(self, dias=30, minVentas=3, minGasto=100.0):
69 desde = datetime.now() - timedelta(days=dias)
70 # sumar ventas por cliente
71 q = self.dbSession.query(
72 Venta.idCliente,
73 func.count(Venta.idVenta).label('ventasCount'),
74 func.coalesce(func.sum(Venta.totalPagar),0).label('totalGastado')
75 ).filter(Venta.fechaVenta >= desde).group_by(Venta.idCliente).having(func.count(Venta.idVenta) >= minVentas, func.sum(Venta.totalPagar) >= minGasto)
76 clientes = q.all()
77 # obtener historial completo para cada cliente
78 resultados = []
79 for c in clientes:
80 cliente_entity = self.dbSession.query(Cliente).filter(Cliente.idCliente == c.idCliente).first()
81 ventas = self.dbSession.query(Venta).options(joinedload(Venta.detalles)).filter(Venta.idCliente == c.idCliente, Venta.fechaVenta >= desde).all()
82 resultados.append({'cliente': cliente_entity, 'ventasCount': c.ventasCount, 'totalGastado': float(c.totalGastado or 0.0), 'historial': ventas})
83 return resultados