In the first part of this series, we set up a FastAPI application with a PostgreSQL database to store weather forecasts. We also designed a robust schema that tracks changes over time using a composite primary key. If you missed it, check out Part 1.
In this post, we’ll:
- Create query endpoints to fetch forecasts.
- Implement a type-2 SCD approach for tracking forecast revisions.
- Discuss strategies for filtering and pagination.
The Challenge 🧠
Weather forecasts often change over time as meteorological data improves. To support historical analysis and ensure data consistency, we need to:
- Store multiple revisions of a forecast for the same time period.
- Use a type-2 SCD approach to mark the latest forecast as active (`is_current = True`) while archiving older versions.
Step 1: Type-2 SCD Implementation
We already defined the schema to support this:
- `is_current`: Indicates if the record is the latest version.
- `row_start_datetime` and `row_end_datetime`: Track when each forecast version was active.
Here’s how the upsert logic works:
- When a new forecast is added:
  - If a forecast for the same `start_time` exists, it is marked as inactive (`is_current = False`).
  - The new forecast is added as the latest version with `is_current = True`.
- Historical data is preserved for all previous versions.
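To make the row lifecycle concrete, here is a minimal in-memory sketch of the same steps. It is not the database code: `ForecastVersion` and `scd2_upsert` are hypothetical names used only for illustration, and only one measurement (`temperature`) is carried to keep it short.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ForecastVersion:
    # Hypothetical in-memory stand-in for a forecast row; not the ORM model.
    start_time: datetime
    temperature: float
    is_current: bool
    row_start_datetime: datetime
    row_end_datetime: Optional[datetime] = None


def scd2_upsert(rows, start_time, temperature, now):
    """Close the current version for start_time (if any), then append a new one."""
    for row in rows:
        if row.start_time == start_time and row.is_current:
            row.is_current = False
            row.row_end_datetime = now
    new_row = ForecastVersion(start_time, temperature, True, now)
    rows.append(new_row)
    return new_row


rows = []
period = datetime(2024, 11, 26, 15, 0, tzinfo=timezone.utc)
# The first forecast for the period arrives, then a revision six hours later.
scd2_upsert(rows, period, 41.0, datetime(2024, 11, 25, 6, 0, tzinfo=timezone.utc))
scd2_upsert(rows, period, 43.0, datetime(2024, 11, 25, 12, 0, tzinfo=timezone.utc))

for row in rows:
    print(row.temperature, row.is_current, row.row_end_datetime)
```

After the second call, the first row is closed (`is_current = False`, with an end timestamp equal to the second row's start) and the second row is the only active version.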
Repository Function: Upsert Logic
```python
from datetime import datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Forecast (ORM model) and ForecastCreate (Pydantic schema) are defined in Part 1.


async def upsert_forecast_data(db: AsyncSession, forecast_data: ForecastCreate):
    """
    Insert or update weather forecast data using Type-2 Slowly Changing Dimensions (SCD).
    """
    # Take one timestamp so the old version ends exactly when the new one starts.
    # Note: datetime.utcnow() returns a naive value and is deprecated as of Python 3.12.
    now = datetime.utcnow()

    query = select(Forecast).where(
        Forecast.start_time == forecast_data.start_time,
        Forecast.is_current.is_(True),
    )
    result = await db.execute(query)
    existing_record = result.scalar_one_or_none()

    # Mark the existing record as inactive (it is already tracked by the session)
    if existing_record:
        existing_record.is_current = False
        existing_record.row_end_datetime = now

    # Insert the new forecast
    new_record = Forecast(
        start_time=forecast_data.start_time,
        end_time=forecast_data.end_time,
        temperature=forecast_data.temperature,
        temperature_unit=forecast_data.temperature_unit,
        relative_humidity=forecast_data.relative_humidity,
        wind_speed=forecast_data.wind_speed,
        wind_direction=forecast_data.wind_direction,
        short_forecast=forecast_data.short_forecast,
        icon=forecast_data.icon,
        is_current=True,
        row_start_datetime=now,
        row_end_datetime=None,
    )
    db.add(new_record)
    await db.commit()
    await db.refresh(new_record)
    return new_record
```
Step 2: Fetching Forecasts
Endpoint: Current Forecasts
This endpoint retrieves the active forecasts (`is_current = True`) for the next given number of days.
Repository Function:
```python
from datetime import datetime, timedelta  # timedelta was missing from the imports


async def get_current_forecast(db: AsyncSession, days: int):
    """
    Retrieve current forecasts for the next specified number of days.
    """
    now = datetime.utcnow()
    end_date = now + timedelta(days=days)
    query = (
        select(Forecast)
        .where(
            Forecast.is_current.is_(True),
            Forecast.end_time > now,  # skip periods that have already ended
            Forecast.start_time <= end_date,
        )
        .order_by(Forecast.start_time)
    )
    result = await db.execute(query)
    return result.scalars().all()
```
Endpoint in `router.py`:
```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.domains.weather.repository import get_current_forecast

# ForecastResponse is the Pydantic response schema from Part 1.

router = APIRouter()


@router.get("/current", response_model=list[ForecastResponse])
async def fetch_current_forecasts(days: int, db: AsyncSession = Depends(get_db)):
    """
    Fetch current forecasts for the next specified number of days.
    """
    return await get_current_forecast(db, days)
```
Endpoint: Historical Revisions
This endpoint fetches all revisions of a forecast for a specific `start_time`.
Repository Function:
```python
async def get_forecast_revisions(db: AsyncSession, start_time: datetime):
    """
    Retrieve all revisions of a forecast for a specific start time.
    """
    query = (
        select(Forecast)
        .where(Forecast.start_time == start_time)
        .order_by(Forecast.row_start_datetime)  # oldest revision first
    )
    result = await db.execute(query)
    return result.scalars().all()
```
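Endpoint in `router.py`:
```python
from datetime import datetime

from app.domains.weather.repository import get_forecast_revisions


@router.get("/revisions/{start_time}", response_model=list[ForecastResponse])
async def fetch_forecast_revisions(start_time: datetime, db: AsyncSession = Depends(get_db)):
    """
    Fetch all revisions of a forecast for the given start time.
    """
    # Declaring the path parameter as datetime lets FastAPI parse the ISO 8601
    # value (including a trailing "Z") and return a 422 response on bad input.
    return await get_forecast_revisions(db, start_time)
```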
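One detail worth knowing if you parse the timestamp by hand: `datetime.fromisoformat` only accepts a trailing `Z` from Python 3.11 onward, so a value like `2024-11-26T15:00:00Z` raises `ValueError` on older interpreters. Below is a minimal sketch of a helper that works on both; `parse_iso_utc` is a hypothetical name, and treating offset-less input as UTC is an assumption you may want to change.

```python
from datetime import datetime, timezone


def parse_iso_utc(value: str) -> datetime:
    """Parse an ISO 8601 timestamp and normalize it to timezone-aware UTC."""
    # Before Python 3.11, fromisoformat() rejects a trailing "Z", so swap it
    # for the equivalent "+00:00" offset first.
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


print(parse_iso_utc("2024-11-26T15:00:00Z"))       # 2024-11-26 15:00:00+00:00
print(parse_iso_utc("2024-11-26T10:00:00-05:00"))  # 2024-11-26 15:00:00+00:00
```

Whether you should pass an aware or a naive value on to the query depends on how the `start_time` column was declared in Part 1.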
Step 3: Filtering and Pagination 🔍
As your dataset grows, you’ll need efficient ways to filter and paginate results.
Add `limit` and `offset` parameters to control how many results are returned and where the page starts.
Example:
```python
async def get_current_forecast_paginated(db: AsyncSession, days: int, limit: int, offset: int):
    now = datetime.utcnow()
    end_date = now + timedelta(days=days)
    query = (
        select(Forecast)
        .where(
            Forecast.is_current.is_(True),
            Forecast.end_time > now,  # skip periods that have already ended
            Forecast.start_time <= end_date,
        )
        # Without a stable ORDER BY, LIMIT/OFFSET pages are not deterministic.
        .order_by(Forecast.start_time)
        .limit(limit)
        .offset(offset)
    )
    result = await db.execute(query)
    return result.scalars().all()
```
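If you would rather expose page numbers than raw offsets, the translation is simple arithmetic. Here is a minimal sketch, assuming 1-based pages and a cap on page size; `page_to_limit_offset` and the default cap of 100 are illustrative choices, not part of the series' code.

```python
def page_to_limit_offset(page: int, page_size: int, max_page_size: int = 100):
    """Translate a 1-based page number into (limit, offset) for a query."""
    if page < 1:
        raise ValueError("page must be >= 1")
    if page_size < 1:
        raise ValueError("page_size must be >= 1")
    # Cap the page size so a single request cannot ask for the whole table.
    limit = min(page_size, max_page_size)
    offset = (page - 1) * limit
    return limit, offset


# Demonstrate on a plain list standing in for an ordered result set.
items = list(range(1, 11))
limit, offset = page_to_limit_offset(page=2, page_size=4)
print(items[offset:offset + limit])  # [5, 6, 7, 8]
```

The resulting pair can be passed straight to the `limit` and `offset` arguments of the paginated repository function.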
Testing the New Endpoints 🧪
Fetching Current Forecasts
```shell
curl "http://127.0.0.1:8000/forecast/current?days=3"
```
Fetching Historical Revisions
```shell
curl "http://127.0.0.1:8000/forecast/revisions/2024-11-26T15:00:00Z"
```
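The revision history is what makes historical analysis possible: given the versions for one `start_time`, you can work out which forecast was in effect at any past moment. The sketch below assumes the revisions have been loaded into plain dicts with `datetime` values for `row_start_datetime` and `row_end_datetime`; `version_as_of`, the `utc` helper, and the sample data are hypothetical.

```python
from datetime import datetime, timezone


def version_as_of(revisions, as_of):
    """Return the revision whose [row_start, row_end) interval contains as_of."""
    for rev in revisions:
        started = rev["row_start_datetime"] <= as_of
        end = rev["row_end_datetime"]
        # An open-ended row (end is None) is still active.
        ended = end is not None and end <= as_of
        if started and not ended:
            return rev
    return None


def utc(day, hour):
    # Small helper for building sample timestamps in November 2024.
    return datetime(2024, 11, day, hour, tzinfo=timezone.utc)


# Made-up sample data: one superseded revision and the current one.
revisions = [
    {"temperature": 41.0, "row_start_datetime": utc(25, 6), "row_end_datetime": utc(25, 12)},
    {"temperature": 43.0, "row_start_datetime": utc(25, 12), "row_end_datetime": None},
]

print(version_as_of(revisions, utc(25, 9))["temperature"])  # 41.0
print(version_as_of(revisions, utc(26, 0))["temperature"])  # 43.0
```

Treating the interval as half-open means a timestamp exactly at the handover belongs to the newer version, so no instant matches two rows.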
What’s Next?
In the next part, we’ll look at the OpenAPI spec and how to enrich the models with as much descriptive information as possible.
Stay tuned! 🌟