diff --git a/.github/workflows/regression-test.yml b/.github/workflows/regression-test.yml index 692edfea2..cdb220e36 100644 --- a/.github/workflows/regression-test.yml +++ b/.github/workflows/regression-test.yml @@ -43,9 +43,6 @@ jobs: password: ${{ secrets.PASSWORD }} script: | cd hng_boilerplate_python_fastapi_web/staging - git add . - git stash - git pull origin staging python3 update_api_status.py diff --git a/api/v1/routes/product.py b/api/v1/routes/product.py index fe8c2a941..0efa71a9a 100644 --- a/api/v1/routes/product.py +++ b/api/v1/routes/product.py @@ -3,7 +3,7 @@ from sqlalchemy.orm import Session from sqlalchemy import func from typing import Annotated -from typing import List +from typing import List, Optional from api.utils.pagination import paginated_response from api.utils.success_response import success_response @@ -33,8 +33,10 @@ @non_organisation_product.get("", response_model=success_response, status_code=200) async def get_all_products( current_user: Annotated[User, Depends(user_service.get_current_super_admin)], - limit: Annotated[int, Query(ge=1, description="Number of products per page")] = 10, - skip: Annotated[int, Query(ge=1, description="Page number (starts from 1)")] = 0, + limit: Annotated[int, Query( + ge=1, description="Number of products per page")] = 10, + skip: Annotated[int, Query( + ge=1, description="Page number (starts from 1)")] = 0, db: Session = Depends(get_db), ): """Endpoint to get all products. Only accessible to superadmin""" @@ -62,7 +64,8 @@ def create_product_category( HTTPException: 401 FORBIDDEN (Current user is not a authenticated) """ - new_category = ProductCategoryService.create(db, category_schema, current_user) + new_category = ProductCategoryService.create( + db, category_schema, current_user) return success_response( status_code=status.HTTP_201_CREATED, @@ -98,7 +101,8 @@ def retrieve_categories( ) -product = APIRouter(prefix="/organisations/{org_id}/products", tags=["Products"]) +product = APIRouter( + prefix="/organisations/{org_id}/products", tags=["Products"]) # create @@ -249,8 +253,10 @@ def delete_product( def get_organisation_products( org_id: str, current_user: Annotated[User, Depends(user_service.get_current_user)], - limit: Annotated[int, Query(ge=1, description="Number of products per page")] = 10, - page: Annotated[int, Query(ge=1, description="Page number (starts from 1)")] = 1, + limit: Annotated[int, Query( + ge=1, description="Number of products per page")] = 10, + page: Annotated[int, Query( + ge=1, description="Page number (starts from 1)")] = 1, db: Session = Depends(get_db), ): """ @@ -326,7 +332,8 @@ async def get_products_by_filter_status( message="Products retrieved successfully", status_code=200, data=products ) except Exception as e: - raise HTTPException(status_code=500, detail="Failed to retrieve products") + raise HTTPException( + status_code=500, detail="Failed to retrieve products") @product.get( @@ -342,9 +349,58 @@ async def get_products_by_status( ): """Endpoint to get products by status""" try: - products = product_service.fetch_by_status(db=db, org_id=org_id, status=status) + products = product_service.fetch_by_status( + db=db, org_id=org_id, status=status) return SuccessResponse( message="Products retrieved successfully", status_code=200, data=products ) except Exception as e: - raise HTTPException(status_code=500, detail="Failed to retrieve products") + raise HTTPException( + status_code=500, detail="Failed to retrieve products") + + +@product.get("/search", status_code=status.HTTP_200_OK, response_model=ProductList) +def search_products( + org_id: str, + name: Optional[str] = Query(None, description="Search by product name"), + category: Optional[str] = Query(None, description="Filter by category"), + min_price: Optional[float] = Query( + None, description="Filter by minimum price"), + max_price: Optional[float] = Query( + None, description="Filter by maximum price"), + limit: Annotated[int, Query( + ge=1, description="Number of products per page")] = 10, + page: Annotated[int, Query( + ge=1, description="Page number (starts from 1)")] = 1, + current_user: Annotated[User, Depends( + user_service.get_current_user)] = None, + db: Session = Depends(get_db), +): + """ + Endpoint to search for products with optional filters and pagination. + + Query parameters: + - name: Search by product name + - category: Filter by category + - min_price: Filter by minimum price + - max_price: Filter by maximum price + - limit: Number of products per page (default: 10, minimum: 1) + - page: Page number (starts from 1) + """ + + products = product_service.search_products( + db=db, + org_id=org_id, + name=name, + category=category, + min_price=min_price, + max_price=max_price, + limit=limit, + page=page, + ) + + return success_response( + status_code=200, + message="Products searched successfully", + data=[jsonable_encoder(product) for product in products], + ) diff --git a/api/v1/services/product.py b/api/v1/services/product.py index 61cce2f3a..c4860eb4d 100644 --- a/api/v1/services/product.py +++ b/api/v1/services/product.py @@ -134,7 +134,8 @@ def fetch_all(self, db: Session, **query_params: Optional[Any]): if query_params: for column, value in query_params.items(): if hasattr(Product, column) and value: - query = query.filter(getattr(Product, column).ilike(f"%{value}%")) + query = query.filter( + getattr(Product, column).ilike(f"%{value}%")) return query.all() @@ -214,6 +215,34 @@ def fetch_stock( "last_updated": product.updated_at, } + def search_products( + db: Session, + org_id: str, + name: Optional[str] = None, + category: Optional[str] = None, + min_price: Optional[float] = None, + max_price: Optional[float] = None, + limit: int = 10, + page: int = 1, + ): + + query = db.query(Product).filter(Product.org_id == org_id) + + if name: + query = query.filter(Product.name.ilike(f"%{name}%")) + if category: + query = query.filter(Product.category.ilike(f"%{category}%")) + + if min_price is not None: + query = query.filter(Product.price >= min_price) + if max_price is not None: + query = query.filter(Product.price <= max_price) + + offset = (page - 1) * limit + products = query.offset(offset).limit(limit).all() + + return products + class ProductCategoryService(Service): """Product categories service functionality""" @@ -252,7 +281,6 @@ def fetch_all(db: Session, **query_params: Optional[Any]): ) return query.all() - - - + + product_service = ProductService() diff --git a/tests/v1/product/test_product_search.py b/tests/v1/product/test_product_search.py new file mode 100644 index 000000000..d845abd6f --- /dev/null +++ b/tests/v1/product/test_product_search.py @@ -0,0 +1,82 @@ +from datetime import datetime +from unittest.mock import MagicMock, patch +from fastapi import HTTPException +import pytest +from fastapi.testclient import TestClient +from main import app +from api.v1.models.user import User +from api.v1.models.product import Product +from api.db.database import get_db +from api.v1.services.user import user_service +from api.v1.services.product import product_service +from uuid_extensions import uuid7 + +client = TestClient(app) +user_id = str(uuid7()) +org_id = str(uuid7()) + + +@pytest.fixture +def mock_db_session(): + """Fixture to create a mock database session.""" + with patch("api.db.database.get_db", autospec=True) as mock_get_db: + mock_db = MagicMock() + app.dependency_overrides[get_db] = lambda: mock_db + yield mock_db + app.dependency_overrides = {} + + +@pytest.fixture +def mock_search_products(): + """Fixture to mock the search_products service function.""" + with patch("api.v1.services.product.product_service.search_products", autospec=True) as mock_search_products: + yield mock_search_products + + +@pytest.mark.asyncio +async def test_search_products_success(mock_db_session, mock_search_products): + + mock_search_products.return_value = [ + { + "id": str(uuid7()), + "name": "Test Product", + "description": "A test product", + "price": 100.0, + "category": "Test Category", + "quantity": 10, + "image_url": "http://example.com/image.jpg", + "archived": False, + "created_at": datetime.utcnow().isoformat() + } + ] + access_token = user_service.create_access_token(str(user_id)) + + response = client.get( + f"/api/v1/organisations/{org_id}/products/search?name=Test", + headers={"Authorization": f"Bearer {access_token}"}, + ) + + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_search_products_no_results(mock_db_session, mock_search_products): + + mock_search_products.return_value = [] + access_token = user_service.create_access_token(str(user_id)) + + response = client.get( + f"/api/v1/organisations/{org_id}/products/search?name=NonExistentProduct", + headers={"Authorization": f"Bearer {access_token}"}, + ) + + assert response.status_code == 200 + + +@pytest.mark.asyncio +async def test_search_products_unauthorized(mock_db_session): + + response = client.get( + f"/api/v1/organisations/{org_id}/products/search?name=Test") + + assert response.status_code == 401