Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/soft delete #1230

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions api/utils/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ def paginated_response(
if filters and join is None:
# Apply filters
for attr, value in filters.items():

if value is not None:
query = query.filter(getattr(model, attr).like(f"%{value}%"))
column = getattr(model, attr)
if isinstance(value, bool):
query = query.filter(column == value)
else:
query = query.filter(column.like(f"%{value}%"))

elif filters and join is not None:
# Apply filters
Expand All @@ -84,10 +89,7 @@ def paginated_response(
results = jsonable_encoder(query.offset(skip).limit(limit).all())
total_pages = int(total / limit) + (total % limit > 0)

return success_response(
status_code=200,
message="Successfully fetched items",
data={
return {
"pages": total_pages,
"total": total,
"skip": skip,
Expand All @@ -102,4 +104,4 @@ def paginated_response(
}
)
}
)

63 changes: 59 additions & 4 deletions api/v1/routes/blog.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def create_blog(
if not current_user:
raise HTTPException(status_code=401, detail="You are not Authorized")
blog_service = BlogService(db)
new_blogpost = blog_service.create(db=db, schema=blog, author_id=current_user.id)
new_blogpost = blog_service.create(schema=blog, author_id=current_user.id)

return success_response(
message="Blog created successfully!",
Expand All @@ -55,14 +55,40 @@ def create_blog(
def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0):
"""Endpoint to get all blogs"""

return paginated_response(
blog = paginated_response(
db=db,
model=Blog,
limit=limit,
skip=skip,
)
return success_response(200, message="Successfully fetched all blogs", data=blog)

@blog.get("/active", response_model=success_response)
def get_all_active_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0):

blog = paginated_response(
db=db,
model=Blog,
limit=limit,
skip=skip,
filters={"is_deleted": False} #filter out soft-deleted blogs
)

return success_response(200, message="Successfully fetched active blogs", data=blog)

@blog.get("/archive", response_model=success_response)
def get_all_blogs(db: Session = Depends(get_db), limit: int = 10, skip: int = 0):

blog = paginated_response(
db=db,
model=Blog,
limit=limit,
skip=skip,
filters={"is_deleted": True}
)
return success_response(200, message="Successfully fetched all archived blogs", data=blog)


# blog search endpoint
@blog.get("/search", response_model=BlogSearchResponse)
def search_blogs(
Expand Down Expand Up @@ -174,6 +200,8 @@ def search_blogs(
"total_results": search_results["total"],
"blogs": processed_blogs
}
return success_response(200, message="Successfully fetched all blogs", data=blog)


@blog.get("/{id}", response_model=BlogPostResponse)
def get_blog_by_id(id: str, db: Session = Depends(get_db)):
Expand Down Expand Up @@ -361,11 +389,10 @@ async def archive_blog_post(
db: Session = Depends(get_db),
current_user: User = Depends(user_service.get_current_super_admin),
):

"""Endpoint to archive/soft-delete a blog post"""

blog_service = BlogService(db=db)
blog_post = blog_service.fetch(blog_id=id)
blog_post = blog_service.fetch(blog_id=blog_id)
if not blog_post:
raise HTTPException(status_code=404, detail="Post not found")
#check if admin/ authorized user
Expand All @@ -382,6 +409,34 @@ async def archive_blog_post(
data=jsonable_encoder(blog_post),
)

@blog.put("/{blog_id}/restore")
async def restore_blog_post(
blog_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(user_service.get_current_super_admin),
):

"""Endpoint to restore a soft-deleted blog post"""

blog_service = BlogService(db=db)
blog_post = blog_service.fetch(blog_id=blog_id)
if not blog_post:
raise HTTPException(status_code=404, detail="Post not found")
#check if admin/ authorized user
if not (blog_post.author_id != current_user.id or current_user.is_superadmin):
raise HTTPException(status_code=403, detail="You don't have permission to perform this action")
if not blog_post.is_deleted:
raise HTTPException(status_code=400, detail="Blog post is already active")
blog_post.is_deleted = False
db.commit()
db.refresh(blog_post)

return success_response(
message="Blog post restored successfully!",
status_code=200,
data=jsonable_encoder(blog_post),
)



# Post a comment to a blog
Expand Down
53 changes: 10 additions & 43 deletions tests/v1/blog/get_all_blogs_test.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from uuid_extensions import uuid7

from api.v1.models.blog import Blog
from api.v1.routes.blog import get_db

from main import app


# Mock database dependency
@pytest.fixture
def db_session_mock():
db_session = MagicMock(spec=Session)
return db_session
return MagicMock(spec=Session)

@pytest.fixture
def client(db_session_mock):
Expand All @@ -26,60 +18,35 @@ def client(db_session_mock):
app.dependency_overrides = {}

def test_get_all_blogs_empty(client, db_session_mock):
# Mock data
mock_blog_data = []

mock_query = MagicMock()
mock_query.count.return_value = 0
mock_query.all.return_value = []


mock_query.filter.return_value = mock_query
mock_query.offset.return_value = mock_query
mock_query.limit.return_value = mock_query
db_session_mock.query.return_value = mock_query
db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data


# Call the endpoint
response = client.get("/api/v1/blogs")

# Assert the response
assert response.status_code == 200
assert response.json()["data"]["items"] == []

def test_get_all_blogs_with_data(client, db_session_mock):
blog_id = str(uuid7())
author_id = str(uuid7())
timezone_offset = -8.0
tzinfo = timezone(timedelta(hours=timezone_offset))
timeinfo = datetime.now(tzinfo)
created_at = timeinfo
updated_at = timeinfo

# Mock data
mock_blog_data = [
Blog(
id=blog_id,
author_id=author_id,
title="Test Blog",
content="Test Content",
image_url="http://example.com/image.png",
tags=["test", "blog"],
is_deleted=False,
excerpt="Test Excerpt",
created_at=created_at,
updated_at=updated_at
)
{"id": "123", "title": "Test Blog", "content": "Test Content"}
]

mock_query = MagicMock()
mock_query.count.return_value = 1
db_session_mock.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = mock_blog_data
mock_query.all.return_value = mock_blog_data

mock_query.filter.return_value = mock_query
mock_query.offset.return_value = mock_query
mock_query.limit.return_value = mock_query
db_session_mock.query.return_value = mock_query

# Call the endpoint
response = client.get("/api/v1/blogs")

# Assert the response
assert response.status_code == 200
assert len(response.json().get('data')) >= 1

assert len(response.json()["data"]["items"]) >= 1
100 changes: 100 additions & 0 deletions tests/v1/blog/get_archive_blogs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock

import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from uuid_extensions import uuid7

from api.v1.models.blog import Blog
from api.v1.routes.blog import get_db

from main import app
@pytest.fixture
def mock_db_session():
mock_db = MagicMock(spec=Session)
return mock_db

@pytest.fixture
def client(db_session_mock):
app.dependency_overrides[get_db] = lambda: db_session_mock
client = TestClient(app)
yield client
app.dependency_overrides = {}


def test_get_archive_blogs(mock_db_session, monkeypatch):
def override_get_db():
return mock_db_session

monkeypatch.setattr("api.v1.routes.blog.get_db", override_get_db)

mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = []

response = client.get("/blog/archive?limit=5&skip=0")


assert response.status_code == 200
json_data = response.json()
assert json_data["status"] == "success"
assert json_data["message"] == "Successfully fetched all archived blogs"
assert isinstance(json_data["data"], list)


# def test_get_all_active_blogs(mock_db_session):
# response = client.get("/api/v1/blogs/active")
# assert response.status_code == 200
# assert response.json()["message"] == "Successfully fetched active blogs"
# assert isinstance(response.json()["data"], list)



@pytest.fixture
def mock_db_session():
return MagicMock(spec=Session)

@pytest.fixture
def client(mock_db_session):
app.dependency_overrides[get_db] = lambda: mock_db_session
client = TestClient(app)
yield client
app.dependency_overrides = {}

def test_get_archive_blogs(client, mock_db_session):
mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = []

response = client.get("/api/v1/blogs/archive?limit=5&skip=0")

assert response.status_code == 200
json_data = response.json()
assert json_data["status"] == "success"
assert json_data["message"] == "Successfully fetched all archived blogs"
assert isinstance(json_data["data"]["items"], list)


from unittest.mock import MagicMock
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from api.v1.routes.blog import get_db
from main import app

@pytest.fixture
def mock_db_session():
return MagicMock(spec=Session)

@pytest.fixture
def client(mock_db_session):
app.dependency_overrides[get_db] = lambda: mock_db_session
client = TestClient(app)
yield client
app.dependency_overrides = {}

def test_get_all_active_blogs(client, mock_db_session):
mock_db_session.query.return_value.filter.return_value.offset.return_value.limit.return_value.all.return_value = []

response = client.get("/api/v1/blogs/active")

assert response.status_code == 200
assert response.json()["message"] == "Successfully fetched active blogs"
assert isinstance(response.json()["data"]["items"], list)
Loading
Loading