Building a data pipeline with testing in mind

Monitor data pipelines' health with time-series metrics in Prometheus and similar tools.

If you've built batch data pipelines, but didn't know the best way to check their health or test changes for potential regressions, this article is for you.

Monitoring and testing batch data pipelines require a different approach from monitoring and testing web services. It's one thing to build a robust data-pipeline process in Python but an entirely different challenge to find tooling and build out the framework that provides confidence that a data system is healthy. To truly iterate and develop a codebase, developers must be able to test confidently during the development process and to monitor the production system. This article offers some solutions to monitoring and configuring alerts on data pipelines.

Monitoring data pipelines vs. web services

So, how does monitoring data pipelines differ from monitoring web services? Most of the core tenets of monitoring any system transfer directly between data pipelines and web services. Where they differ is in how to monitor, since data pipelines, by nature, have different indications of health. The following table outlines common health indicators and compares how each is monitored for web services and for batch data pipelines.

| Indicator | Web service | Data pipeline |
|---|---|---|
| Health check | Expose a health check endpoint and verify that pinging it returns a 200 status code. | Check that a job has succeeded. |
| Integration test | POST to one endpoint and expect to get the correct data from a corresponding GET endpoint. | Verify that some fake data made its way through the data transformation. (This can be hard to replicate if there's no easy way to feed fake data into the data pipeline.) |
| Latency | Measure the average response time of an API. | Measure the time it takes for a data pipeline to complete. |
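
As a minimal sketch of the data pipeline integration test described in the table, the snippet below feeds a clearly fake record through a transformation and checks that it comes out the other side. The transform function, the field names, and the sentinel ID are hypothetical stand-ins for a real pipeline stage, not part of any actual system.

```python
def transform(records):
    """Hypothetical pipeline stage: drop rows without an amount, convert cents to dollars."""
    output = []
    for record in records:
        if record.get("amount_cents") is None:
            continue  # reject rows with missing data
        output.append({
            "id": record["id"],
            "amount_dollars": record["amount_cents"] / 100,
        })
    return output


def fake_record_survives(transform_fn, sentinel_id="integration-test-0001"):
    """Feed one fake record through the stage and verify it arrives transformed."""
    fake = {"id": sentinel_id, "amount_cents": 1250}
    invalid = {"id": "missing-amount", "amount_cents": None}
    results = transform_fn([fake, invalid])
    matches = [row for row in results if row["id"] == sentinel_id]
    return len(matches) == 1 and matches[0]["amount_dollars"] == 12.5
```

A check like this could run on a schedule, with a nonzero exit code when fake_record_survives returns False, so that exit-code-based tools can pick up the failure.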

Monitoring tools

Open source tools like StatsD and Prometheus are commonly used to gather metrics and configure alerts. In addition to continuously gathering metrics, you can run integration tests on tools like Nagios, which primarily checks for a successful exit code. I've also worked on other teams that used Jenkins to periodically run integration tests and check that a build still passes. All of these tools can be extended to support both data services and web services.

Time-series data

The data models for these monitoring tools are all different: StatsD has aggregated metrics, Nagios has exit status codes, and Prometheus leverages time-series data. In the world of monitoring, time-series data is particularly powerful because it captures how the system as a whole behaves over time, including not only a system's current state but also changes to the system. The challenge of working with time-series data is that, because you are essentially storing data points for each moment in time, the dataset can grow very quickly. It can be difficult to query and make sense of time-series data without a strong data model. This is where Prometheus comes in as a powerful monitoring tool. One of Prometheus' core features is: "A multi-dimensional data model with time-series data identified by metric name and key/value pairs."

Prometheus

My team at Button heavily leverages Prometheus in our monitoring stack, so the examples in this blog will be specific to Prometheus, but the general concepts can be extrapolated to any monitoring tool with time-series metrics. In its own words:

"Prometheus works well for recording any purely numeric time series. It fits both machine-centric monitoring as well as monitoring of highly dynamic service-oriented architectures. In a world of microservices, its support for multi-dimensional data collection and querying is a particular strength."

The Prometheus server scrapes data over HTTP, which means your service needs to have an HTTP endpoint exposed to the Prometheus server to enable data collection. Pushing time-series data is also supported via an intermediary push gateway.

Time-series metrics

From the perspective of time-series metrics, the difference between measuring web services and data pipelines is the difference between a real-time system and a batch system. A real-time system consistently produces healthy signals when it is "live" and serving or processing traffic, whereas a batch system might be more sporadic, and "downtime" might not mean the same thing in both. If a batch system reports it isn't "reachable," it doesn't necessarily mean it's not functioning; it could just mean it doesn't have any data to process.

Web services

You can assess a web service's health by checking for continuous, successful status codes and monitor latency by looking for relatively low response times.

To do this in Prometheus, you define metrics in your code and expose them for Prometheus to scrape. For instance, you may want to know about response time and total request metrics:

import prometheus_client

response_time = prometheus_client.Histogram(
    'http_response_time_seconds',
    'Histogram of requests processing time.',
    ['method', 'path'])
requests_total = prometheus_client.Counter(
    'http_requests_total',
    'Total count of requests',
    ['method', 'path', 'code'])

The counter tracks the total number of requests made; a counter is a cumulative metric in Prometheus that only increases as more requests are made. In this example, the data in Prometheus will show all historical counts of requests made to the URL path recorded in the path label, along with the corresponding response status code in the code label.

A histogram puts the request durations into buckets and enables alerting based on a response time at the 90th percentile for a specific period of time.

In your API's middleware, you want to record these metrics as requests come in.

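import time

from django.urls import resolve  # assumption: a Django app, inferred from request.path_info


class MetricsMiddleware:  # hypothetical name for the API's middleware class
    def record_status_code(self, request, status_code):
        # Count the request only if its start time was stamped earlier.
        if hasattr(request, '_start_time'):
            requests_total.labels(
                request.method,
                resolve(request.path_info).url_name,
                str(status_code)
            ).inc()

    def record_time(self, request):
        # Observe the seconds elapsed since the request started.
        if hasattr(request, '_start_time'):
            seconds = (time.time() - request._start_time)
            response_time.labels(
                request.method,
                resolve(request.path_info).url_name,
            ).observe(seconds)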

To assess (and receive alerts on) the system's health, you'll want to know if the rate of change of the request counter with a label for a successful status is 0 over a specific period, which indicates there haven't been any requests with status code 200 during that period.

rate(http_requests_total{code="200"}[1m]) == 0

Alternatively, you can alert when the rate of change of non-200 status codes is not 0.

rate(http_requests_total{code!="200"}[1m]) != 0
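
To make the two alert conditions above concrete, here is a simplified sketch of what a per-second rate over a counter means. It is not Prometheus's implementation: the real rate() also extrapolates to the window boundaries, which this sketch omits. The function name and the (timestamp, value) sample format are assumptions for illustration.

```python
def simple_rate(samples):
    """Approximate the per-second increase of a counter over a window.

    `samples` is a list of (unix_timestamp, counter_value) pairs in time order.
    A drop in value is treated as a counter reset (for example, a restart).
    """
    if len(samples) < 2:
        return None  # not enough data to compute a rate
    increase = 0.0
    for (_, previous), (_, current) in zip(samples, samples[1:]):
        if current >= previous:
            increase += current - previous
        else:
            increase += current  # counter restarted from zero
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else None


# No new successful requests for a minute: the rate is 0, so the first alert would fire.
idle = [(0, 120), (30, 120), (60, 120)]
# Sixty new requests in sixty seconds: one request per second.
busy = [(0, 120), (30, 150), (60, 180)]
```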

You can also trigger an alert based on latency of API requests. The following Prometheus query calculates the 90th percentile of request durations over the prior minute.

histogram_quantile(0.90, rate(http_response_time_seconds_bucket[1m]))

If the result is over a certain threshold, it could warrant an alert.
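
For intuition about how a percentile can be derived from bucketed durations, the following sketch estimates a quantile from cumulative bucket counts by interpolating linearly inside the bucket that contains the target rank. It illustrates the general technique under simplifying assumptions (raw counts instead of rates, a lowest bucket that starts at 0) and is not Prometheus's own code; the function name and the sample buckets are made up for the example.

```python
import math


def estimate_quantile(q, buckets):
    """Estimate a quantile from cumulative histogram buckets.

    `buckets` is a list of (upper_bound, cumulative_count) pairs sorted by
    upper bound, ending with (math.inf, total_count).
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in the range (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound):
                # The quantile falls in the overflow bucket: report the
                # highest finite bound instead of infinity.
                return lower_bound
            # Assume observations are spread evenly inside the bucket.
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return math.nan


# Hypothetical data: 50 requests took <= 0.1s, 90 took <= 0.5s, all 100 took <= 1s.
request_buckets = [(0.1, 50), (0.5, 90), (1.0, 100), (math.inf, 100)]
p90 = estimate_quantile(0.90, request_buckets)  # roughly 0.5 seconds
```

Because the estimate depends on bucket boundaries, the chosen buckets limit how precise a latency alert threshold can be.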

Data pipelines

The metrics used for a data pipeline are a bit different. Instead of measuring response time and response status code, we want to measure when the data pipeline ran and how long it took or how much data it processed. To do this, we will use a gauge to record the last time a batch job was successful. We can also measure the time it took for a data pipeline to succeed using a summary; this is the equivalent of latency for a batch data pipeline.

Metrics to send:

job_last_success_unixtime = prometheus_client.Gauge('job_last_success_unixtime',
    'Time of last successful batch job')
job_duration_seconds = prometheus_client.Summary('job_duration_seconds',
    'Duration of batch job run in seconds')

Metrics are calculated at the end of the pipeline as follows:

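import time

# Time the whole run; the gauge is updated only if run_pipeline() returns without raising.
with job_duration_seconds.time():
    run_pipeline()
    time_now = int(time.time())
    job_last_success_unixtime.set(time_now)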

A clear way to alert on the health of a batch job is to check that the last time the job was successful falls within an expected time interval. If, for example, you expect your job to run for five minutes every hour and the last time it was successful was well over three hours ago, it could be a sign that the system is unhealthy. The alert in Prometheus would look like this:

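# Alerting rule in Prometheus 2.x rule-file syntax; the alert name is a placeholder.
- alert: BatchJobNotSucceeding
  expr: time() - job_last_success_unixtime > (3 * 60 * 60)
  for: 5m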
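
The alert condition above combines two ideas: the gauge is older than three hours, and that has stayed true for five minutes. The sketch below mimics that logic in plain Python to show the arithmetic; it is an illustration, not how Prometheus evaluates rules internally, and the function names and sample format are assumptions.

```python
THREE_HOURS = 3 * 60 * 60
FIVE_MINUTES = 5 * 60


def is_stale(last_success_unixtime, now, max_age=THREE_HOURS):
    """True when the last successful run is older than max_age seconds."""
    return now - last_success_unixtime > max_age


def alert_fires(samples, max_age=THREE_HOURS, hold_for=FIVE_MINUTES):
    """Return True once staleness has held continuously for hold_for seconds.

    `samples` is a list of (evaluation_time, last_success_unixtime) pairs
    in time order, one per rule evaluation.
    """
    pending_since = None
    for now, last_success in samples:
        if is_stale(last_success, now, max_age):
            if pending_since is None:
                pending_since = now  # condition just became true
            if now - pending_since >= hold_for:
                return True
        else:
            pending_since = None  # job succeeded recently; reset the timer
    return False


# Evaluations every minute, starting just past the three-hour mark, with no new success.
stuck_job = [(THREE_HOURS + 60 + 60 * i, 0) for i in range(6)]
```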

Note that this approach works only for long-running batch jobs that expose a port for scraping. Short-lived jobs, such as periodic cron jobs, may exit before Prometheus can scrape them; for these, Prometheus provides a push gateway that the job pushes its metrics to and that Prometheus then scrapes.
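
For a short-lived job, a rough sketch of pushing the same two metrics might look like the following. It uses the prometheus_client library's push_to_gateway function with a dedicated registry; the gateway address localhost:9091, the job name, and the helper function names are assumptions chosen for the example.

```python
from prometheus_client import CollectorRegistry, Gauge, Summary, push_to_gateway


def run_and_record(pipeline):
    """Run the pipeline and record its metrics in a fresh registry."""
    registry = CollectorRegistry()
    last_success = Gauge(
        'job_last_success_unixtime',
        'Time of last successful batch job',
        registry=registry)
    duration = Summary(
        'job_duration_seconds',
        'Duration of batch job run in seconds',
        registry=registry)
    with duration.time():
        pipeline()
    # Reached only if the pipeline did not raise.
    last_success.set_to_current_time()
    return registry


def run_and_push(pipeline, gateway='localhost:9091', job_name='batch_job'):
    """Run the pipeline, then push its metrics to a push gateway (assumed address)."""
    registry = run_and_record(pipeline)
    push_to_gateway(gateway, job=job_name, registry=registry)
```

A cron job would call run_and_push(run_pipeline) as its last step. Because the success gauge is set only after the pipeline returns, a failed run leaves the previously pushed timestamp in place and the staleness alert can still fire.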

Regression testing and stats

In addition to gauging the health of a system, it's good to have data pipelines output some core statistics on their results. Let's say your base data pipeline just reads a CSV dump and generates the total count, average, and standard deviation of a numerical value. It should also be able to output data validation results, e.g., the number of records rejected due to invalid data, the number of data points outside two standard deviations, the total number of records read, and the total number of records processed.
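
As one possible sketch of such output, the function below reads a CSV dump and reports the statistics mentioned above using only the standard library. The column name, the sample data, and the choice of population standard deviation are assumptions made for the example.

```python
import csv
import io
import statistics


def summarize_column(csv_text, column):
    """Compute core statistics for one numeric column of a CSV dump."""
    values, rejected, records_read = [], 0, 0
    for row in csv.DictReader(io.StringIO(csv_text)):
        records_read += 1
        try:
            values.append(float(row[column]))
        except (KeyError, TypeError, ValueError):
            rejected += 1  # missing or non-numeric value
    mean = statistics.mean(values) if values else None
    stdev = statistics.pstdev(values) if values else None  # population std dev
    outliers = sum(1 for v in values if abs(v - mean) > 2 * stdev) if values else 0
    return {
        "records_read": records_read,
        "records_processed": len(values),
        "records_rejected": rejected,
        "mean": mean,
        "stdev": stdev,
        "outliers_beyond_2_stdev": outliers,
    }


# Hypothetical dump: nine rows of 10, one row of 100, and two invalid rows.
sample_csv = (
    "id,amount\n"
    + "".join(f"{i},10\n" for i in range(9))
    + "9,100\n10,n/a\n11,\n"
)
stats = summarize_column(sample_csv, "amount")
```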

These fundamental stats may also be used in regression testing: run the changed codebase against the same baseline data and compare the resulting stats to validate that the code change didn't drastically alter the system's output.
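
A regression check along these lines could be sketched as a comparison of two stats dictionaries, one from the baseline run and one from the changed code. The 5% relative tolerance, the function name, and the stat names are illustrative assumptions; suitable thresholds depend on the pipeline.

```python
def find_regressions(baseline, candidate, rel_tolerance=0.05):
    """Return the names of stats whose relative change exceeds the tolerance."""
    drifted = []
    for name, expected in baseline.items():
        actual = candidate.get(name)
        if actual is None:
            drifted.append(name)  # the stat disappeared entirely
            continue
        if expected == 0:
            changed = actual != 0  # any move away from zero counts as drift
        else:
            changed = abs(actual - expected) / abs(expected) > rel_tolerance
        if changed:
            drifted.append(name)
    return drifted


# Hypothetical stats from running old and new code on the same baseline data.
baseline_stats = {"records_processed": 1000, "mean": 19.0, "records_rejected": 0}
small_change = {"records_processed": 1000, "mean": 19.2, "records_rejected": 0}
large_change = {"records_processed": 1000, "mean": 25.0, "records_rejected": 7}
```

An empty result means the change stayed within tolerance; a non-empty list names the stats to investigate before shipping.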

Set thresholds: Establish a baseline

Just like in web services, where what is considered "healthy traffic patterns" might vary depending on the application, the concept of a healthy data pipeline might vary depending on the purposes of the pipeline. It can take a bit of experience with the nuances of your particular pipeline to recognize whether any given signal is alarming or normal.


To learn more, attend Jiaqi Liu's talk, Building a data pipeline with testing in mind, at PyCon Cleveland 2018.

About the author

Jiaqi Liu - Jiaqi is a Software Engineer at Button, where she works on building out the data infrastructure and backend services. Previously, she was a Data Scientist at Capital One. Outside of work, Jiaqi is a Director for the NYC Network of Women Who Code.