Coverage for backend \ app \ Pedido \ repositories \ pedidoRepository.py: 14.40%
125 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
1from app.Pedido.models.pedidoModel import Pedido
2from app.Pedido.models.detallePedidoModel import DetallePedido
3from app.Productos.models.productoModel import Producto
4from app.Inventario.models.inventarioModel import Inventario
5from sqlalchemy.orm import joinedload, selectinload
6from typing import List
7from datetime import datetime, timezone, timedelta
9class PedidoRepository:
10 def __init__(self, dbSession):
11 self.dbSession = dbSession
13 def listarPedidos(self) -> List[Pedido]:
14 # Use selectinload for 1-N relationships to avoid JOIN + LIMIT issues
15 return (self.dbSession.query(Pedido)
16 .options(
17 joinedload(Pedido.usuarioCreador),
18 joinedload(Pedido.usuarioAprobador),
19 selectinload(Pedido.detalles).selectinload(DetallePedido.producto),
20 selectinload(Pedido.detalles).selectinload(DetallePedido.usuarioReceptor)
21 )
22 .all())
24 def obtenerPorId(self, idPedido: int):
25 # Evitar usar joinedload en colecciones si luego se aplica limit/first; usamos selectinload para 1-N
26 return (self.dbSession.query(Pedido)
27 .options(
28 joinedload(Pedido.usuarioCreador),
29 joinedload(Pedido.usuarioAprobador),
30 selectinload(Pedido.detalles).selectinload(DetallePedido.producto),
31 selectinload(Pedido.detalles).selectinload(DetallePedido.usuarioReceptor)
32 )
33 .filter(Pedido.idPedido == idPedido).first())
35 def listarPedidosPendientes(self) -> List[Pedido]:
36 return self.dbSession.query(Pedido).filter(Pedido.estadoPedido == "PENDIENTE_REVISION").all()
38 def crearPedido(self, pedidoCrear, idUsuarioCreador: int, rolUsuario: str):
39 # Validar productos y calcular total
40 total = 0.0
41 detalles_objs = []
42 missing = []
43 for d in pedidoCrear.detalles:
44 producto = self.dbSession.query(Producto).filter(Producto.idProducto == d.idProducto).first()
45 if not producto:
46 missing.append({"idProducto": d.idProducto, "error": "producto_no_encontrado"})
47 continue
48 # No permitir pedidos de productos inactivos
49 if not producto.activoProducto:
50 missing.append({"idProducto": d.idProducto, "error": "producto_inactivo"})
51 continue
52 # El precio se toma del producto registrado; el payload solo provee id y cantidad
53 precio = producto.precioUnitarioCompra
54 total += precio * d.cantidadSolicitada
55 detalle = DetallePedido(
56 idProducto=d.idProducto,
57 cantidadSolicitada=d.cantidadSolicitada,
58 precioUnitarioCompra=precio,
59 # El detalle inicia siempre en PENDIENTE_REVISION; la aprobación de detalles ocurre en la revisión del pedido
60 estadoDetalle="PENDIENTE_REVISION"
61 )
62 detalles_objs.append(detalle)
63 if missing:
64 return {"error": missing}
65 # Si el creador es Administrador, el pedido se aprueba automáticamente y el mismo usuario es el aprobador
66 estado_inicial = "PENDIENTE_REVISION"
67 idUsuarioAprobador = None
68 if rolUsuario == "Administrador":
69 estado_inicial = "APROBADO"
70 idUsuarioAprobador = idUsuarioCreador
71 # Si se aprueba automáticamente, actualizar los detalles a PENDIENTE_RECEPCION
72 for d in detalles_objs:
73 if d.estadoDetalle == "PENDIENTE_REVISION":
74 d.estadoDetalle = "PENDIENTE_RECEPCION"
75 quitoTZ = timezone(timedelta(hours=-5))
76 nuevo = Pedido(
77 idUsuarioCreador=idUsuarioCreador,
78 idUsuarioAprobador=idUsuarioAprobador,
79 totalCostoPedido=total,
80 fechaCreacion=datetime.now(quitoTZ),
81 estadoPedido=estado_inicial,
82 observaciones=None
83 )
84 self.dbSession.add(nuevo)
85 self.dbSession.flush()
86 for det in detalles_objs:
87 det.idPedido = nuevo.idPedido
88 self.dbSession.add(det)
89 self.dbSession.commit()
90 self.dbSession.refresh(nuevo)
91 return nuevo
93 def revisarPedido(self, idPedido: int, estadoNuevo: str, observaciones: str, idUsuarioAprobador: int):
94 pedido = self.dbSession.query(Pedido).filter(Pedido.idPedido == idPedido).first()
95 if not pedido:
96 return None
97 # No permitir cambios si el pedido ya fue revisado (no está en pendiente)
98 if pedido.estadoPedido != "PENDIENTE_REVISION":
99 return {"error": "pedido_no_modificable"}
100 pedido.estadoPedido = estadoNuevo
101 pedido.observaciones = observaciones
102 pedido.idUsuarioAprobador = idUsuarioAprobador
103 # Si se aprueba, pasar detalles a PENDIENTE_RECEPCION
104 if estadoNuevo == "APROBADO":
105 for d in pedido.detalles:
106 if d.estadoDetalle == "PENDIENTE_REVISION":
107 d.estadoDetalle = "PENDIENTE_RECEPCION"
108 # Si se rechaza, marcar detalles como NO_APROBADO
109 if estadoNuevo == "RECHAZADO":
110 for d in pedido.detalles:
111 d.estadoDetalle = "NO_APROBADO"
112 self.dbSession.commit()
113 self.dbSession.refresh(pedido)
114 return pedido
116 def listarDetallesPorPedido(self, idPedido: int):
117 return self.dbSession.query(DetallePedido).options(joinedload(DetallePedido.producto)).filter(DetallePedido.idPedido == idPedido).all()
119 def obtenerDetallePorId(self, idDetalle: int):
120 return self.dbSession.query(DetallePedido).options(joinedload(DetallePedido.producto)).filter(DetallePedido.idDetallePedido == idDetalle).first()
122 def registrarRecepcionDetalle(self, idDetalle: int, confirmar: bool, idUsuarioReceptor: int):
123 # Solo procede si confirmar == True
124 detalle = self.dbSession.query(DetallePedido).filter(DetallePedido.idDetallePedido == idDetalle).first()
125 if not detalle:
126 return None
127 # No permitir recepción si el pedido no está aprobado (sigue pendiente)
128 pedido = self.dbSession.query(Pedido).filter(Pedido.idPedido == detalle.idPedido).first()
129 if pedido is None:
130 return None
131 if pedido.estadoPedido == "PENDIENTE_REVISION":
132 return {"error": "pedido_no_aprobado"}
133 if not confirmar:
134 # No permitimos confirmaciones falsas
135 return {"error": "confirmacion_requerida"}
136 # Si ya está recibido, no permitimos doble recepción
137 if detalle.estadoDetalle == "RECIBIDO":
138 return {"error": "ya_recibido"}
139 # Registrar quién recibió y cuándo
140 detalle.idUsuarioReceptor = idUsuarioReceptor
141 quitoTZ = timezone(timedelta(hours=-5))
142 detalle.fechaRecepcion = datetime.now(quitoTZ)
143 # Marcar el detalle como recibido (se asume recepción completa del detalle)
144 detalle.estadoDetalle = "RECIBIDO"
145 # Actualizar inventario con la cantidad solicitada (recepción completa)
146 inventario = self.dbSession.query(Inventario).filter(Inventario.idProducto == detalle.idProducto).first()
147 if inventario:
148 inventario.cantidadDisponible = (inventario.cantidadDisponible or 0) + detalle.cantidadSolicitada
149 else:
150 # Crear registro de inventario si no existe
151 nuevo_inv = Inventario(
152 idProducto=detalle.idProducto,
153 cantidadDisponible=detalle.cantidadSolicitada,
154 cantidadMinima=0,
155 activoInventario=True
156 )
157 self.dbSession.add(nuevo_inv)
158 self.dbSession.commit()
159 self.dbSession.refresh(detalle)
160 # Actualizar estado del pedido según estados de sus detalles
161 detalles = self.dbSession.query(DetallePedido).filter(DetallePedido.idPedido == pedido.idPedido).all()
162 todos_recibidos = all(d.estadoDetalle == "RECIBIDO" for d in detalles)
163 alguno_recibido = any(d.estadoDetalle == "RECIBIDO" for d in detalles)
164 if todos_recibidos:
165 pedido.estadoPedido = "COMPLETAMENTE_RECIBIDO"
166 elif alguno_recibido:
167 pedido.estadoPedido = "PARCIALMENTE_RECIBIDO"
168 self.dbSession.commit()
169 self.dbSession.refresh(pedido)
170 return detalle
172 def crearPedidosIniciales(self):
173 """Crear pedidos de ejemplo para facilitar pruebas automáticas/manuales.
174 - Un pedido creado por el Admin (idUsuario=1) que debe quedar APROBADO automáticamente.
175 - Un pedido creado por el Bodeguero (idUsuario=2) que debe quedar PENDIENTE_REVISION.
176 Usa como productos de prueba 'Cola 2 Litros' y 'Jugo de Naranja 1L' si existen.
177 """
178 # No duplicar si ya hay pedidos
179 existe = self.dbSession.query(Pedido).first()
180 if existe:
181 return
182 # Buscar productos de prueba
183 producto1 = self.dbSession.query(Producto).filter(Producto.nombreProducto == "Cola 2 Litros").first()
184 producto2 = self.dbSession.query(Producto).filter(Producto.nombreProducto == "Jugo de Naranja 1L").first()
185 if not producto1 or not producto2:
186 # No crear si faltan productos base
187 return
188 # Construir schemas de detalle/pedido
189 try:
190 from app.Pedido.schemas.pedidoSchemas import PedidoCrearSchema, DetallePedidoCrearSchema
191 admin_pedido = PedidoCrearSchema(detalles=[DetallePedidoCrearSchema(idProducto=producto1.idProducto, cantidadSolicitada=5), DetallePedidoCrearSchema(idProducto=producto2.idProducto, cantidadSolicitada=3)])
192 bodeguero_pedido = PedidoCrearSchema(detalles=[DetallePedidoCrearSchema(idProducto=producto1.idProducto, cantidadSolicitada=2)])
193 # Crear usando la lógica existente (admin será aprobado automáticamente)
194 self.crearPedido(admin_pedido, 1, "Administrador")
195 self.crearPedido(bodeguero_pedido, 2, "Bodeguero")
196 except Exception:
197 # Si por alguna razón no se pueden crear los pedidos, no romper el startup
198 return