Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Pagination Support to GET /api/v1/activity-log Endpoint #1197 #1223

Open
wants to merge 15 commits into
base: dev
Choose a base branch
from
Open
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ celerybeat.pid
# Environments
.env*
!.env.sample
.venv
.blog_env/

env/
Expand Down Expand Up @@ -173,3 +172,12 @@ cython_debug/
#.idea/

alembic/versions

# Ignore environment and virtual environment files
.env
.env.*
.ven
.venv
env
alembic/env.py

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ GRANT ALL PRIVILEGES ON DATABASE hng_fast_api TO user;

**Starting the database**
after cloning the database, dont run
`alembic revision --autogenerate -m 'initial migration'`
`alembic revision --autogenerate -m al'initial migration'`
but run
`alembic upgrade head`

Expand Down
16 changes: 12 additions & 4 deletions api/v1/routes/activity_logs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends, status, HTTPException
from fastapi import APIRouter, Depends, status, HTTPException, Query
from fastapi.encoders import jsonable_encoder
from sqlalchemy.orm import Session
from api.v1.models.user import User
Expand Down Expand Up @@ -30,11 +30,17 @@ async def create_activity_log(
)



@activity_logs.get("", response_model=list[ActivityLogResponse])
async def get_all_activity_logs(current_user: User = Depends(user_service.get_current_super_admin), db: Session = Depends(get_db)):
'''Get all activity logs'''
async def get_all_activity_logs(
page: int = Query(1, ge=1),
limit: int = Query(10, le=100),
current_user: User = Depends(user_service.get_current_super_admin),
db: Session = Depends(get_db)
):

activity_logs = activity_log_service.fetch_all(db=db)
"""Get paginated activity logs"""
activity_logs = activity_log_service.fetch_all(db=db, page=page, limit=limit)
Comment on lines +42 to +43
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just passing the page and page limits doesn't mean it's reflected in the implementation.


return success_response(
status_code=200,
Expand All @@ -45,6 +51,8 @@ async def get_all_activity_logs(current_user: User = Depends(user_service.get_cu
@activity_logs.get("/{user_id}", status_code=status.HTTP_200_OK)
async def fetch_all_users_activity_log(
user_id: str,
page: int = Query(1, ge=1),
limit: int = Query(10, le=100),
db: Session = Depends(get_db),
current_user: User = Depends(user_service.get_current_super_admin)
):
Expand Down
28 changes: 16 additions & 12 deletions api/v1/services/activity_logs.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import desc
from api.v1.models.activity_logs import ActivityLog
from typing import Optional, Any



class ActivityLogService:
"""Activity Log service"""

def create_activity_log(self, db: Session, user_id: str, action: str):
"""Creates a new activity log"""

activity_log = ActivityLog(user_id=user_id, action=action)
db.add(activity_log)
db.commit()
db.refresh(activity_log)
return activity_log

def fetch_all(self, db: Session, **query_params: Optional[Any]):
"""Fetch all products with option tto search using query parameters"""

def fetch_all(self, db: Session, page: int, limit: int, user_id: Optional[str] = None, **query_params: Optional[Any]):
"""Fetch all products with option to search using query parameters"""
offset = (page - 1) * limit
query = db.query(ActivityLog)

# Enable filter by query parameter
if user_id: # Filter by user id
query = query.filter(ActivityLog.user_id == user_id)

# Enable filter by arbitrary query parameters
if query_params:
for column, value in query_params.items():
if hasattr(ActivityLog, column) and value:
query = query.filter(
getattr(ActivityLog, column).ilike(f"%{value}%")
)

return query.all()

def delete_activity_log_by_id(self, db: Session, log_id: str):


query = query.order_by(desc(ActivityLog.timestamp))
paginated_logs = query.offset(offset).limit(limit).all()
return paginated_logs

def delete_activity_log_by_id(self, db: Session, log_id: str) -> dict:
"""Delete an activity log by ID with error handling"""
log = db.query(ActivityLog).filter(ActivityLog.id == log_id).first()

if not log:
Expand All @@ -47,5 +52,4 @@ def delete_activity_log_by_id(self, db: Session, log_id: str):

return {"status": "success", "detail": f"Activity log with ID {log_id} deleted successfully"}


activity_log_service = ActivityLogService()
14 changes: 14 additions & 0 deletions tests/v1/activity_logs/test_get_all_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,17 @@ def test_get_all_activity_logs_non_super_admin(mock_user_service, mock_db_sessio
'Authorization': f'Bearer {access_token}'})

assert response.status_code == status.HTTP_403_FORBIDDEN


@pytest.mark.usefixture("mock_db_session", "mock_user_service")
def test_fetch_all_pagination(test_client):
response = test_client.get("/activity-logs?page=1&limit=5")
assert response.status_code == 200
data = response.json()

assert len(data["data"]) == 5 # Ensure it returns exactly 5 logs

# Verify sorting by created_at in descending order
timestamps = [log["created_at"] for log in data["data"]]
assert timestamps == sorted(timestamps, reverse=True)

126 changes: 126 additions & 0 deletions tests/v1/activity_logs/test_pagination_activity_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock
from main import app # Assuming your main app file is main.py
from api.v1.models.user import User
from api.v1.services.user import user_service
from api.v1.models.activity_logs import ActivityLog # Ensure you import your ActivityLog MODEL, not schema
from uuid_extensions import uuid7
from api.db.database import get_db
from fastapi import status
from datetime import datetime, timezone, timedelta
from typing import List


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_default_pagination(mock_user_service, mock_db_session):
"""Test default pagination (no page/limit params)."""
mock_user = create_mock_user(mock_user_service, mock_db_session)
access_token = user_service.create_access_token(user_id=str(uuid7()))

# Mock 12 activity logs to test default limit of 10
mock_logs = [
create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc))
for i in range(12)
]
mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs[:10] # Mock first 10 for page 1

response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'})

assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert isinstance(response_data, list)
assert len(response_data) == 10 # Verify default limit is 10


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_limit_5(mock_user_service, mock_db_session):
"""Test setting limit to 5."""
mock_user = create_mock_user(mock_user_service, mock_db_session)
access_token = user_service.create_access_token(user_id=str(uuid7()))

mock_logs = [
create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc))
for i in range(7) # Mock 7 logs, limit should return max 5
]
mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs[:5] # Mock first 5 for limit 5


response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'limit': 5})

assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert isinstance(response_data, list)
assert len(response_data) == 5 # Verify limit is respected


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_page_2_limit_5(mock_user_service, mock_db_session):
"""Test getting page 2 with limit 5."""
mock_user = create_mock_user(mock_user_service, mock_db_session)
access_token = user_service.create_access_token(user_id=str(uuid7()))

mock_logs = [
create_mock_activity_log(str(uuid7()), str(uuid7()), f"action_{i}", datetime.now(timezone.utc))
for i in range(12) # Mock 12 logs
]
mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.side_effect = [ # Use side_effect to return different data for each page request
mock_logs[:5], # Page 1 (items 0-4)
mock_logs[5:10], # Page 2 (items 5-9)
[] # Page 3 and beyond (empty list)
]

response_page1 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 1, 'limit': 5})
response_page2 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 2, 'limit': 5})
response_page3 = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'}, params={'page': 3, 'limit': 5})


assert response_page1.status_code == status.HTTP_200_OK
assert len(response_page1.json()) == 5
assert response_page2.status_code == status.HTTP_200_OK
assert len(response_page2.json()) == 5
assert response_page3.status_code == status.HTTP_200_OK
assert len(response_page3.json()) == 0 # Page 3 should be empty


# Add more pagination test cases here, like testing page beyond last page, etc.


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_empty(mock_user_service, mock_db_session):
"""Test for fetching all activity logs with no data."""
mock_user = create_mock_user(mock_user_service, mock_db_session)
access_token = user_service.create_access_token(user_id=str(uuid7()))
mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [] # Mock empty list for no data
response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'})

assert response.status_code == status.HTTP_200_OK
assert response.json() == [] # Verify empty list when no data


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_with_data(mock_user_service, mock_db_session):
"""Test for fetching all activity logs with data."""
mock_user = create_mock_user(mock_user_service, mock_db_session)
access_token = user_service.create_access_token(user_id=str(uuid7()))

mock_logs = [create_mock_activity_log(str(uuid7()), str(uuid7()), "profile Update", datetime.now(timezone.utc))]
mock_db_session.query.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = mock_logs # Mock a single log

response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'})

assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert isinstance(response_data, list)
assert len(response_data) == 1 # Verify one log returned


@pytest.mark.usefixtures("mock_db_session", "mock_user_service")
def test_get_all_activity_logs_non_super_admin(mock_user_service, mock_db_session):
"""Test for fetching all activity logs as a non-super admin user."""
mock_user = create_mock_user(
mock_user_service, mock_db_session, is_superadmin=False)
access_token = user_service.create_access_token(user_id=str(uuid7()))
response = client.get(ACTIVITY_LOGS_ENDPOINT, headers={'Authorization': f'Bearer {access_token}'})

assert response.status_code