How I Built a Real-Time DDoS Detection Engine From Scratch


Imagine you run a shop. Every day, hundreds of customers walk in, browse around, and buy things. That's normal. Now imagine one morning, ten thousand people rush through your door at the same time, flooding every aisle, blocking real customers from getting in, and eventually crashing the building's infrastructure. That's a DDoS attack, and that's exactly what this project helps defend against.

In this article, I'll walk you through how I built a real-time anomaly detection engine that watches HTTP traffic, learns what normal looks like, and automatically locks the door when something goes wrong. I'll explain every concept step-by-step as we build this together.

What Is a DDoS Attack and Why Does It Matter?

DDoS stands for Distributed Denial of Service. The attacker's goal is to overwhelm your service with fake traffic so that your website becomes unavailable to real users.

Here's the thing that makes it tricky: a DDoS request looks exactly like a real request. An attacker sending ten thousand requests to your homepage looks identical to ten thousand real users doing the same thing until you look at the pattern.

That pattern is what this tool detects.

The Tool - What I Built

Before writing any code, I designed the architecture of the system.

Three components. Nginx sees everything and writes it down. The detector reads those notes and thinks. When it spots trouble, it acts.

Part 1: The Log File

Nginx (the web server acting as a gatekeeper in front of our app) writes one line to a log file for every single HTTP request it receives. I configured it to write in JSON format so our Python code can parse it easily.

A single log line looks like this:

{
  "source_ip": "1.2.3.4",
  "timestamp": "2025-04-25T14:32:01+00:00",
  "method": "GET",
  "path": "/index.php/apps/files",
  "status": 200,
  "response_size": 4096
}

Every field matters:

  • source_ip - the client IP, that is, who sent the request

  • timestamp - when it arrived

  • status - did it succeed (200) or fail (404, 500)?

  • path - what page were they hitting?

The detector reads this file continuously, like tail -f in a terminal, and processes each line the moment Nginx writes it.

# monitor.py - simplified version of how we tail the log
import json
import time

def tail(self, callback):
    # Open the file and jump to the END
    # (we don't care about old history, only new requests)
    with open(self.log_path, "r") as f:
        f.seek(0, 2)   # seek to end of file

        while True:
            line = f.readline()
            if not line:
                time.sleep(0.1)            # nothing new - wait briefly
                continue
            try:
                entry = json.loads(line)   # turn JSON string into Python dict
            except json.JSONDecodeError:
                continue                   # skip partial or malformed lines
            callback(entry)                # hand off to detection logic

Think of this like a security guard reading a visitor log in real time. Every time a new entry appears, they look at it.
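The window arithmetic later in the article compares timestamps numerically, so each entry's ISO 8601 timestamp has to become a number at some point. Here is a minimal sketch of that step. The helper name parse_log_line and the extra "epoch" key are my own illustration, not names from the project.

```python
import json
from datetime import datetime

def parse_log_line(line):
    """Parse one JSON log line; return None if it is blank or malformed."""
    line = line.strip()
    if not line:
        return None
    try:
        entry = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(entry, dict):
        return None
    # Convert the ISO 8601 timestamp into epoch seconds for window arithmetic.
    # "epoch" is a hypothetical extra key added for illustration.
    try:
        entry["epoch"] = datetime.fromisoformat(entry["timestamp"]).timestamp()
    except (KeyError, TypeError, ValueError):
        return None
    return entry

first = parse_log_line(
    '{"source_ip": "1.2.3.4", "timestamp": "2025-04-25T14:32:01+00:00", '
    '"method": "GET", "path": "/index.php/apps/files", "status": 200, '
    '"response_size": 4096}'
)
second = parse_log_line(
    '{"source_ip": "1.2.3.4", "timestamp": "2025-04-25T14:33:01+00:00"}'
)
print(second["epoch"] - first["epoch"])  # seconds between the two requests
```

Returning None for bad lines keeps the tail loop alive when Nginx is caught mid-write or a line is truncated.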

Part 2: The Sliding Window

Reading log lines isn't enough. We need to answer the question: "How many requests is this IP sending per second right now?"

The basic approach would be to count all requests from a given IP and divide by total uptime. But that's misleading: if an attacker started hitting us 10 seconds ago, their average over 24 hours would look completely normal.

We need a sliding window that only looks at the last 60 seconds and constantly moves forward with time.

How It Works

The Sliding Window is like a clipboard that constantly pushes old data out the left side, pushes new data in the right side, and lets you count exactly how many events happened in the last 60 seconds at any given moment.

In Python, this is implemented with a deque (double-ended queue):

from collections import deque

# One deque per IP address
ip_windows = {}

def record_request(ip, timestamp):
    # timestamp is the request time in epoch seconds (a float),
    # so convert the log's ISO 8601 string before calling this
    window = ip_windows.setdefault(ip, deque())

    # Add this request's timestamp to the right end of the deque
    window.append(timestamp)

    # Evict timestamps older than 60 seconds from the LEFT end,
    # measured against the request's own clock rather than time.time()
    cutoff = timestamp - 60
    while window and window[0] < cutoff:
        window.popleft()   # remove oldest

    # Rate = how many timestamps remain / 60 seconds
    rate = len(window) / 60
    return rate
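To see the window behave without waiting a real minute, here is a small self-contained sketch that takes the clock as an argument. The SlidingWindowCounter class and its method names are illustrative, not taken from the project.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 60

class SlidingWindowCounter:
    """Per-IP request timestamps, limited to the last WINDOW_SECONDS."""

    def __init__(self, window=WINDOW_SECONDS):
        self.window = window
        self.windows = defaultdict(deque)

    def record(self, ip, now):
        """Record a request at time `now` (epoch seconds) and return req/s."""
        q = self.windows[ip]
        q.append(now)
        self._evict(q, now)
        return len(q) / self.window

    def _evict(self, q, now):
        # Drop timestamps that have fallen out of the window
        cutoff = now - self.window
        while q and q[0] < cutoff:
            q.popleft()

counter = SlidingWindowCounter()

# 120 requests from one IP within the same second
for _ in range(120):
    burst_rate = counter.record("1.2.3.4", now=1000.0)

# One more request 61 seconds later: the burst has aged out of the window
later_rate = counter.record("1.2.3.4", now=1061.0)

print(burst_rate)   # 120 requests / 60 seconds
print(later_rate)   # 1 request / 60 seconds
```

Both append and popleft on a deque are constant-time, which is why it suits this better than a list that would need to shift every element on each eviction.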

Part 3: The Baseline - Learning What Normal Looks Like

Now we know the current rate. But how do we decide whether a given request rate from an IP is suspicious?

During a product launch with thousands of users for example, 42 req/s from an IP might be normal. But at 4am with barely any traffic, 42 req/s is almost certainly an attack.

This is why we can't hardcode a threshold. We need the system to learn what normal looks like and judge against that. This is what the Baseline does. It is built from three pieces:

The Rolling Window

Every second, we record how many requests arrived that second. We keep a rolling 30-minute history of these per-second counts:

Second 1:   42 requests
Second 2:   38 requests
Second 3:   55 requests
...
Second 1800: 41 requests   (30 minutes of data = 1800 seconds)

From this history, we calculate:

  • Mean - the average requests per second over the last 30 minutes

  • Standard deviation - how much it normally varies

import math

def compute_stats(counts):
    if not counts:
        return 0.0, 0.0   # no history yet
    mean = sum(counts) / len(counts)
    # Population variance: divide by N, not N - 1
    variance = sum((x - mean)**2 for x in counts) / len(counts)
    stddev = math.sqrt(variance)
    return mean, stddev
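One way to keep exactly 30 minutes of per-second counts is a deque with maxlen, which drops the oldest count on its own once it is full. The sketch below is an assumption about how this could be wired up (the RollingBaseline class is hypothetical). It uses a tiny history of 4 so the eviction is easy to see.

```python
import math
from collections import deque

HISTORY_SECONDS = 30 * 60  # 1800 per-second buckets

class RollingBaseline:
    def __init__(self, size=HISTORY_SECONDS):
        # maxlen makes the deque discard the oldest count automatically
        self.counts = deque(maxlen=size)

    def add_second(self, request_count):
        self.counts.append(request_count)

    def stats(self):
        if not self.counts:
            return 0.0, 0.0
        mean = sum(self.counts) / len(self.counts)
        variance = sum((x - mean) ** 2 for x in self.counts) / len(self.counts)
        return mean, math.sqrt(variance)

# Tiny history of 4 seconds so the roll-over is visible
baseline = RollingBaseline(size=4)
for count in [42, 38, 55, 41, 44]:
    baseline.add_second(count)

mean, stddev = baseline.stats()
print(list(baseline.counts))  # the first value (42) has been pushed out
print(mean, stddev)
```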

Per-Hour Slots

Traffic during peak hours (2pm) is different from traffic at 2am. These differences need to be accounted for separately, both to get an accurate average and to catch an attack during off-peak hours.

For this reason, we maintain 24 separate baselines: one per hour of the day. When judging whether current traffic is anomalous, we prefer this hour's historical data.
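A rough sketch of the per-hour idea follows. The HourlyBaselines class, the MIN_SAMPLES cutoff, the use of UTC hours, and the fallback to the rolling window when an hour has too little data are all assumptions for illustration. The project may handle these details differently.

```python
from datetime import datetime, timezone

# Assumption: how many per-second counts an hourly slot needs before we trust it
MIN_SAMPLES = 300

class HourlyBaselines:
    def __init__(self):
        # One list of per-second counts for each hour of the day (0-23)
        self.slots = {hour: [] for hour in range(24)}

    @staticmethod
    def _hour(epoch_seconds):
        return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).hour

    def add(self, epoch_seconds, request_count):
        self.slots[self._hour(epoch_seconds)].append(request_count)

    def pick(self, epoch_seconds, rolling_counts):
        """Prefer this hour's history; fall back to the rolling window."""
        slot = self.slots[self._hour(epoch_seconds)]
        return slot if len(slot) >= MIN_SAMPLES else rolling_counts

baselines = HourlyBaselines()
four_am = 4 * 3600  # 04:00 UTC on 1970-01-01, used only as a test clock
for second in range(MIN_SAMPLES):
    baselines.add(four_am + second, 2)   # quiet hour: 2 req/s

chosen_4am = baselines.pick(four_am + 1000, rolling_counts=[40, 42])
chosen_2pm = baselines.pick(14 * 3600, rolling_counts=[40, 42])
print(len(chosen_4am), chosen_2pm)
```

A real version would also cap each slot's size, for example with a deque and maxlen, so memory does not grow without bound.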

Floor Values

At 4am with near-zero traffic, our stddev might be 0.01. Then one automated script hits us with 5 requests, and the z-score jumps to around 500, which is a false alarm. To prevent this, we set minimum floor values:

mean   = max(computed_mean,   1.0)   # never below 1 req/s
stddev = max(computed_stddev, 1.0)   # never below 1

This prevents a false alarm during off-peak hours.

Part 4: The Detection Logic

Now we have:

  • current_rate - what's happening right now

  • mean - what normally happens

  • stddev - how much it normally varies

We use these to compute a Z-score:

A z-score measures how many standard deviations a data point is above or below the average (mean):

z_score = (current_rate - mean) / stddev

The z-score answers: "How many standard deviations above normal is this?"

  • z = 1.0 → slightly above normal, happens all the time

  • z = 2.0 → somewhat unusual

  • z = 3.0 → exceeded less than 0.3% of the time under normal conditions (assuming roughly normally distributed traffic), so it should be treated as suspicious

  • z = 50.0 → this is an attack

def is_anomalous(current_rate, mean, stddev):
    z_score = (current_rate - mean) / stddev

    # Condition 1: statistically anomalous
    if z_score > 3.0:
        return True, f"z-score={z_score:.2f} exceeds threshold of 3.0"

    # Condition 2: raw multiplier check
    # Catches fast attacks before stddev has time to adapt
    if current_rate > mean * 5.0:
        return True, f"rate={current_rate} is more than 5× the mean={mean}"

    return False, None

We check two conditions because they catch different attack patterns:

  • Z-score catches attacks relative to current traffic levels

  • 5× multiplier catches sudden overwhelming bursts before the baseline has adapted
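A few worked numbers show how the two conditions differ. This is a compact restatement of the check with made-up traffic figures. The z_limit and multiplier parameters and the short reason strings are my additions for the example. It assumes stddev is already floored, so it is never zero.

```python
def is_anomalous(current_rate, mean, stddev, z_limit=3.0, multiplier=5.0):
    # stddev is assumed to be floored upstream, so it cannot be zero here
    z_score = (current_rate - mean) / stddev
    if z_score > z_limit:
        return True, "z-score"
    if current_rate > mean * multiplier:
        return True, "multiplier"
    return False, None

# Steady, predictable traffic: small stddev, so the z-score fires (z = 4.0)
steady = is_anomalous(current_rate=60.0, mean=40.0, stddev=5.0)

# Ordinary fluctuation stays under both limits (z = 0.4, and 42 < 200)
normal = is_anomalous(current_rate=42.0, mean=40.0, stddev=5.0)

# Noisy baseline: z = 2.25 is under 3.0, but 55 > 5 x 10 trips the multiplier
noisy = is_anomalous(current_rate=55.0, mean=10.0, stddev=20.0)

print(steady, normal, noisy)
```

The third case is one situation where the multiplier matters: a large standard deviation keeps the z-score low even though the rate is more than five times the mean.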

Part 5: Blocking With iptables

When an IP is flagged, we want to stop it as fast as possible. The most effective way is to block it at the Linux kernel level using iptables before the packet even reaches Nginx.

Think of iptables as a bouncer who checks IDs before anyone enters the building. Our detector tells the bouncer: "Don't let 1.2.3.4 in."

import subprocess

def ban_ip(ip):
    # Add a DROP rule to the INPUT chain for this IP
    # -I INPUT = insert at top of the INPUT chain (checked first)
    # -s ip    = match packets FROM this source
    # -j DROP  = silently discard - attacker gets no response
    subprocess.run([
        "iptables", "-I", "INPUT",
        "-s", ip,
        "-j", "DROP"
    ])

def unban_ip(ip):
    # Remove the rule when the ban expires
    # -D = delete matching rule
    subprocess.run([
        "iptables", "-D", "INPUT",
        "-s", ip,
        "-j", "DROP"
    ])
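Since the IP comes from a log line, it is worth validating it before it reaches a command that runs as root. Below is a minimal sketch using the standard ipaddress module. The build_rule helper is hypothetical and only builds the argument list, so it can be tested without touching the firewall. It handles IPv4 only.

```python
import ipaddress

def build_rule(action, ip):
    """Return the iptables argv for inserting ("-I") or deleting ("-D") a DROP rule."""
    if action not in ("-I", "-D"):
        raise ValueError(f"unsupported action: {action!r}")
    # Raises ValueError for anything that is not a valid IPv4 address
    addr = ipaddress.IPv4Address(ip)
    return ["iptables", action, "INPUT", "-s", str(addr), "-j", "DROP"]

ban_cmd = build_rule("-I", "1.2.3.4")
unban_cmd = build_rule("-D", "1.2.3.4")
print(ban_cmd)
print(unban_cmd)
```

The result can be passed to subprocess.run(cmd, check=True), which raises if iptables exits with an error, for example when the process lacks root privileges.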

DROP vs REJECT:

  • REJECT sends back an error response - the attacker knows they're blocked

  • DROP silently discards the packet - the attacker's tool just times out, revealing nothing

The Backoff Schedule

We don't want to permanently ban IPs that might belong to misconfigured clients rather than attackers. We use an escalating ban schedule:

Offense   Duration
1st       10 minutes
2nd       30 minutes
3rd       2 hours
4th+      Permanent

A background thread checks every 30 seconds for bans that have expired and removes them automatically.
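The schedule and the expiry check can be sketched in a few lines. The names BAN_SCHEDULE, ban_duration, and expired_bans, and the choice of None to mean "permanent", are assumptions made for this example.

```python
# Seconds for the 1st, 2nd, and 3rd offense; anything after that is permanent
BAN_SCHEDULE = [10 * 60, 30 * 60, 2 * 60 * 60]

def ban_duration(offense_count):
    """Return the ban length in seconds, or None for a permanent ban."""
    if offense_count < 1:
        raise ValueError("offense_count starts at 1")
    if offense_count > len(BAN_SCHEDULE):
        return None
    return BAN_SCHEDULE[offense_count - 1]

def expired_bans(bans, now):
    """bans maps ip -> expiry in epoch seconds (None = permanent)."""
    return [ip for ip, expires_at in bans.items()
            if expires_at is not None and expires_at <= now]

bans = {
    "1.2.3.4": 1000.0 + ban_duration(1),  # first offense, banned at t=1000
    "5.6.7.8": None,                      # fourth offense: permanent
}

# 601 seconds later the 10-minute ban is over; the permanent one is not
due = expired_bans(bans, now=1000.0 + 601)
print(due)
```

The background thread would call something like expired_bans every 30 seconds and unban each IP it returns.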

Part 6: Alerts - Notification

It is imperative to notify the team when an attack occurs and a ban fires. We send a Slack notification within 10 seconds of the ban. The message includes everything the on-call engineer needs:

  • Which IP was banned

  • What triggered the ban (z-score exceeded / 5× multiplier)

  • Current rate vs baseline

  • How long the ban will last

We use Slack's Incoming Webhooks, a simple HTTP POST with a JSON payload:

import urllib.request
import json

# WEBHOOK_URL must hold your Slack Incoming Webhook URL (defined elsewhere)

def send_slack_alert(ip, condition, rate, baseline, duration):
    payload = {
        "attachments": [{
            "title": "🚨 IP BANNED",
            "color": "#FF0000",
            "fields": [
                {"title": "Banned IP",     "value": ip,        "short": True},
                {"title": "Condition",     "value": condition, "short": False},
                {"title": "Current Rate",  "value": f"{rate} req/s", "short": True},
                {"title": "Baseline",      "value": f"{baseline} req/s", "short": True},
                {"title": "Ban Duration",  "value": duration,  "short": True},
            ]
        }]
    }
    body = json.dumps(payload).encode("utf-8")
    req  = urllib.request.Request(WEBHOOK_URL, data=body,
                                   headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req, timeout=5)

Part 7: The Dashboard

A Flask web dashboard displays live metrics, refreshed every 3 seconds. It shows:

  • Global requests/second (live)

  • Currently banned IPs and when they expire

  • Top 10 busiest source IPs

  • CPU and memory usage

  • Baseline mean and stddev

  • A graph of baseline mean over time

The frontend is pure JavaScript using fetch() to poll a /api/metrics endpoint.
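As a rough idea of what the /api/metrics response could carry, here is a sketch that assembles a JSON-serializable snapshot from in-memory state. The build_metrics function and every key name in the returned dict are hypothetical. CPU and memory figures are left out because they would need an extra library.

```python
import json
from collections import deque

def build_metrics(ip_windows, bans, mean, stddev, window=60, top_n=10):
    """Assemble a JSON-serializable snapshot from the detector's state."""
    # Per-IP rate: requests currently in the window / window length
    per_ip = {ip: len(q) / window for ip, q in ip_windows.items()}
    top = sorted(per_ip.items(), key=lambda item: item[1], reverse=True)[:top_n]
    return {
        "global_rps": sum(per_ip.values()),
        "top_ips": [{"ip": ip, "rps": rps} for ip, rps in top],
        "banned": [{"ip": ip, "expires_at": exp} for ip, exp in bans.items()],
        "baseline": {"mean": mean, "stddev": stddev},
    }

snapshot = build_metrics(
    ip_windows={
        "1.2.3.4": deque([1.0] * 120),  # 120 requests in the window
        "5.6.7.8": deque([1.0] * 6),    # 6 requests in the window
    },
    bans={"1.2.3.4": 1600.0},
    mean=1.0,
    stddev=1.0,
)
body = json.dumps(snapshot)  # what the endpoint would send to fetch()
print(body)
```

In a Flask view, returning this dict from the route function is enough, since Flask serializes dicts to JSON responses.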

You can get the full source code from here:

https://github.com/cyberar/hng14-stage3-devops

What I Learned

Security tooling must be simple. The more complex the detection logic, the harder it is to trust. Every piece of this system is explainable in plain English, which is exactly what you want when you're trying to convince yourself (and your team) that the detector isn't going to start banning your legitimate users.

Statistical methods beat hardcoded rules. A hardcoded threshold of "block anyone over 100 req/s" would have missed a Slowloris attack that sends 2 req/s but holds connections open forever. Z-scores catch anomalies relative to the current reality, not some rule written months ago.

The OS is your most powerful tool. iptables is not glamorous. But it operates at the kernel level, costs virtually no CPU, and blocks packets before they consume any application resources.

Source Code

The full project is open source: 👉 https://github.com/cyberar/hng14-stage3-devops

The live dashboard is running at: 👉 http://kemicodes.online:8080