In the first part of this series, we set up a FastAPI application with a PostgreSQL database to store weather forecasts. We also designed a robust schema that tracks changes over time using a composite primary key. If you missed it, check out Part 1.

In this post, we’ll:

  1. Create query endpoints to fetch forecasts.
  2. Implement a Type-2 Slowly Changing Dimension (SCD) approach for tracking forecast revisions.
  3. Discuss strategies for filtering and pagination.

The Challenge 🧠

Weather forecasts often change over time as meteorological data improves. To support historical analysis and ensure data consistency, we need to:

  1. Store multiple revisions of a forecast for the same time period.
  2. Use the Type-2 SCD approach to mark the latest forecast as active (is_current = True) while archiving older versions.

Step 1: Type-2 SCD Implementation

We already defined the schema to support this:

  • is_current: Indicates if the record is the latest version.
  • row_start_datetime and row_end_datetime: Track when each forecast version was active.
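Together, these columns answer "which forecast did we have at time T?". Here is a minimal in-memory sketch of that point-in-time lookup. ForecastVersion and version_as_of are hypothetical names, not part of the project. The sample values are made up. A version counts as active from row_start_datetime up to, but not including, row_end_datetime, and a NULL (None) end means the version is still open.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class ForecastVersion:
    # Hypothetical stand-in for a Forecast row: only the SCD columns plus one value.
    temperature: int
    is_current: bool
    row_start_datetime: datetime
    row_end_datetime: Optional[datetime]


def version_as_of(versions: list[ForecastVersion], at: datetime) -> Optional[ForecastVersion]:
    """Return the version that was active at `at`, or None if none was."""
    for v in versions:
        started = v.row_start_datetime <= at
        # An open-ended row (row_end_datetime is None) is still active.
        not_ended = v.row_end_datetime is None or at < v.row_end_datetime
        if started and not_ended:
            return v
    return None


versions = [
    ForecastVersion(20, False, datetime(2024, 11, 25, 6), datetime(2024, 11, 25, 18)),
    ForecastVersion(22, True, datetime(2024, 11, 25, 18), None),
]
print(version_as_of(versions, datetime(2024, 11, 25, 12)).temperature)  # 20
print(version_as_of(versions, datetime(2024, 11, 26, 0)).temperature)   # 22
```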

Here’s how the upsert logic works:

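  1. When a new forecast arrives:
    • If a current forecast (is_current = True) already exists for the same start_time, it is marked inactive (is_current = False) and its row_end_datetime is set to the current time.
    • The new forecast is inserted as the latest version, with is_current = True, row_start_datetime set to the current time, and row_end_datetime left empty (NULL).
  2. Older versions are never deleted, so the full history of every forecast is preserved.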
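The steps above can be sketched without a database. This is an illustrative, in-memory version only. Row and scd2_upsert are hypothetical names. The repository function that follows does the same thing against PostgreSQL through SQLAlchemy.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Row:
    # Hypothetical in-memory stand-in for a Forecast row.
    start_time: datetime
    temperature: int
    is_current: bool = True
    row_start_datetime: Optional[datetime] = None
    row_end_datetime: Optional[datetime] = None


def scd2_upsert(table: list[Row], start_time: datetime, temperature: int, now: datetime) -> Row:
    """Close the current version for start_time (if any) and append a new one."""
    for row in table:
        if row.start_time == start_time and row.is_current:
            row.is_current = False       # archive the old version
            row.row_end_datetime = now   # it stopped being current now
    new_row = Row(start_time, temperature, True, now, None)
    table.append(new_row)                # history is never deleted
    return new_row


table: list[Row] = []
slot = datetime(2024, 11, 26, 15)
scd2_upsert(table, slot, 20, now=datetime(2024, 11, 25, 6))
scd2_upsert(table, slot, 22, now=datetime(2024, 11, 25, 18))
print([(r.temperature, r.is_current) for r in table])  # [(20, False), (22, True)]
```

Each revision is a new row rather than an in-place update. That choice is what makes the "as of" questions answerable later.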

Repository Function: Upsert Logic

from datetime import datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Assumed module paths for the model and schema from Part 1; adjust to your layout.
from app.domains.weather.models import Forecast
from app.domains.weather.schemas import ForecastCreate


async def upsert_forecast_data(db: AsyncSession, forecast_data: ForecastCreate) -> Forecast:
    """
    Insert or update weather forecast data using Type-2 Slowly Changing Dimensions (SCD).
    """
    # One timestamp for both rows, so the old version ends exactly when the new one starts.
    # datetime.utcnow() returns naive UTC (deprecated since Python 3.12); use
    # datetime.now(timezone.utc) instead if your columns are timezone-aware.
    now = datetime.utcnow()

    query = select(Forecast).where(
        Forecast.start_time == forecast_data.start_time,
        Forecast.is_current.is_(True),
    )
    result = await db.execute(query)
    existing_record = result.scalar_one_or_none()

    # Mark the existing record as inactive (it is already tracked by the session)
    if existing_record:
        existing_record.is_current = False
        existing_record.row_end_datetime = now

    # Insert the new forecast as the current version
    new_record = Forecast(
        start_time=forecast_data.start_time,
        end_time=forecast_data.end_time,
        temperature=forecast_data.temperature,
        temperature_unit=forecast_data.temperature_unit,
        relative_humidity=forecast_data.relative_humidity,
        wind_speed=forecast_data.wind_speed,
        wind_direction=forecast_data.wind_direction,
        short_forecast=forecast_data.short_forecast,
        icon=forecast_data.icon,
        is_current=True,
        row_start_datetime=now,
        row_end_datetime=None,
    )
    db.add(new_record)
    # The update and the insert are committed together in one transaction
    await db.commit()
    await db.refresh(new_record)
    return new_record
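One gap is worth guarding against. Two requests for the same start_time can arrive concurrently, read the same state, and each insert a row with is_current = True. After that, scalar_one_or_none() raises MultipleResultsFound. A partial unique index lets the database enforce "at most one current row per start_time".

The sketch below uses the standard library's sqlite3 so it runs anywhere. The table is a trimmed-down, illustrative version of the forecast table, not the Part 1 schema. PostgreSQL accepts the same CREATE UNIQUE INDEX ... WHERE form; in SQLAlchemy it is expressed via Index(..., unique=True, postgresql_where=...).

```python
import sqlite3

# Illustrative schema only; the real table (from Part 1) has more columns and its own key.
SCHEMA = """
CREATE TABLE forecast (
    id INTEGER PRIMARY KEY,
    start_time TEXT NOT NULL,
    temperature INTEGER NOT NULL,
    is_current INTEGER NOT NULL,
    row_start_datetime TEXT NOT NULL,
    row_end_datetime TEXT
);
-- At most one current row per start_time (a partial unique index).
CREATE UNIQUE INDEX uq_forecast_current ON forecast (start_time) WHERE is_current = 1;
"""


def upsert(conn: sqlite3.Connection, start_time: str, temperature: int, now: str) -> None:
    # `with conn` wraps both statements in one transaction: commit on success,
    # rollback if either statement raises.
    with conn:
        conn.execute(
            "UPDATE forecast SET is_current = 0, row_end_datetime = ? "
            "WHERE start_time = ? AND is_current = 1",
            (now, start_time),
        )
        conn.execute(
            "INSERT INTO forecast (start_time, temperature, is_current, row_start_datetime) "
            "VALUES (?, ?, 1, ?)",
            (start_time, temperature, now),
        )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
upsert(conn, "2024-11-26T15:00:00", 20, "2024-11-25T06:00:00")
upsert(conn, "2024-11-26T15:00:00", 22, "2024-11-25T18:00:00")
print(conn.execute("SELECT temperature, is_current FROM forecast ORDER BY id").fetchall())
# [(20, 0), (22, 1)]
```

A second "current" insert for the same start_time that skips the UPDATE now fails with an integrity error instead of silently corrupting the history.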

Step 2: Fetching Forecasts

Endpoint: Current Forecasts

This endpoint retrieves the active forecasts (is_current = True) for the next N days, where N is given by the days query parameter.

Repository Function:

from datetime import datetime, timedelta


async def get_current_forecast(db: AsyncSession, days: int):
    """
    Retrieve current forecasts for the next specified number of days.
    """
    now = datetime.utcnow()
    end_date = now + timedelta(days=days)

    query = (
        select(Forecast)
        .where(
            Forecast.is_current.is_(True),
            Forecast.end_time > now,  # skip periods that are already over
            Forecast.start_time <= end_date,
        )
        .order_by(Forecast.start_time)
    )
    result = await db.execute(query)
    return result.scalars().all()
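To make the time window concrete, here is the same kind of filter applied to plain Python objects. It is a sketch with made-up sample periods and a hypothetical Period class. A forecast counts as in range when all three of these hold:

  • it is current
  • it has not ended yet (end_time > now)
  • it starts no later than now + days

Without that lower bound, current forecasts for periods that are already over would also match.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Period:
    # Hypothetical stand-in for a Forecast row.
    start_time: datetime
    end_time: datetime
    is_current: bool


def in_window(p: Period, now: datetime, days: int) -> bool:
    """True if p is current, has not ended yet, and starts within `days` days."""
    end_date = now + timedelta(days=days)
    return p.is_current and p.end_time > now and p.start_time <= end_date


now = datetime(2024, 11, 26, 12)
periods = [
    Period(datetime(2024, 11, 25, 6), datetime(2024, 11, 25, 18), True),   # already over
    Period(datetime(2024, 11, 26, 6), datetime(2024, 11, 26, 18), True),   # in progress
    Period(datetime(2024, 11, 28, 6), datetime(2024, 11, 28, 18), True),   # within 3 days
    Period(datetime(2024, 11, 30, 6), datetime(2024, 11, 30, 18), True),   # too far out
    Period(datetime(2024, 11, 27, 6), datetime(2024, 11, 27, 18), False),  # superseded
]
print([p.start_time.day for p in periods if in_window(p, now, days=3)])  # [26, 28]
```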

Endpoint in router.py:

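from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.domains.weather.repository import get_current_forecast
# Assumed location of the Pydantic response schema from Part 1; adjust to your layout.
from app.domains.weather.schemas import ForecastResponse

# The curl examples later in this post assume this router is mounted under /forecast.
router = APIRouter()


@router.get("/current", response_model=list[ForecastResponse])
async def fetch_current_forecasts(
    days: int = Query(..., ge=1, description="Number of days ahead to include"),
    db: AsyncSession = Depends(get_db),
):
    """
    Fetch current forecasts for the next specified number of days.
    """
    return await get_current_forecast(db, days)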

Endpoint: Historical Revisions

This endpoint fetches all revisions of a forecast for a specific start_time.

Repository Function:

async def get_forecast_revisions(db: AsyncSession, start_time: datetime):
    """
    Retrieve all revisions of a forecast for a specific start time, oldest first.
    """
    query = (
        select(Forecast)
        .where(Forecast.start_time == start_time)
        .order_by(Forecast.row_start_datetime)
    )
    result = await db.execute(query)
    return result.scalars().all()
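For historical analysis, a natural next step is to compare consecutive revisions. This sketch makes a few simplifications:

  • The revisions are assumed to be sorted oldest-first (for example by row_start_datetime).
  • Rows are plain dicts with made-up values. With ORM objects you would read fields via getattr instead.
  • revision_changes is a hypothetical helper, not part of the project.

```python
from typing import Any


def revision_changes(
    revisions: list[dict[str, Any]], fields: tuple[str, ...]
) -> list[dict[str, tuple[Any, Any]]]:
    """For each consecutive pair of revisions, report the fields whose value changed.

    `revisions` must already be sorted oldest-first.
    """
    changes = []
    for older, newer in zip(revisions, revisions[1:]):
        # Keep only fields that differ, as (old_value, new_value)
        diff = {f: (older[f], newer[f]) for f in fields if older[f] != newer[f]}
        changes.append(diff)
    return changes


revisions = [
    {"temperature": 20, "short_forecast": "Sunny"},
    {"temperature": 22, "short_forecast": "Sunny"},
    {"temperature": 21, "short_forecast": "Partly Cloudy"},
]
print(revision_changes(revisions, ("temperature", "short_forecast")))
# [{'temperature': (20, 22)}, {'temperature': (22, 21), 'short_forecast': ('Sunny', 'Partly Cloudy')}]
```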

Endpoint in router.py:

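from datetime import datetime

from app.domains.weather.repository import get_forecast_revisions


@router.get("/revisions/{start_time}", response_model=list[ForecastResponse])
async def fetch_forecast_revisions(start_time: datetime, db: AsyncSession = Depends(get_db)):
    """
    Fetch all revisions of a forecast for the given start time.
    """
    # Declaring start_time as datetime lets FastAPI parse the ISO 8601 value and
    # answer malformed input with a 422, rather than a 500 from a failed
    # datetime.fromisoformat() call.
    return await get_forecast_revisions(db, start_time)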
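The curl example below sends 2024-11-26T15:00:00Z, which parses to a timezone-aware datetime. That holds for FastAPI's datetime parsing and for datetime.fromisoformat on Python 3.11+; older versions of fromisoformat reject the trailing Z.

Part 1's start_time columns may be timestamp without time zone holding naive UTC values, as the use of datetime.utcnow() suggests. If so, the driver may refuse to compare them with an aware value. A small normalization step avoids the mismatch; it is sketched here as a hypothetical to_naive_utc helper. Skip it if your columns are timestamptz.

```python
from datetime import datetime, timezone


def to_naive_utc(dt: datetime) -> datetime:
    """Convert an aware datetime to naive UTC; leave naive values unchanged.

    Assumes naive values are already UTC (the convention datetime.utcnow() follows).
    """
    if dt.tzinfo is None:
        return dt
    return dt.astimezone(timezone.utc).replace(tzinfo=None)


print(to_naive_utc(datetime.fromisoformat("2024-11-26T15:00:00+00:00")))  # 2024-11-26 15:00:00
print(to_naive_utc(datetime.fromisoformat("2024-11-26T10:00:00-05:00")))  # 2024-11-26 15:00:00
```

In the endpoint, this would be applied as get_forecast_revisions(db, to_naive_utc(start_time)).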

Step 3: Filtering and Pagination 🔍

As your dataset grows, you’ll need efficient ways to filter and paginate results.

Pagination with Query Parameters

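Add limit and offset parameters: limit caps how many results are returned, and offset sets how many to skip before the page begins. Combine them with an ORDER BY. Without a fixed order, the database may return rows in a different order on each request, so pages can overlap or skip rows.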

Example:

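async def get_current_forecast_paginated(db: AsyncSession, days: int, limit: int, offset: int):
    """
    Retrieve one page of current forecasts for the next specified number of days.
    """
    now = datetime.utcnow()
    end_date = now + timedelta(days=days)

    query = (
        select(Forecast)
        .where(
            Forecast.is_current.is_(True),
            Forecast.end_time > now,
            Forecast.start_time <= end_date,
        )
        # A deterministic order is required for stable, non-overlapping pages
        .order_by(Forecast.start_time)
        .limit(limit)
        .offset(offset)
    )
    result = await db.execute(query)
    return result.scalars().all()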
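Clients often think in page numbers rather than offsets. Here is a sketch that translates a 1-based page into limit and offset, then runs an ordered LIMIT/OFFSET query. It uses the standard library's sqlite3 with eight made-up rows so it is self-contained. page_to_limit_offset and fetch_page are hypothetical helpers.

```python
import sqlite3


def page_to_limit_offset(page: int, page_size: int) -> tuple[int, int]:
    """Translate a 1-based page number into (limit, offset)."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return page_size, (page - 1) * page_size


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE forecast (start_time TEXT, is_current INTEGER)")
conn.executemany(
    "INSERT INTO forecast VALUES (?, 1)",
    [(f"2024-11-26T{h:02d}:00:00",) for h in range(0, 24, 3)],  # 8 sample rows
)


def fetch_page(page: int, page_size: int) -> list[str]:
    limit, offset = page_to_limit_offset(page, page_size)
    # ORDER BY makes page boundaries deterministic; without it the database
    # may return rows in any order, so pages could overlap or skip rows.
    rows = conn.execute(
        "SELECT start_time FROM forecast WHERE is_current = 1 "
        "ORDER BY start_time LIMIT ? OFFSET ?",
        (limit, offset),
    ).fetchall()
    return [r[0] for r in rows]


print(fetch_page(1, 3))  # ['2024-11-26T00:00:00', '2024-11-26T03:00:00', '2024-11-26T06:00:00']
print(fetch_page(3, 3))  # ['2024-11-26T18:00:00', '2024-11-26T21:00:00']
```

The last page is simply shorter, and a page past the end comes back empty.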

Testing the New Endpoints 🧪

Fetching Current Forecasts

curl "http://127.0.0.1:8000/forecast/current?days=3"

Fetching Historical Revisions

curl "http://127.0.0.1:8000/forecast/revisions/2024-11-26T15:00:00Z"
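To script these checks in Python instead, here is a small standard-library sketch that builds the same URLs. It assumes the router is mounted under /forecast on 127.0.0.1:8000, as in the curl commands. current_url, revisions_url and get_json are illustrative helper names.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import urlopen

BASE_URL = "http://127.0.0.1:8000/forecast"  # same host and prefix as the curl examples


def current_url(days: int) -> str:
    return f"{BASE_URL}/current?{urlencode({'days': days})}"


def revisions_url(start_time: str) -> str:
    # Percent-encode the timestamp so it is sent as a single, safe path segment.
    return f"{BASE_URL}/revisions/{quote(start_time, safe='')}"


def get_json(url: str):
    # Requires the API to be running locally.
    with urlopen(url) as resp:
        return json.load(resp)
```

With the server running, get_json(current_url(3)) returns the decoded JSON list of forecasts.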

What’s Next?

In the next part, we'll look at the OpenAPI spec and how to enrich the models with as much information as possible.

Stay tuned! 🌟