So, you’ve got weather data from the Weather.gov API. You’ve built a FastAPI app that transforms and stores data using a Type 2 Slowly Changing Dimension (SCD) model. And now you’re wondering: “How do I tie this all together in a way that looks effortless but secretly makes me feel like a data wizard?”

Enter Airflow, the glue that orchestrates your data pipeline. Today, we’re bringing together orchestration, transformation, and storage into a single, elegant workflow that will:

  1. 🌍 Fetch hourly weather data from the Weather.gov API.
  2. 🛠️ Transform it into something useful (and historically traceable) with FastAPI.
  3. 🗄️ Store it in PostgreSQL for future analysis, because nothing says “I’m serious about data” like a well-indexed database.
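Before wiring anything up, it helps to see the raw material. A single hourly period from the forecastHourly response looks roughly like the sketch below; the field names are the ones our pipeline actually reads, and the values are purely illustrative:

# One hourly "period" from the forecastHourly endpoint (illustrative values only).
sample_period = {
    "startTime": "2024-01-01T14:00:00-06:00",
    "endTime": "2024-01-01T15:00:00-06:00",
    "temperature": 48,
    "temperatureUnit": "F",
    "relativeHumidity": {"value": 62},
    "windSpeed": "10 mph",
    "windDirection": "NW",
    "shortForecast": "Partly Sunny",
    "icon": "https://api.weather.gov/icons/land/day/sct?size=small",
}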

Ready? Let’s dive into the details.


The Layers of Our Data Stack 💡

Modern data pipelines are like onions (or parfaits, if you prefer)—they have layers. Here’s how ours stacks up:

  1. Orchestration (Airflow): Keeps everything running smoothly and on schedule. Think of it as the responsible friend who makes sure no one forgets to show up.
  2. Transformation (FastAPI): Processes raw API data into a Type 2 SCD format, because sometimes you just need to know exactly when a forecast said it wouldn’t rain and then promptly rained anyway.
  3. Storage (PostgreSQL): Holds the data so you can analyze it later. Bonus: it’s highly queryable, which is just a fancy way of saying, “You can actually find stuff when you need it.”

By layering these tools, we’ve built a system that’s robust, scalable, and slightly over-engineered—just the way we like it.
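For the curious, here’s a minimal sketch of what the FastAPI side of layer 2 could look like. To be clear, this isn’t lifted from the actual app: the table name (weather_forecast), the SCD columns (valid_from, valid_to, is_current), and the connection details are placeholders I’m assuming for illustration. The Type 2 move itself is the important part: expire the row that’s currently “true” for a forecast window, then insert the new version alongside it.

# Sketch only: table/column names and connection details are assumptions.
from datetime import datetime, timezone
from typing import Optional

import psycopg2
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ForecastIn(BaseModel):
    start_time: datetime
    end_time: datetime
    temperature: int
    temperature_unit: str
    relative_humidity: Optional[float] = None
    wind_speed: str
    wind_direction: str
    short_forecast: str
    icon: str

@app.post("/forecast/")
def upsert_forecast(forecast: ForecastIn):
    now = datetime.now(timezone.utc)
    # Placeholder connection string; swap in your own credentials/host.
    conn = psycopg2.connect("dbname=weather user=postgres host=postgres-container")
    with conn, conn.cursor() as cur:
        # Type 2 SCD, step 1: expire the currently-active row for this forecast window.
        # (A real implementation would first check whether anything actually changed.)
        cur.execute(
            """
            UPDATE weather_forecast
               SET valid_to = %s, is_current = FALSE
             WHERE start_time = %s AND is_current = TRUE
            """,
            (now, forecast.start_time),
        )
        # Type 2 SCD, step 2: insert the new version as the current row.
        cur.execute(
            """
            INSERT INTO weather_forecast (
                start_time, end_time, temperature, temperature_unit,
                relative_humidity, wind_speed, wind_direction,
                short_forecast, icon, valid_from, valid_to, is_current
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NULL, TRUE)
            """,
            (
                forecast.start_time, forecast.end_time, forecast.temperature,
                forecast.temperature_unit, forecast.relative_humidity,
                forecast.wind_speed, forecast.wind_direction,
                forecast.short_forecast, forecast.icon, now,
            ),
        )
    conn.close()
    return {"status": "stored"}

A production version would also compare the incoming payload to the current row and skip the write when nothing changed, so you don’t mint a new “version” every two hours for an identical forecast. The payoff is that questions like “how did the 2 p.m. forecast change over the course of the day?” become a simple filter on start_time ordered by valid_from.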


The Airflow DAG: Orchestrating the Magic ⚙️

Airflow lets us define workflows as Directed Acyclic Graphs (DAGs), which is just a technical way of saying, “Things happen in a specific order, and we don’t go backward.” In this case, our DAG will:

  1. Fetch weather data from the Weather.gov API.
  2. Send that data to the FastAPI app for transformation and storage.

Here’s the code:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests

# Task 1: Fetch Weather Data 🌦️
def fetch_weather_data():
    latitude, longitude = 37.209, -93.2923  # Springfield, MO
    point_url = f"https://api.weather.gov/points/{latitude},{longitude}"
    headers = {"User-Agent": "WeatherDataPipeline (your_email@example.com)"}
    point_response = requests.get(point_url, headers=headers)
    point_response.raise_for_status()
    
    # Extract the forecastHourly URL
    forecast_hourly_url = point_response.json()["properties"]["forecastHourly"]
    forecast_response = requests.get(forecast_hourly_url, headers=headers)
    forecast_response.raise_for_status()
    
    return forecast_response.json()["properties"]["periods"]

# Task 2: Transform and Store Data 🛠️
def post_to_api(forecast_data):
    api_url = "http://fastapi-container:8000/forecast/"
    headers = {"Content-Type": "application/json"}
    
    for period in forecast_data:
        payload = {
            "start_time": period["startTime"],
            "end_time": period["endTime"],
            "temperature": period["temperature"],
            "temperature_unit": period["temperatureUnit"],
            "relative_humidity": period["relativeHumidity"]["value"],
            "wind_speed": period["windSpeed"],
            "wind_direction": period["windDirection"],
            "short_forecast": period["shortForecast"],
            "icon": period["icon"],
        }
        response = requests.post(api_url, json=payload, headers=headers)
        response.raise_for_status()

# Default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG 🌟
with DAG(
    "weather_pipeline",
    default_args=default_args,
    description="Pipeline to fetch, transform, and store weather data",
    schedule_interval="0 */2 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Define tasks
    fetch_task = PythonOperator(
        task_id="fetch_weather_data",
        python_callable=fetch_weather_data,
    )

    post_task = PythonOperator(
        task_id="post_to_api",
        python_callable=post_to_api,
        op_args=[fetch_task.output],  # Pass output from fetch_task
    )

    # Set task dependencies
    fetch_task >> post_task
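
As an aside, if you prefer Airflow’s TaskFlow API, the same wiring can be written with decorators instead of the explicit op_args plumbing. Here’s a minimal sketch, assuming Airflow 2.x and that it lives in the same file as the functions above:

# Alternative wiring using the TaskFlow API (Airflow 2.x), reusing the
# fetch_weather_data and post_to_api functions defined above.
from airflow.decorators import dag, task

@dag(
    dag_id="weather_pipeline_taskflow",
    schedule_interval="0 */2 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args=default_args,
)
def weather_pipeline_taskflow():
    # Wrapping the plain functions turns them into tasks; passing the return
    # value of one into the other sets both the dependency and the XCom hand-off.
    fetch = task(fetch_weather_data)
    post = task(post_to_api)
    post(fetch())

weather_pipeline_taskflow()

If you go this route, it replaces the with DAG(...) block above rather than sitting alongside it; otherwise you’d be fetching and posting everything twice.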

What’s Next? 🚀

Now that you’ve built a fully operational data pipeline (and celebrated its success, I assume), it’s time to consider what else you can add to this masterpiece. Here are a few ideas to keep things interesting:

  • 🛠️ Custom Events and Observability: Knowing what’s happening in your pipeline is half the battle. I’m planning to add custom endpoints to FastAPI for logging events and notifications, so Airflow can tattle on itself. Curious? Stick around for tomorrow’s article! 🎉

  • 📊 Dashboard Integration: Sure, your pipeline is running fine now, but why not make it look impressive too? Add a dashboard with Grafana or Superset so you can keep an eye on trends, anomalies, or that one temperature reading that always seems suspicious.

  • 🤖 Machine Learning: Use your Type 2 SCD weather data for predictive analytics. Whether it’s forecasting energy needs, optimizing logistics, or just proving you know what “gradient descent” means, the possibilities are endless.

This pipeline isn’t just a project; it’s a jumping-off point. What you build next is up to you—just make sure to tell people you’re doing it for science and not because you enjoy fiddling with your data stack more than you probably should. 😉