Observability is one of those words that sounds impressive in meetings but gets downright intimidating when you sit down to implement it. Sure, logging failures and sending notifications sounds straightforward—until you’re knee-deep in designing custom tables, chasing down import errors, and trying to remember why you thought this was a good idea in the first place. But here’s the thing: observability isn’t just about logging and dashboards; it’s about creating a system where the data works for you, not the other way around. Alerts and notifications are the heart of this approach, delivering critical insights directly to you so you can act in real time. When done right, observability transforms complexity into clarity and makes being data-driven not just possible, but practical.

But I digress. Let me walk you through how I built a custom Events and Notifications solution for my data stack, combining clean design, modular domains, and a pinch of database wizardry.


The Goal: Custom Observability

I wanted something specific: a way to track events (system occurrences like process failures or warnings) and tie those events to notifications (emails, Slack messages, etc.) sent to relevant stakeholders. Think of it as a DIY monitoring system, but without the generic dashboards and overused “mission-critical” buzzwords.

This meant creating two related tables:

  • Events: A catch-all for logging system activity with details like type, severity, and timestamps.
  • Notifications: Messages tied to events, complete with delivery methods, recipients, and status.

At first glance, logging notifications might seem unnecessary—but it’s a key step toward my bigger goal: enabling an AI to analyze events and intelligently decide whether to notify me about something. Part of that decision-making process relies on knowing if I’ve already been notified. By keeping a record of notifications alongside events, I can pave the way for a smarter system that doesn’t just log data but actively helps me stay focused on what matters most.
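
As a taste of where that’s headed, the “have I already been notified?” check falls almost directly out of this schema. Here’s a minimal sketch, assuming the models defined below and an async SQLAlchemy session (the helper name is mine, not part of the final system):

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def already_notified(db: AsyncSession, event_id) -> bool:
    # True if at least one notification for this event was actually sent.
    stmt = select(Notification.id).where(
        Notification.event_id == event_id,
        Notification.status == "sent",
    )
    return (await db.execute(stmt)).first() is not None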


Step 1: Designing the Tables

The database design was simple but effective:

  • Events got fields for timestamps, type, severity, and optional details. Think of it as a structured log.
  • Notifications had fields for delivery method, recipient, and a link to the event that triggered it.

With that in mind, I whipped up the SQLAlchemy models:

Event Model

from datetime import datetime
from uuid import uuid4

from sqlalchemy import JSON, Column, DateTime, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Event(Base):
    __tablename__ = "events"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    timestamp = Column(DateTime, nullable=False)  # when the event occurred
    type = Column(String, nullable=False)         # e.g. "process_failed"
    severity = Column(String, nullable=False)     # e.g. "WARNING", "ERROR"
    details = Column(JSON, nullable=True)         # arbitrary per-event metadata
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

The details field uses a JSON column, which trades normalization for flexibility. Instead of creating additional tables to handle various types of event metadata, the JSON column allows storing arbitrary key-value pairs. This is particularly useful when the structure of event details might vary across different event types. However, this approach comes with trade-offs: querying and indexing JSON fields can be slower and more complex than working with fully normalized data.
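
To make that trade-off concrete, here’s a sketch of filtering on a key inside details (assuming PostgreSQL; JSON operator support varies by backend):

from sqlalchemy import select

# Filter on a key nested in the JSON column. It works, but it can't use an
# ordinary column index, which is the flexibility/performance trade-off.
stmt = (
    select(Event)
    .where(Event.type == "process_failed")
    .where(Event.details["step"].as_string() == "data_load")
)
failed_loads = (await db.execute(stmt)).scalars().all()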

Notification Model

from sqlalchemy import Column, DateTime, ForeignKey, String, Text

# Reuses Base, UUID, uuid4, and datetime from the Event model's module.
class Notification(Base):
    __tablename__ = "notifications"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    event_id = Column(UUID(as_uuid=True), ForeignKey("events.id"), nullable=False)
    method = Column(String, nullable=False)     # e.g. "email", "slack"
    recipient = Column(String, nullable=False)  # address, channel, etc.
    message = Column(Text, nullable=False)
    status = Column(String, nullable=False)     # e.g. "pending", "sent", "failed"
    sent_at = Column(DateTime, nullable=True)   # stays null until delivery
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

Why Use UUIDs?

UUIDs (Universally Unique Identifiers) serve as primary keys for both tables. Unlike auto-incrementing integers, UUIDs provide globally unique identifiers, which is particularly useful in distributed systems or when data might be merged across environments. They also obscure the sequence of record creation, adding a layer of security by not exposing how many records exist.
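
One concrete payoff, sketched here with illustrative values: because uuid4() runs in Python, you can mint keys client-side and wire related records together before a single INSERT happens.

from datetime import datetime
from uuid import uuid4

# Mint the key in Python -- no INSERT round-trip needed to learn it.
event = Event(id=uuid4(), timestamp=datetime.utcnow(), type="process_failed", severity="ERROR")

# Related records can reference it before anything is flushed or committed.
note = Notification(
    id=uuid4(),
    event_id=event.id,
    method="email",
    recipient="admin@example.com",
    message="Process failed during data_load",
    status="pending",
)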

I reserve the right to regret this later.


Step 2: Creating the Domains

Here’s where things started to get interesting. Initially, I crammed everything into one domain. After all, events and notifications are tightly coupled, right? Well, that lasted about as long as it took to realize I was creating a tangled mess of cross-dependencies.

The solution? Split the domains:

  1. Events Domain:
    • Focused on logging system activity.
    • Endpoints for filtering events by type, severity, or date range.
  2. Notifications Domain:
    • Focused on delivering messages.
    • Endpoints for creating notifications, fetching the latest ones, or tying them to specific events.

This separation made the codebase cleaner and the individual domains easier to test.
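
Concretely, the split looked something like this (module names are illustrative, not gospel):

app/
├── events/
│   ├── models.py        # Event SQLAlchemy model
│   ├── schemas.py       # Pydantic request/response schemas
│   ├── repository.py    # database access for events
│   └── router.py        # /events endpoints
└── notifications/
    ├── models.py        # Notification model (FK to events)
    ├── schemas.py
    ├── repository.py
    └── router.py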


Step 3: Building the Repositories

Next, I built repository layers for both domains to handle database interactions. These included functions for creating records, fetching filtered results, and rolling back transactions on errors.

Here’s an example from the Notifications Repository:

import logging

from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)

class RepositoryError(Exception):
    """Raised when a repository-level database operation fails."""

async def create_notification(db: AsyncSession, notification: Notification):
    try:
        db.add(notification)
        await db.commit()
        await db.refresh(notification)  # reload server-generated fields
        logger.info(f"Notification created successfully: {notification.id}")
        return notification
    except IntegrityError as e:
        logger.error(f"Integrity error while creating notification: {str(e)}")
        await db.rollback()  # leave the session usable for the caller
        raise RepositoryError("Failed to create notification due to database constraints.") from e

This layer insulated my API endpoints from direct database interaction, making the system more robust (and slightly less terrifying to debug).
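
For completeness, the filtered fetch that the Events router leans on in the next step can be sketched like so (a minimal version; the real one presumably adds ordering options and limits):

from datetime import datetime

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_events_by_datetime_range(
    db: AsyncSession, start_date: datetime, end_date: datetime
):
    # Inclusive range filter on the event's own timestamp, oldest first.
    stmt = (
        select(Event)
        .where(Event.timestamp >= start_date, Event.timestamp <= end_date)
        .order_by(Event.timestamp)
    )
    return (await db.execute(stmt)).scalars().all()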


Step 4: Writing the Routers

With the repositories in place, I wrote routers for each domain. For Events, I added endpoints to:

  • Create new events.
  • Retrieve the latest events.
  • Filter events by type, severity, or date range.

Here’s an example of the /events/by-daterange endpoint:

import logging
from datetime import datetime
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# schemas, repository, and get_db come from the events domain's own modules.
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/events")

@router.get("/by-daterange", response_model=List[schemas.EventResponse])
async def get_events_by_date_range(
    start_date: datetime, end_date: datetime, db: AsyncSession = Depends(get_db)
):
    try:
        events = await repository.get_events_by_datetime_range(db, start_date, end_date)
        return events
    except Exception as e:
        logger.error(f"Error fetching events by date range: {str(e)}")
        raise HTTPException(status_code=500, detail="Failed to fetch events.")
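
The response_model points at a Pydantic schema; here’s a minimal sketch of schemas.EventResponse, assuming Pydantic v2 (the fields simply mirror the model):

from datetime import datetime
from typing import Any, Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict

class EventResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # serialize straight from ORM objects

    id: UUID
    timestamp: datetime
    type: str
    severity: str
    details: Optional[dict[str, Any]] = None
    created_at: datetime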

For Notifications, the endpoints included:

  • Create a notification tied to an event.
  • Retrieve the latest notifications.
  • Fetch all notifications tied to a specific event.
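
Here’s a hedged sketch of the create endpoint, reusing the repository from Step 3 (the schema names NotificationCreate and NotificationResponse are my placeholders, and model_dump assumes Pydantic v2):

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/notifications")

@router.post("/", response_model=schemas.NotificationResponse)
async def create_notification_endpoint(
    payload: schemas.NotificationCreate, db: AsyncSession = Depends(get_db)
):
    try:
        notification = Notification(**payload.model_dump())
        return await repository.create_notification(db, notification)
    except RepositoryError as e:
        # Constraint failures (e.g. an unknown event_id) surface as a 400, not a 500.
        raise HTTPException(status_code=400, detail=str(e)) from e
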

Step 5: Testing the System

This is where things got satisfying. After a few rounds of debugging (and some choice words for import errors), I fired up the API and tested everything in Swagger and ReDoc.

I created a fake event for a failed process:

{
  "timestamp": "2024-11-26T15:00:00Z",
  "type": "process_failed",
  "severity": "ERROR",
  "details": {"step": "data_load", "error": "File not found"}
}
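
Outside the docs UI, the same request takes a couple of lines of Python (the base URL and route are assumptions about where the app is mounted):

import httpx

# Assumes the API runs locally and the events router exposes POST /events/
resp = httpx.post(
    "http://localhost:8000/events/",
    json={
        "timestamp": "2024-11-26T15:00:00Z",
        "type": "process_failed",
        "severity": "ERROR",
        "details": {"step": "data_load", "error": "File not found"},
    },
)
resp.raise_for_status()
event_id = resp.json()["id"]  # handy for tying the notification below to this event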

Then, I tied a notification to it:

{
  "event_id": "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11",
  "method": "email",
  "recipient": "admin@example.com",
  "message": "Process failed at step data_load: File not found",
  "status": "sent",
  "sent_at": "2024-11-26T16:01:00Z"
}

Both endpoints returned the expected results. The observability solution was officially functional—and dare I say, elegant.


Final Thoughts

This project was a deep dive into balancing flexibility, modularity, and sanity. Building a custom observability system meant designing tables, creating repositories, and splitting domains to keep things manageable. Along the way, I learned (and re-learned) the importance of:

  • Checking imports before hitting run.
  • Splitting domains when the responsibilities diverge.
  • Writing modular code that doesn’t tie itself in knots.

Now my data stack has its own observability layer. Sweet. I’m sure this will work just fine. Maybe. 😏