Alright, Let’s Start

You know how some things just need all the pieces lined up perfectly, like assembling furniture without an instruction manual? That’s kind of what we’ve been doing here: building a whole setup for handling cryptocurrency data, step by step. And now it’s time to piece it together with Airflow.

We’re not just moving data around aimlessly, though. The plan is to get the data from CoinMarketCap, shape it into something useful, and send it to the FastAPI app for storage. Once it’s sitting pretty in a database, we’ll have what you could call a playground for analysis.


Ok, So Why Use Airflow Anyway?

Sure, you could string together a bunch of scripts on a timer and call it a day, but Airflow brings a little more structure, and let’s be honest, it looks cooler when you show it off. Here’s what makes it worth the effort:

  • It’s kind of like a task checklist: You can map out what needs to happen and in what order, and Airflow keeps everything in line.
  • Handles the mess for you: If something doesn’t work, you’ll know exactly where it broke, which is way better than guessing.
  • Scales up pretty easily: Got more work? Airflow can handle it. It’s sort of like a friend who can carry twice their weight but still remembers to call you back.
  • Logs all the things: You can peek at the history to see what ran, what failed, and what worked like a charm.

So Far, Here’s What We’ve Got

Before we get to the main attraction, let’s set the stage with a little recap of what we’ve built:

  1. Tables and Such

    • Coins: Where the basics live: names, symbols, and all that.
    • Market Data: A log of prices and volumes over time.
    • Tags: Labels for coins that give them a little extra flavor, like “layer-1” or “meme-worthy.”
  2. API Magic with FastAPI

    • It handles the database side of things, making sure data gets in the right place without a fuss.
    • With Pydantic schemas, the documentation is practically baked in.
  3. Getting Data from CoinMarketCap

    • We’ve already sorted out how to grab info from their API, figuring out things like limits and the quirks of their credit system.
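
To make that recap a bit more concrete, here’s a rough sketch of the three record shapes as plain Python dataclasses. The field names are borrowed from the payloads this pipeline posts in Step 3; the class names, the types, and the sample values are my own assumptions, not the actual table definitions or Pydantic schemas from the earlier parts.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class CoinRecord:
    """Static facts about a coin (hypothetical shape, trimmed to a few fields)."""
    id: int
    name: str
    symbol: str
    slug: str
    max_supply: Optional[float] = None  # Not every coin has a supply cap


@dataclass
class MarketDataRecord:
    """One price/volume snapshot for a coin at a point in time."""
    coin_id: int
    last_updated: str  # Timestamp kept as a string for simplicity
    price: float
    volume_24h: float


@dataclass
class TagRecord:
    """A single label attached to a coin."""
    coin_id: int
    tag: str


# Made-up sample values, only here to show how the three pieces relate
coin = CoinRecord(id=1, name="Bitcoin", symbol="BTC", slug="bitcoin", max_supply=21_000_000)
snapshot = MarketDataRecord(
    coin_id=coin.id,
    last_updated="2024-01-01T00:00:00.000Z",
    price=42000.0,
    volume_24h=1.5e10,
)
tag = TagRecord(coin_id=coin.id, tag="layer-1")

print(asdict(snapshot))
```

The point is just that market data and tags both hang off a coin’s id, which is why the pipeline posts the coin first.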

The Airflow DAG: The Part Where It All Comes Together

So, here’s the idea: we’re going to set up a pipeline in Airflow that fetches data, processes it, and sends it to our API endpoints. Think of it as a little data assembly line.


Step 1: Laying Down the DAG Basics

We’ll start by giving Airflow the details on what this whole thing is supposed to do.

from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

# Base settings applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

dag = DAG(
    'crypto_data_pipeline',
    default_args=default_args,
    description='An hourly pipeline to handle cryptocurrency data.',
    schedule_interval='0 * * * *',  # Run at the top of every hour
    start_date=datetime(2024, 1, 1),  # Any fixed past date works, since catchup is off
    catchup=False,
)
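
One knob worth knowing about: with 'retries': 1 and nothing else, Airflow waits its default five minutes before trying again. If you’d rather control that yourself, here’s a small sketch of the same settings with an explicit retry_delay. The two-minute value is an arbitrary pick of mine, and the dictionary name is made up so it doesn’t clash with the one above.

```python
from datetime import timedelta

# Same base settings as above, plus an explicit wait between attempts.
# `retry_delay` is a standard Airflow task argument; the 2-minute value is an assumption.
default_args_with_delay = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

# Worst case, a failing task is attempted (1 + retries) times
max_attempts = 1 + default_args_with_delay['retries']
print(max_attempts, default_args_with_delay['retry_delay'])
```

For an hourly job, a short delay like this leaves plenty of room for the retry to finish before the next run kicks off.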

Step 2: Grabbing the Data

We’ll hit up the CoinMarketCap API, which, let’s be real, is mostly just about passing headers and waiting for JSON.

COINMARKETCAP_API_URL = "https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest"
COINMARKETCAP_API_KEY = "your_coinmarketcap_api_key"  # Placeholder: swap in your own key

def fetch_crypto_data():
    """Fetch up to 500 coins, sorted by market cap, from CoinMarketCap."""
    headers = {
        'Accept': 'application/json',
        'X-CMC_PRO_API_KEY': COINMARKETCAP_API_KEY,
    }
    params = {
        'start': '1',
        'limit': '500',
        'sort': 'market_cap',
        'cryptocurrency_type': 'coins',
    }
    # A timeout keeps a stalled connection from hanging the task forever
    response = requests.get(COINMARKETCAP_API_URL, headers=headers, params=params, timeout=30)
    response.raise_for_status()  # Fail the task on any 4xx/5xx response
    return response.json()
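
Hard-coding the key is fine for a demo, but you probably don’t want it sitting in version control. Here’s a minimal sketch of one alternative, reading it from an environment variable. The variable name CMC_API_KEY and both helper names are assumptions of mine; Airflow’s own Variables and Connections would be the more Airflow-native route.

```python
import os


def load_api_key(env_var: str = "CMC_API_KEY") -> str:
    """Read the CoinMarketCap key from the environment and fail loudly if it's missing."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before running the DAG.")
    return key


def build_headers(api_key: str) -> dict:
    """Build the headers for a CoinMarketCap request."""
    return {
        "Accept": "application/json",
        "X-CMC_PRO_API_KEY": api_key,
    }


# For local experiments only: fall back to a dummy key if none is set
os.environ.setdefault("CMC_API_KEY", "demo-key")
headers = build_headers(load_api_key())
print(sorted(headers))
```

Failing early with a clear message beats a confusing 401 from the API halfway through a run.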

Step 3: Processing and Posting the Data

Now, we take that raw data, split it into chunks (coins, market data, tags), and send each piece to its endpoint.

FASTAPI_BASE_URL = "http://your-fastapi-app-url/crypto"

def process_and_post_data():
    """Fetch the listings, then post coins, market data, and tags to the API."""
    crypto_data = fetch_crypto_data()

    for coin in crypto_data['data']:
        # Post coin metadata
        coin_payload = {
            "id": coin["id"],
            "name": coin["name"],
            "symbol": coin["symbol"],
            "slug": coin["slug"],
            "num_market_pairs": coin["num_market_pairs"],
            "date_added": coin["date_added"],
            "max_supply": coin.get("max_supply"),
            "total_supply": coin.get("total_supply"),
            "infinite_supply": coin["infinite_supply"],
        }
        requests.post(f"{FASTAPI_BASE_URL}/coins", json=coin_payload, timeout=30).raise_for_status()

        # Post market data (sent as a one-item list)
        usd = coin["quote"]["USD"]
        market_data_payload = {
            "coin_id": coin["id"],
            "last_updated": usd["last_updated"],
            "price": usd["price"],
            "volume_24h": usd["volume_24h"],
            "percent_change_1h": usd.get("percent_change_1h"),
            "percent_change_24h": usd.get("percent_change_24h"),
            "market_cap": usd.get("market_cap"),
        }
        requests.post(f"{FASTAPI_BASE_URL}/market-data", json=[market_data_payload], timeout=30).raise_for_status()

        # Post tags, skipping coins with a missing or empty tag list
        tags = coin.get("tags") or []
        if tags:
            tags_payload = [{"tag": tag} for tag in tags]
            requests.post(
                f"{FASTAPI_BASE_URL}/tags",
                params={"coin_id": coin["id"]},
                json=tags_payload,
                timeout=30,
            ).raise_for_status()
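
One thing that makes a function like this easier to live with is pulling the dictionary-shaping away from the network calls, so you can check it without a running API. Here’s a rough sketch, assuming the same CoinMarketCap fields used above but trimmed to a handful; split_coin is a hypothetical helper name, and the sample record is made up.

```python
def split_coin(coin: dict) -> tuple:
    """Split one CoinMarketCap listing into (coin, market data, tags) payloads.

    Pure function: no network calls, so it can be tested on its own.
    """
    usd = coin["quote"]["USD"]
    coin_payload = {
        "id": coin["id"],
        "name": coin["name"],
        "symbol": coin["symbol"],
        "slug": coin["slug"],
        "max_supply": coin.get("max_supply"),
    }
    market_payload = {
        "coin_id": coin["id"],
        "last_updated": usd["last_updated"],
        "price": usd["price"],
        "volume_24h": usd["volume_24h"],
        "market_cap": usd.get("market_cap"),
    }
    # `tags` can be missing or None, so fall back to an empty list
    tags_payload = [{"tag": tag} for tag in (coin.get("tags") or [])]
    return coin_payload, market_payload, tags_payload


# Made-up sample record, trimmed to the fields used here
sample = {
    "id": 1,
    "name": "Bitcoin",
    "symbol": "BTC",
    "slug": "bitcoin",
    "tags": ["layer-1"],
    "quote": {
        "USD": {
            "last_updated": "2024-01-01T00:00:00.000Z",
            "price": 42000.0,
            "volume_24h": 1.5e10,
        }
    },
}

coin_payload, market_payload, tags_payload = split_coin(sample)
print(coin_payload["symbol"], market_payload["price"], tags_payload)
```

With the shaping isolated like this, the posting loop only has to worry about HTTP, and a malformed record shows up in a quick local check instead of in the middle of an hourly run.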

Step 4: Putting It All Together

Finally, we connect the dots in the DAG.

fetch_and_post_task = PythonOperator(
    task_id='fetch_and_post_crypto_data',
    python_callable=process_and_post_data,
    dag=dag,
)

Alright, Let’s Wrap This Up

And there it is: a complete, hour-by-hour cryptocurrency data pipeline. This setup brings together Airflow, FastAPI, and CoinMarketCap’s API into something that’s not just functional but also kind of neat. Next up? Playing with the data we’ve collected to find insights or trends, or just to see how RocketPoodle’s doing.