Alright, Let’s Start

You know how some things just need all the pieces lined up perfectly, like assembling furniture without an instruction manual? That’s kind of what we’ve been doing here: building a whole setup for handling cryptocurrency data, step by step. And now it’s time to piece it together with Airflow.

We’re not just moving data around aimlessly, though. The plan, as it usually goes, is to get the data from CoinMarketCap, shape it into something useful, and send it to the FastAPI app for storage. Once it’s sitting pretty in a database, we’ll have what you could call a playground for analysis.


Ok, So Why Use Airflow Anyway?

Sure, you could string together a bunch of scripts on a timer and call it a day, but Airflow brings a little more structure, and let’s be honest, it looks cooler when you show it off. Here’s what makes it worth the effort:

  • It’s kind of like a task checklist: You can map out what needs to happen and in what order, and Airflow keeps everything in line.
  • Handles the mess for you: If something doesn’t work, you’ll know exactly where it broke, which is way better than guessing.
  • Scales up pretty easily: Got more work? Airflow can handle it. It’s sort of like a friend who can carry twice their weight but still remembers to call you back.
  • Logs all the things: You can peek at the history to see what ran, what failed, and what worked like a charm.

So Far, Here’s What We’ve Got

Before we get to the main attraction, let’s set the stage with a little recap of what we’ve built:

  1. Tables and Such

    • Coins: Where the basics live, like names, symbols, and all that.
    • Market Data: A log of prices and volumes over time.
    • Tags: Labels for coins that give them a little extra flavor, like “layer-1” or “meme-worthy.”
  2. API Magic with FastAPI

    • It handles the database side of things, making sure data gets in the right place without a fuss.
    • With Pydantic schemas, the documentation is practically baked in.
  3. Getting Data from CoinMarketCap

    • We’ve already sorted out how to grab info from their API, figuring out things like limits and the quirks of their credit system.
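To make the three tables a bit more concrete, here is a rough sketch of the record shapes as plain Python dataclasses. The field names are borrowed from the payloads posted later in this post; the class names, the types, and the sample values are assumptions for illustration, and the real FastAPI app would use Pydantic models rather than dataclasses.

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class Coin:
    """Basic coin metadata (hypothetical shape, trimmed to a few fields)."""
    id: int
    name: str
    symbol: str
    slug: str
    max_supply: Optional[float] = None  # some coins have no supply cap


@dataclass
class MarketData:
    """One price/volume snapshot for a coin at a point in time."""
    coin_id: int
    last_updated: str  # timestamp string as delivered by the API
    price: float
    volume_24h: float


@dataclass
class Tag:
    """A label attached to a coin."""
    coin_id: int
    tag: str


# Made-up sample values, only here to show how the records relate.
coin = Coin(id=1, name="Bitcoin", symbol="BTC", slug="bitcoin", max_supply=21_000_000)
snapshot = MarketData(coin_id=coin.id, last_updated="2024-01-01T00:00:00.000Z",
                      price=42000.0, volume_24h=1.5e10)
label = Tag(coin_id=coin.id, tag="layer-1")

print(asdict(snapshot))
```

The thing to notice is that market data and tags both point back to a coin through coin_id, which is why the coin record has to exist before the other two make sense.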

The Airflow DAG: The Part Where It All Comes Together

So, here’s the idea: we’re going to set up a pipeline in Airflow that fetches data, processes it, and sends it to our API endpoints. Think of it as a little data assembly line.
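If the "assembly line" idea feels abstract, this small sketch shows the ordering logic behind any DAG using only the standard library. It does not use Airflow at all; the stage names are hypothetical labels for the three steps just described.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key is a stage; its value is the set of stages that must finish first.
# The stage names are hypothetical labels, not Airflow task IDs.
stages = {
    "fetch": set(),
    "process": {"fetch"},
    "post": {"process"},
}

# static_order() yields the stages in an order that respects every dependency.
run_order = list(TopologicalSorter(stages).static_order())
print(run_order)  # ['fetch', 'process', 'post']
```

Airflow does the same kind of bookkeeping for real tasks, plus scheduling, retries, and logging. The DAG built below keeps things simpler and runs all three stages inside a single task.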


Step 1: Laying Down the DAG Basics

We’ll start by giving Airflow the details on what this whole thing is supposed to do.

from airflow import DAG
# Airflow 2.x import path; the older airflow.operators.python_operator module is deprecated
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import requests

# Base settings for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'crypto_data_pipeline',
    default_args=default_args,
    description='An hourly pipeline to handle cryptocurrency data.',
    schedule_interval='0 * * * *',  # Run every hour
    start_date=days_ago(1),
    catchup=False,
)
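The cron string '0 * * * *' means "minute 0 of every hour". As a rough illustration of what that looks like on a clock, here is a pure-Python sketch that lists the next few top-of-the-hour times after a given moment. The function name is hypothetical, and this is not how Airflow's scheduler is implemented; it only shows the timestamps the expression matches.

```python
from datetime import datetime, timedelta


def next_hourly_runs(after: datetime, count: int) -> list[datetime]:
    """Return the next `count` top-of-the-hour times strictly after `after`."""
    # Drop minutes and seconds, then step forward one hour to the next boundary.
    first = after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


runs = next_hourly_runs(datetime(2024, 1, 1, 9, 17), count=3)
for run in runs:
    print(run.isoformat())
# 2024-01-01T10:00:00
# 2024-01-01T11:00:00
# 2024-01-01T12:00:00
```

With catchup=False, Airflow does not go back and backfill the hourly runs that were missed between start_date and now.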

Step 2: Grabbing the Data

We’ll hit up the CoinMarketCap API, which, let’s be real, is mostly just about passing headers and waiting for JSON.

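COINMARKETCAP_API_URL = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest"
# Placeholder: swap in your own key and keep the real one out of version control
COINMARKETCAP_API_KEY = "your_coinmarketcap_api_key"

def fetch_crypto_data():
    """Fetch up to 500 coin listings, sorted by market cap, from CoinMarketCap."""
    headers = {
        'Accept': 'application/json',  # Standard HTTP header name
        'X-CMC_PRO_API_KEY': COINMARKETCAP_API_KEY,
    }
    params = {
        'start': '1',
        'limit': '500',
        'sort': 'market_cap',
        'cryptocurrency_type': 'coins',
    }
    # A timeout keeps a stalled request from hanging the task indefinitely
    response = requests.get(COINMARKETCAP_API_URL, headers=headers, params=params, timeout=30)
    response.raise_for_status()  # Raise on 4xx/5xx so Airflow marks the task as failed
    return response.json()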
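Hardcoding the key like this is fine for a demo, but you probably don't want it sitting in a file that gets committed. One option, sketched here, is to read it from an environment variable. The helper name and the variable name are assumptions, and Airflow's own Variables or Connections would be another reasonable place to keep the key.

```python
import os


def build_cmc_headers(env_var: str = "COINMARKETCAP_API_KEY") -> dict[str, str]:
    """Build the request headers, reading the API key from the environment."""
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail early with a clear message instead of sending an empty key.
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {
        "Accept": "application/json",
        "X-CMC_PRO_API_KEY": api_key,
    }


# Demo only: a fake key so the example runs without a real account.
os.environ.setdefault("COINMARKETCAP_API_KEY", "demo-key")
print(sorted(build_cmc_headers()))  # ['Accept', 'X-CMC_PRO_API_KEY']
```

The returned dictionary can be passed straight to requests.get as the headers argument.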

Step 3: Processing and Posting the Data

Now, we take that raw data, split it into chunks (coins, market data, tags), and send each piece to its endpoint.

FASTAPI_BASE_URL = "http://your-fastapi-app-url/crypto"
REQUEST_TIMEOUT = 30  # Seconds to wait on each API call before giving up

def process_and_post_data():
    """Fetch the latest listings and post coins, market data, and tags to the API."""
    crypto_data = fetch_crypto_data()

    for coin in crypto_data['data']:
        # Post coin metadata
        coin_payload = {
            "id": coin["id"],
            "name": coin["name"],
            "symbol": coin["symbol"],
            "slug": coin["slug"],
            "num_market_pairs": coin["num_market_pairs"],
            "date_added": coin["date_added"],
            "max_supply": coin.get("max_supply"),
            "total_supply": coin.get("total_supply"),
            "infinite_supply": coin["infinite_supply"],
        }
        requests.post(f"{FASTAPI_BASE_URL}/coins", json=coin_payload, timeout=REQUEST_TIMEOUT).raise_for_status()

        # Post market data, wrapped in a one-item list for this endpoint
        usd = coin["quote"]["USD"]
        market_data_payload = {
            "coin_id": coin["id"],
            "last_updated": usd["last_updated"],
            "price": usd["price"],
            "volume_24h": usd["volume_24h"],
            "percent_change_1h": usd.get("percent_change_1h"),
            "percent_change_24h": usd.get("percent_change_24h"),
            "market_cap": usd.get("market_cap"),
        }
        requests.post(f"{FASTAPI_BASE_URL}/market-data", json=[market_data_payload], timeout=REQUEST_TIMEOUT).raise_for_status()

        # Post tags, skipping coins whose tag list is missing, null, or empty
        tags = coin.get("tags")
        if tags:
            tags_payload = [{"tag": tag} for tag in tags]
            requests.post(f"{FASTAPI_BASE_URL}/tags?coin_id={coin['id']}", json=tags_payload, timeout=REQUEST_TIMEOUT).raise_for_status()
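The function above shapes the data and talks to the network in the same loop, which makes it hard to check the shaping on its own. As a sketch of one alternative, the splitting step can live in a pure function that takes one listing record and returns the three payloads. The function name is hypothetical, the payloads are trimmed to a few fields, and the sample record is made up.

```python
from typing import Any


def split_coin_record(coin: dict[str, Any]) -> dict[str, Any]:
    """Split one listing record into coin, market-data, and tag payloads."""
    usd = coin["quote"]["USD"]
    return {
        "coin": {
            "id": coin["id"],
            "name": coin["name"],
            "symbol": coin["symbol"],
        },
        "market_data": {
            "coin_id": coin["id"],
            "last_updated": usd["last_updated"],
            "price": usd["price"],
            "volume_24h": usd["volume_24h"],
        },
        # A missing or null "tags" field becomes an empty list.
        "tags": [{"tag": tag} for tag in coin.get("tags") or []],
    }


# Made-up sample record containing only the fields used here.
sample = {
    "id": 1,
    "name": "Bitcoin",
    "symbol": "BTC",
    "tags": ["layer-1"],
    "quote": {"USD": {"last_updated": "2024-01-01T00:00:00.000Z",
                      "price": 42000.0, "volume_24h": 1.5e10}},
}

payloads = split_coin_record(sample)
print(payloads["tags"])  # [{'tag': 'layer-1'}]
```

Because nothing here touches the network, you can run it against a saved API response to confirm the payloads look right before letting Airflow post anything.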

Step 4: Putting It All Together

Finally, we connect the dots in the DAG.

fetch_and_post_task = PythonOperator(
    task_id='fetch_and_post_crypto_data',
    python_callable=process_and_post_data,
    dag=dag,
)

Alright, Let’s Wrap This Up

And there it is: a complete, hour-by-hour cryptocurrency data pipeline. This setup brings together Airflow, FastAPI, and CoinMarketCap’s API into something that’s not just functional but also kind of neat. Next up? Playing with the data we’ve collected to find insights or trends, or just to see how RocketPoodle’s doing. 🐕