Coverage for backend \ app \ Usuarios \ repositories \ usuarioRepository.py: 25.00%
52 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-29 16:13 -0500
1from app.Usuarios.models.usuarioModel import Usuario
2from app.Usuarios.schemas.usuarioSchemas import *
3from sqlalchemy.orm import joinedload
4class UsuarioRepository:
5 def __init__(self, dbSession):
6 self.dbSession = dbSession
8 def validarCredenciales(self, cedula, password):
9 return self.dbSession.query(Usuario).filter(Usuario.cedulaUsuario == cedula, Usuario.passwordUsuario == password).first()
11 def listarUsuarios(self):
12 return self.dbSession.query(Usuario).options(joinedload(Usuario.rol)).all()
14 def obtenerUsuarioPorId(self, idUsuario: int):
15 return (self.dbSession.query(Usuario)
16 .options(joinedload(Usuario.rol)).filter(Usuario.idUsuario == idUsuario).first())
18 def validarCedulaExistente(self, cedula):#Verificar si la cedula ya esta registrada
19 return self.dbSession.query(Usuario).filter(Usuario.cedulaUsuario == cedula).first()
21 def crearUsuario(self, usuario: UsuarioCrearSchema):
22 usuario = Usuario(
23 idRol=usuario.idRol,
24 nombreCompleto=usuario.nombreCompleto,
25 cedulaUsuario=usuario.cedulaUsuario,
26 emailUsuario=usuario.emailUsuario,
27 passwordUsuario=usuario.passwordUsuario,
28 activoUsuario=True
29 )
31 if self.validarCedulaExistente(usuario.cedulaUsuario):
32 return None
33 self.dbSession.add(usuario)
34 self.dbSession.commit()
35 self.dbSession.refresh(usuario)
36 return usuario
38 def modificarUsuario(self, idUsuario: int, usuarioActualizar: UsuarioActualizarSchema):
39 usuario = self.obtenerUsuarioPorId(idUsuario)
40 if not usuario:
41 return None
43 usuarioData=usuarioActualizar.model_dump(exclude_unset=True)
44 camposValidos = ["idRol", "nombreCompleto", "emailUsuario", "passwordUsuario", "activoUsuario"]
46 for campo, valor in usuarioData.items():
47 if campo in camposValidos:
48 setattr(usuario, campo, valor)
50 self.dbSession.commit()
51 self.dbSession.refresh(usuario)
52 return usuario
54 def deshabilitarUsuario(self, idUsuario: int):
55 usuario = self.obtenerUsuarioPorId(idUsuario)
56 if not usuario:
57 return None
58 usuario.activoUsuario = False
59 self.dbSession.commit()
60 self.dbSession.refresh(usuario)
61 return usuario
63 def crearUsuariosIniciales(self): #Crea datos para pruebas iniciales
64 usuariosIniciales = [
65 {"nombreCompleto": "admin", "cedulaUsuario": "admin", "emailUsuario": "admin@example.com", "passwordUsuario": "1234", "idRol": 1},
66 {"nombreCompleto": "bodeguero", "cedulaUsuario": "bodeguero", "emailUsuario": "bodeguero@example.com", "passwordUsuario": "1234", "idRol": 2},
67 {"nombreCompleto": "cajero", "cedulaUsuario": "cajero", "emailUsuario": "cajero@example.com", "passwordUsuario": "1234", "idRol": 3},
68 {"nombreCompleto": "Damian Barahona", "cedulaUsuario": "1750834515", "emailUsuario": "damian@example.com", "passwordUsuario": "1234", "idRol": 1},
69 {"nombreCompleto": "Kenin Cayambe", "cedulaUsuario": "1750834516", "emailUsuario": "kenin@example.com", "passwordUsuario": "1234", "idRol": 2},
70 {"nombreCompleto": "Sthalin Chasipanta", "cedulaUsuario": "1750834517", "emailUsuario": "sthali@example.com", "passwordUsuario": "1234", "idRol": 2},
71 {"nombreCompleto": "Cristian Licto", "cedulaUsuario": "1750834518", "emailUsuario": "cristian@example.com", "passwordUsuario": "1234", "idRol": 3},
72 {"nombreCompleto": "Juan Tandazo", "cedulaUsuario": "1750834519", "emailUsuario": "juan@example.com", "passwordUsuario": "1234", "idRol": 3},
74 ]
76 for usuario in usuariosIniciales:
77 existe=self.validarCedulaExistente(usuario["cedulaUsuario"])
78 if not existe:
79 usuario = Usuario(**usuario)
80 self.dbSession.add(usuario)
81 print("Usuarios por defecto creados...!")
82 self.dbSession.commit()
83 self.dbSession.close()