FastAPI boilerplate
+FastAPI boilerplate
+## Setup
+1. Create a virtual environment.
+ ```sh
+    python3 -m venv .venv
+ ```
+2. Activate virtual environment.
+    source /path/to/venv/bin/activate`
+3. Install project dependencies `pip install -r requirements.txt`
+4. Create a .env file by copying the .env.sample file
+`cp .env.sample .env`
+5. Start server.
+ ```sh
+ python
+Generic single-database configuration.
+import os
+from alembic import context
+from decouple import config
+from logging.config import fileConfig
+from sqlalchemy import engine_from_config
+from sqlalchemy import pool
+from api.db.database import Base
+from api.v1.models.auth import User, BlackListToken
+#from db.database import DATABASE_URL 
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = Base.metadata
+# other values from the config, defined by the needs of,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode.
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well.  By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+    Calls to context.execute() here emit the given string to the
+    script output.
+    """
+    url = config.get_main_option("sqlalchemy.url", DATABASE_URL)
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+    with context.begin_transaction():
+        context.run_migrations()
+def run_migrations_online() -> None:
+    """Run migrations in 'online' mode.
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+    """
+    alembic_config = config.get_section(config.config_ini_section)
+    alembic_config['sqlalchemy.url'] = DATABASE_URL
+    connectable = engine_from_config(
+        alembic_config,
+        prefix="sqlalchemy.",
+        poolclass=pool.NullPool,
+    )
+    with connectable.connect() as connection:
+        context.configure(
+            connection=connection, target_metadata=target_metadata
+        )
+        with context.begin_transaction():
+            context.run_migrations()
+if context.is_offline_mode():
+    run_migrations_offline()
+    run_migrations_online()
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+from typing import Sequence, Union
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
+from abc import ABC, abstractmethod
+class Service(ABC):
+    @abstractmethod
+    def create(self):
+        pass
+    @abstractmethod
+    def fetch(self):
+        pass
+    @abstractmethod
+    def fetch_all(self):
+        pass
+    @abstractmethod
+    def update(self):
+        pass
+    @abstractmethod
+    def delete(self):
+        pass
+from typing import Union
+from import OAuth2PasswordBearer
+from fastapi import Depends, Cookie, HTTPException, status
+from sqlalchemy.orm import Session
+from sqlalchemy.sql import and_
+from jose import JWTError, jwt
+from api.v1.schemas import auth as user_schema
+from import User
+from api.v1.models.auth import User as UserModel
+from api.db.database import get_db
+from api.core import responses
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
+def is_authenticated(
+    access_token: Union[str, user_schema.Token] = Depends(oauth2_scheme),
+    db: Session = Depends(get_db),
+) -> Union[user_schema.User, JWTError]:
+    if not access_token:
+        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=responses.INVALID_CREDENTIALS)
+    userService = User()
+    access_token_info = userService.verify_access_token(access_token, db)
+    if type(access_token_info) is JWTError:
+        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=responses.INVALID_CREDENTIALS)
+    user = userService.fetch(,db=db)
+    return user
+EMAIL_IN_USE = "This email is already in use."
+NOT_FOUND = "Not found!"
+ID_OR_UNIQUE_ID_REQUIRED = "ID or Unique ID required!"
+INVALID_CREDENTIALS = "Invalid Credentials!"
+COULD_NOT_VALIDATE_CRED = "Could not validate credentials."
+EXPIRED="Token expired."
+from sqlalchemy import create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker
+from decouple import config
+def get_db_engine():
+    DB_TYPE = config("DB_TYPE")
+    DB_NAME = config("DB_NAME")
+    DB_USER = config("DB_USER")
+    DB_PASSWORD = config("DB_PASSWORD")
+    DB_HOST = config("DB_HOST")
+    DB_PORT = config("DB_PORT")
+    DATABASE_URL = ""
+    if DB_TYPE == "mysql":
+    elif DB_TYPE == "postgresql":
+        DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+    else:
+        DATABASE_URL = "sqlite:///./database.db"
+    if DB_TYPE == "sqlite":
+        db_engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
+    else:
+        db_engine = create_engine(DATABASE_URL, pool_size=32, max_overflow=64)
+    return db_engine
+db_engine = get_db_engine()
+SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)
+Base = declarative_base()
+def create_database():
+    return Base.metadata.create_all(bind=db_engine)
+def get_db():
+    db = SessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
+from pymongo import MongoClient
+from api.utils import settings
+from pymongo.mongo_client import MongoClient
+from motor.motor_asyncio import AsyncIOMotorClient
+def create_nosql_db():
+    client = MongoClient(settings.MONGO_URI)
+    try:
+        client.admin.command("ping")
+        print("MongoDB Connection Established...")
+    except Exception as e:
+        print(e)
+client = MongoClient(settings.MONGO_URI)
+db = client.get_database(settings.MONGO_DB_NAME)
+def clone_object(obj: dict, unwanted_fields=[]):
+    new_obj = {}
+    for k in list(obj.keys()):
+        if k in unwanted_fields:
+            continue
+        new_obj[k] = obj[k]
+    return new_obj
+from fastapi import HTTPException
+class CustomException(HTTPException):
+    @staticmethod
+    def PermissionError():
+        raise HTTPException(status_code=403, detail="User has no permission to perform this action")
+import resend
+from decouple import config
+resend.api_key = config('RESEND_API_KEY')
+class Mailer(resend.Emails):
+    pass
+from sqlalchemy.orm import Session
+def total_row_count(model, organization_id, db: Session):
+    return db.query(model).filter(model.organization_id == organization_id).filter(
+        model.is_deleted == False).count()
+def off_set(page: int, size: int):
+    return (page-1)*size
+def size_validator(size:int):
+    if size < 0 or size > 100:
+        return "page size must be between 0 and 100"
+    return size
+def page_urls(page: int, size: int, count: int, endpoint: str):
+    paging = {}
+    if (size + off_set(page, size)) >= count:
+        paging['next'] = None
+        if page > 1:
+            paging['previous'] = f"{endpoint}?page={page-1}&size={size}"
+        else:
+            paging['previous'] = None
+    else:
+        paging['next'] = f"{endpoint}?page={page+1}&size={size}"
+        if page > 1:
+            paging['previous'] = f"{endpoint}?page={page-1}&size={size}"
+        else:
+            paging['previous'] = None
+    return paging
+def build_paginated_response(
+    page: int, size: int, total: int, pointers: dict, items
+) -> dict:
+    response = {
+        "page": page,
+        "size": size,
+        "total": total,
+        "previous_page": pointers["previous"],
+        "next_page": pointers["next"],
+        "items": items,
+    }
+    return response
+from decouple import config
+MONGO_URI = config("MONGO_URI")
+from fastapi import status
+from fastapi.exceptions import HTTPException
+def is_empty_string(string: str) -> bool:
+    if len(string.strip()) == 0:
+        return True
+    return False
+class EmptyStringException(HTTPException):
+    def __init__(self, detail: str) -> None:
+        super().__init__(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=detail,
+            headers=None,
+        )
+def build_paginated_response(
+    page: int, size: int, total: int, pointers: dict, items
+) -> dict:
+    response = {
+        "page": page,
+        "size": size,
+        "total": total,
+        "previous_page": pointers["previous"],
+        "next_page": pointers["next"],
+        "items": items,
+    }
+    return response
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, DateTime, BIGINT, Text
+from sqlalchemy.orm import relationship
+from datetime import datetime, date
+from api.db.database import Base
+import passlib.hash as _hash
+class User(Base):
+    __tablename__ = "users"
+    id = Column(BIGINT, primary_key=True, autoincrement=True, index=True)
+    unique_id = Column(String(255), nullable=True)
+    first_name = Column(String(255), nullable=False)
+    last_name = Column(String(255), nullable=False)
+    email = Column(String(500), unique=True, index=True, nullable=False)
+    password = Column(String(500), nullable=False)
+    is_active = Column(Boolean, default=True)
+    date_created = Column(DateTime,default=datetime.utcnow)
+    last_updated = Column(DateTime,default=datetime.utcnow)
+    is_deleted = Column(Boolean, default=False)
+class BlackListToken(Base):
+    __tablename__ = "blacklist_tokens"
+    id = Column(BIGINT, primary_key=True, autoincrement=True, index=True)
+    created_by = Column(BIGINT, ForeignKey(''), index=True)
+    token = Column(String(255), index=True)
+    date_created = Column(DateTime, default= datetime.utcnow)
+from fastapi import Depends, Cookie, HTTPException, APIRouter, Depends, status, Response, Request, BackgroundTasks
+from import OAuth2PasswordBearer
+from sqlalchemy.orm import Session
+from datetime import timedelta, datetime
+from typing import Union
+from decouple import config
+from api.v1.schemas import auth as user_schema
+from api.db.database import get_db
+from import User
+from api.v1.models.auth import User as UserModel
+from api.core import responses
+from api.core.dependencies.user import is_authenticated
+IS_REFRESH_TOKEN_SECURE = True if config('PYTHON_ENV') == "production" else False
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
+app = APIRouter(tags=["Auth"])
+"/signup", status_code=status.HTTP_201_CREATED)
+async def signup(
+    response: Response,
+    user:user_schema.CreateUser,
+    db:Session = Depends(get_db)
+    """
+    Endpoint to create a user
+    Returns: Created User.
+    """
+    userService = User()
+    created_user = userService.create(user=user, db=db)
+    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+    access_token = userService.create_access_token(
+        data={"id":}, db=db, expires_delta=access_token_expires
+    )
+    refresh_token = userService.create_refresh_token(data={"id":}, db=db)
+    response.set_cookie(
+            key="refresh_token",
+            value=refresh_token,
+            max_age=JWT_REFRESH_EXPIRY,
+            secure=True,
+            httponly=True,
+            samesite="strict",
+        )
+    return {"message": responses.SUCCESS,
+            "data": user_schema.ShowUser.model_validate(created_user),
+            "access_token": access_token, 
+            "refresh_token": refresh_token,
+            "token_type": "bearer"
+            }
+"/login", status_code=status.HTTP_200_OK)
+async def login_for_access_token(
+    response: Response,
+    data: user_schema.Login,
+    background_task: BackgroundTasks,
+    db: Session = Depends(get_db)
+    """
+    LOGIN
+    Returns: Logged in User and access token.
+    """
+    userService = User()
+    user = userService.authenticate_user(, password=data.password, db=db)
+    if not user:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail=responses.INVALID_CREDENTIALS,
+            headers={"WWW-Authenticate": "Bearer"},
+        )
+    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
+    access_token = userService.create_access_token(
+        data={"id":}, db=db, expires_delta=access_token_expires
+    )
+    refresh_token = userService.create_refresh_token(data={"id":}, db=db)
+    response.set_cookie(
+            key="refresh_token",
+            value=refresh_token,
+            max_age=JWT_REFRESH_EXPIRY,
+            secure=True,
+            httponly=True,
+            samesite="strict",
+            path="/"
+        )
+    return {
+            "data": user_schema.ShowUser.model_validate(user), 
+            "access_token": access_token, 
+            "refresh_token": refresh_token,
+            "token_type": "bearer"
+            }
+@app.get("/user", status_code=status.HTTP_200_OK)
+async def get_user(
+    user: user_schema.User = Depends(is_authenticated),
+    db: Session = Depends(get_db),
+    """
+        Returns an authenticated user information
+    """
+    print(user)
+    return user
+@app.get("/refresh-access-token", status_code=status.HTTP_200_OK)
+async def refresh_access_token(
+    response: Response,
+    refresh_token: Union[str, None] = Cookie(default=None),
+    db: Session = Depends(get_db),
+    """Refreshes an access_token with the issued refresh_token
+    Parameters
+        ----------
+        refresh_token : str, None
+            The refresh token sent in the cookie by the client (default is None)
+        Raises
+        ------
+        UnauthorizedError
+            If the refresh token is None.
+    """
+    print(refresh_token)
+    credentials_exception =HTTPException(
+        status_code=status.HTTP_401_UNAUTHORIZED,
+        detail="Could not validate credentials",
+        headers={"WWW-Authenticate": "Bearer"},
+    )
+    if refresh_token is None:
+        raise HTTPException(
+            detail="Log in to authenticate user",
+            status_code=status.HTTP_401_UNAUTHORIZED,
+        )
+    valid_refresh_token = User.verify_refresh_token(
+        refresh_token, db
+    )
+    if is None:
+        response.set_cookie(
+            key="refresh_token",
+            value=refresh_token,
+            max_age=JWT_REFRESH_EXPIRY,
+            secure=IS_REFRESH_TOKEN_SECURE,
+            httponly=True,
+            samesite="strict",
+        )
+        print("refresh failed")
+    else:
+        user = (
+            db.query(UserModel)
+            .filter( ==
+            .first()
+        )
+        access_token = User.create_access_token(
+            {"user_id":}, db
+        )
+        response.set_cookie(
+            key="refresh_token",
+            value=refresh_token,
+            max_age=JWT_REFRESH_EXPIRY,
+            secure=IS_REFRESH_TOKEN_SECURE,
+            httponly=True,
+            samesite="strict",
+        )
+    # Access token expires in 15 mins,
+    return {"user": user_schema.ShowUser.model_validate(user), "access_token": access_token, "expires_in": 900}
+"/logout", status_code=status.HTTP_200_OK)
+async def logout_user(
+    request: Request,
+    response: Response,
+    user: user_schema.User = Depends(is_authenticated),
+    db: Session = Depends(get_db),
+    """
+        This endpoint logs out an authenticated user.
+        Returns message: User logged out successfully.
+    """
+    userService = User()
+    access_token = request.headers.get('Authorization')
+    logout = userService.logout(token=access_token, user=user, db=db)
+    response.set_cookie(
+        key="refresh_token",
+        max_age="0",
+        secure=True,
+        httponly=True,
+        samesite="strict",
+    )
+    return {"message": "User logged out successfully."}
+async def delete_user(
+    user_id: int,
+    user: user_schema.User = Depends(is_authenticated),
+    db: Session = Depends(get_db),
+    """
+        This endpoint deletes a user from the db. (Soft delete)
+        Returns message: User deleted successfully.
+    """
+    userService =  User()
+    deleted_user = userService.delete(db=db, id=user_id)
+    return {"message": "User deleted successfully."}
+"/users/roles", status_code=status.HTTP_200_OK)
+async def create_user_roles(
+    user: user_schema.ShowUser = Depends(is_authenticated),
+    db:Session = Depends(get_db)
+    """
+     Endpoint to create custom roles for users mixing permissions.
+     Returns created role
+    """
+    pass
+from uuid import uuid4
+from pydantic import BaseModel, model_validator, EmailStr
+from typing import Optional
+from fastapi import HTTPException, status
+from datetime import datetime
+from api.db.database import SessionLocal
+from api.v1.models.auth import User
+from api.core import responses
+class Login(BaseModel):
+    email: EmailStr
+    password: str
+class Token(BaseModel):
+    access_token: str
+    token_type: str
+class TokenData(BaseModel):
+    id:int
+    email:str
+class UserBase(BaseModel):
+    first_name: str 
+    last_name: str
+    email: str
+    unique_id: Optional[str] = None
+    is_active: bool = True
+    date_created: Optional[datetime] = datetime.utcnow()
+    last_updated: Optional[datetime] = datetime.utcnow()
+    class Config:
+        from_attributes = True
+class CreateUser(UserBase):
+    password: str
+    class Config:
+        from_attributes = True
+    #validate email not in use
+    @model_validator(mode='before')
+    @classmethod
+    def validate_email(cls, values):
+        email = values.get("email")
+        with SessionLocal() as db:
+            user_email = db.query(User).filter( == email).first()
+            if user_email:
+                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail= responses.EMAIL_IN_USE)
+        return values
+class ShowUser(UserBase):
+    id: int
+    is_deleted: Optional[bool]
+    class Config:
+        from_attributes=True
+import passlib.hash as _hash
+from passlib.context import CryptContext
+from import OAuth2PasswordBearer
+from fastapi import Depends
+from jose import JWTError, jwt
+from sqlalchemy.orm import Session
+from sqlalchemy.sql import or_
+from fastapi import HTTPException, status
+from datetime import datetime,timedelta
+from typing import Annotated, Union
+from uuid import uuid4
+from decouple import config
+from api.v1.models.auth import User as UserModel, BlackListToken
+from api.v1.schemas import auth as user_schema
+from api.core import responses
+from import Service
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
+class User(Service):
+    def __init__(self) -> None:
+        pass
+    def create(self, user: user_schema.CreateUser, db: Session):
+        created_user = UserModel(unique_id=user.unique_id,
+                            first_name=user.first_name,
+                            last_name=user.last_name,
+                  ,
+                            date_created=user.date_created,
+                            last_updated=user.last_updated,
+                            is_active=user.is_active,
+                            password=self.hash_password(user.password))
+        db.add(created_user)
+        db.commit()
+        return created_user
+    @staticmethod
+    def fetch(db: Session, id: int = None, unique_id: str = None) -> user_schema.User:
+        if id is None and unique_id is None:
+            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=responses.ID_OR_UNIQUE_ID_REQUIRED)
+        user = db.query(UserModel).filter(
+        if user is None:
+            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=responses.NOT_FOUND)
+        return user
+    @staticmethod
+    def fetch_all():
+        pass
+    @staticmethod
+    def fetch_by_email(email: str, db: Session) -> user_schema.User:
+        user = db.query(UserModel).filter( == email, UserModel.is_deleted==False).first()
+        return user
+    def update(self):
+        pass
+    @classmethod
+    def delete(cls,db: Session, id: int=None, unique_id: str=None) -> user_schema.User:
+        user =  cls.fetch(id=id, unique_id=unique_id, db=db)
+        user.is_deleted = True
+        db.commit()
+        return user
+    @classmethod
+    async def get_current_user(cls, token: Annotated[str, Depends(oauth2_scheme)], db:Session) -> user_schema.User:
+        credentials_exception = HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail=responses.COULD_NOT_VALIDATE_CRED,
+            headers={"WWW-Authenticate": "Bearer"},
+        )
+        try:
+            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+            id: str = payload.get("id")
+            if id is None:
+                raise credentials_exception
+            token_data = id
+        except JWTError:
+            raise credentials_exception
+        user = cls.fetch(id=token_data,db=db)
+        if user is None:
+            raise credentials_exception
+        return user
+    @classmethod
+    def authenticate_user(cls, db: Session, password: str,email: str) -> user_schema.User:
+        user = cls.fetch_by_email(email=email, db=db)
+        if not user:
+            return False
+        if not cls.verify_password(password, user.password):
+            return False
+        return user
+    @staticmethod
+    def verify_password(password, hashed_password):
+        return pwd_context.verify(password, hashed_password)
+    @staticmethod
+    def hash_password(password) -> str:
+        return pwd_context.hash(password)
+    @staticmethod
+    def create_access_token(data: dict, db: Session, expires_delta: timedelta = None) -> str:
+        to_encode = data.copy()
+        if expires_delta:
+            expire = datetime.utcnow() + expires_delta
+        else:
+            expire = datetime.utcnow() + timedelta(minutes=30)
+        to_encode.update({"exp": expire})
+        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+        db.commit()
+        return encoded_jwt
+    @staticmethod
+    def create_refresh_token(data: dict, db: Session) -> str:
+        to_encode = data.copy()
+        expire = datetime.utcnow() + timedelta(seconds=int(JWT_REFRESH_EXPIRY))
+        to_encode.update({"exp": expire})
+        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+        return encoded_jwt
+    @classmethod
+    def verify_access_token(cls, token: str, db: Session) -> user_schema.TokenData:
+        try:  
+            invalid_token = cls.check_token_blacklist(db=db, token=token)
+            if invalid_token == True:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail=responses.INVALID_CREDENTIALS,
+                                     headers={"WWW-Authenticate": "Bearer"})
+            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+            id: int = payload.get("id")
+            if id is None:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail=responses.INVALID_CREDENTIALS,
+                                     headers={"WWW-Authenticate": "Bearer"})
+            user = cls.fetch(db=db,id=id)
+            if user is None:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=responses.INVALID_CREDENTIALS,
+                                    headers={"WWW-Authenticate": "Bearer"})
+            token_data = user_schema.TokenData(, id=id)
+            return token_data
+        except JWTError as error:
+            print(error, 'error')
+            return JWTError(HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail=responses.INVALID_CREDENTIALS,
+                                     headers={"WWW-Authenticate": "Bearer"}))
+    @classmethod
+    def verify_refresh_token(cls, refresh_token: str, db: Session) ->  user_schema.TokenData:
+        try:
+            if not refresh_token:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=responses.EXPIRED)
+            payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
+            id: str = payload.get("id")
+            if id is None:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail=responses.INVALID_CREDENTIALS,
+                                     headers={"WWW-Authenticate": "Bearer"})
+            user = cls.fetch(id=id, db=db)
+            if user is None:
+                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=responses.INVALID_CREDENTIALS)
+            token_data = user_schema.TokenData(, id=id)
+        except JWTError:
+            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail=responses.INVALID_CREDENTIALS,
+                                     headers={"WWW-Authenticate": "Bearer"})
+        return token_data
+    @staticmethod
+    def check_token_blacklist(token: str, db:Session)-> bool:
+        fetched_token = db.query(BlackListToken).filter(BlackListToken.token == token).first()
+        if fetched_token:
+            return True
+        else:
+            return False
+    @staticmethod
+    def logout(token: str, user: user_schema.ShowUser, db:Session) -> str:
+        blacklist_token = BlackListToken(
+            token=token.split(' ')[1],
+        )
+        db.add(blacklist_token)
+        db.commit()
+        return token
+import uvicorn
+from contextlib import asynccontextmanager
+from typing import Union
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+from starlette.requests import Request
+from api.db.database import create_database
+from api.db.mongo import create_nosql_db
+from api.v1.routes.auth import app as auth
+async def lifespan(app: FastAPI):
+    create_database()
+    create_nosql_db()
+    yield
+    ## write shutdown logic below yield
+app = FastAPI(lifespan=lifespan)
+origins = [
+    "http://localhost:3000",
+    "http://localhost:3001",
+    CORSMiddleware,
+    allow_origins=origins,
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+app.include_router(auth, tags=["Auth"])
+# app.include_router(users, tags=["Users"])
+@app.get("/", tags=["Home"])
+async def get_root(request: Request) -> dict:
+    return {
+        "message": "Welcome to API",
+        "URL": "",
+    }
+if __name__ == "__main__":
+"main:app", port=7001, reload=True)
+from setuptools import setup, find_packages
+setup(name='api', packages=find_packages())
+from fastapi.testclient import TestClient
+import pytest
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+from ..main import app
+from decouple import config
+from api.db.database import get_db, Base
+DB_TYPE = config("DB_TYPE")
+DB_NAME = config("DB_NAME")
+DB_USER = config("DB_USER")
+DB_HOST = config("DB_HOST")
+DB_PORT = config("DB_PORT")
+engine = create_engine(SQLALCHEMY_DATABASE_URL)
+TestingSessionLocal = sessionmaker(
+    autocommit=False, autoflush=False, bind=engine)
+def session():
+    Base.metadata.drop_all(bind=engine)
+    Base.metadata.create_all(bind=engine)
+    db = TestingSessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
+def client(session):
+    def override_get_db():
+        try:
+            yield session
+        finally:
+            session.close()
+    app.dependency_overrides[get_db] = override_get_db
+    yield TestClient(app)
+from fastapi.testclient import TestClient
+import pytest
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+from main import app
+from decouple import config
+import json
+from api.db.database import get_db
+from api.db.database import Base
+from import User
+from api.v1.models.auth import User as UserModel
+DB_TYPE = config("DB_TYPE")
+DB_NAME = config("DB_NAME")
+DB_USER = config("DB_USER")
+DB_HOST = config("DB_HOST")
+DB_PORT = config("DB_PORT")
+engine = create_engine(SQLALCHEMY_DATABASE_URL)
+TestingSessionLocal = sessionmaker(
+    autocommit=False, autoflush=False, bind=engine)
+def session():
+    Base.metadata.drop_all(bind=engine)
+    Base.metadata.create_all(bind=engine)
+    db = TestingSessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
+def client(session):
+    def override_get_db():
+        try:
+            yield session
+        finally:
+            session.close()
+    app.dependency_overrides[get_db] = override_get_db
+    yield TestClient(app)
+def test_user(client):
+    payload = {
+    "first_name": "dean",
+    "last_name": "smith",
+    "email": "",
+    "password": "password123",
+    "unique_id": "1005"
+    res ="/signup/", data=json.dumps(payload))
+    assert res.status_code == 201
+    new_user = res.json()['data']
+    new_user['access_token'] = res.json()['access_token']
+    new_user['email'] = payload['email']
+    return new_user
+import pytest
+from jose import jwt
+from decouple import config
+from api.v1.schemas.auth import User, ShowUser, Token, TokenData
+import json
+payload = {
+    "first_name": "test",
+    "last_name": "user",
+    "email": "",
+    "password": "password123",
+    "unique_id": "1005"
+def test_create_user(client):
+    res =
+        "/signup/", data=(json.dumps(payload)))
+    new_user = ShowUser(**res.json()['data'])
+    assert['email']
+    assert res.status_code == 201
+def test_login_user(test_user, client):
+    res =
+        "/login", data=json.dumps({"email": test_user['email'], "password": "password123"}))
+    logged_in_user = ShowUser(**res.json()['data'])
+    payload = jwt.decode(res.json()['access_token'],
+                         SECRET_KEY, algorithms=[ALGORITHM])
+    id = payload.get("id")
+    token_type= res.json()['token_type']
+    assert id == test_user['id']
+    assert == test_user['email']
+    assert token_type == "bearer"
+    assert res.status_code == 200
+@pytest.mark.parametrize("email, password, status_code", [
+    ('', 'password123', 401),
+    ('', 'wrongpassword', 401),
+    ('', 'wrongpassword', 401),
+    (None, 'password123', 422),
+    ('', None, 422)
+def test_incorrect_login(test_user, client, email, password, status_code):
+    res =
+        "/login", data=json.dumps({"email": email, "password": password}))
+    assert res.status_code == status_code
\ No newline at end of file