Bytes, Pipelines & Queries #3 – Automating Data Pipelines with Kestra
February 9, 2025 • 4 min read
Welcome to the “Bytes, Pipelines & Queries” series. In this post, I share my experience automating data pipelines with Kestra after Module 2 of the Data Engineering Zoomcamp.
I always knew data workflows needed some level of automation, but I had never really thought about how engineers actually orchestrate these pipelines. That changed when I discovered Kestra, an open-source workflow orchestration tool that simplifies scheduling, dependency management, and automation. I used it to automate an ETL (Extract, Transform, Load) pipeline.
If you've ever written a data pipeline and manually triggered scripts in the right order—or worse, written a bunch of crontabs—you’ll appreciate why workflow orchestration matters. Let's dive in.
What is Workflow Orchestration?
Imagine you have a series of steps in your data pipeline: extract data, transform it, and load it somewhere useful. If you've done this before as an early-career engineer like me, you probably:
- Manually ran a script for each step 😵💫 and hoped nothing broke ⏳
- Wrote a custom script to handle dependencies 🛠️
Workflow orchestration tools like Kestra eliminate all of that by letting you define workflows declaratively. You tell Kestra, in YAML, what needs to run and in what order, and it takes care of the rest. This 'simple' concept blew me away.
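To make that concrete, here's a minimal sketch of what a declarative Kestra flow looks like. The flow id, namespace, and message below are placeholders I made up, not part of the pipeline we'll build later:

id: hello_world
namespace: demo
tasks:
  # Kestra runs the tasks in this list in order; this one just logs a message.
  - id: say_hello
    type: io.kestra.plugin.core.log.Log
    message: Hello from Kestra!

That YAML is the entire workflow definition: no glue scripts, no manual triggering.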
Setting Up Kestra
Getting started with Kestra is pretty straightforward, especially if you're using Docker. You can read about installing Docker here if you don't already have it. Here’s how to spin it up in a few steps:
1. Download the Docker Compose File
curl -o docker-compose.yml https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml
2. Start Kestra with Docker Compose
docker-compose up -d
This command downloads the necessary Docker images and starts Kestra along with its PostgreSQL database in detached mode.
3. Access the Kestra UI
Once the services are running, the Kestra UI will be available at http://localhost:8080 in your browser.
Building an ETL Pipeline with Kestra
Now, let's actually build something. We’ll create a workflow to:
- Download a dataset (CSV file)
- Transform the data (filter and clean it)
- Load it into a database (PostgreSQL)
1. Extract: Download the Data
Here’s an example of how we tell Kestra to download a file from the internet:
id: extract_data
type: io.kestra.plugin.core.http.Download
uri: "https://example.com/data.csv"
This saves the CSV locally for the next step.
2. Transform: Process the Data
Kestra allows you to run Python scripts directly within a workflow or execute a script stored on your local machine.
Option 1: Running an Inline Script
If you prefer defining the transformation logic directly within the workflow, you can use the following YAML configuration:
id: transform_data
type: io.kestra.plugin.scripts.python.Script
script: |
  import pandas as pd

  df = pd.read_csv('{{ outputs.extract_data.uri }}')
  # Keep only rows with a positive value
  df_clean = df[df['value'] > 0]
  df_clean.to_csv('clean_data.csv', index=False)
This is useful for very small data transformation scripts.
Option 2: Running an External Script
If you have a script saved on your local machine, you can reference it instead of writing it inline:
id: transform_data
type: io.kestra.plugin.scripts.python.Commands
commands:
  - python /path/to/your_script.py
💡 Ensure that /path/to/your_script.py is accessible to the Kestra worker.
This method is useful when dealing with complex transformations or when collaborating on script development outside of Kestra.
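If mounting a local path into the worker isn't practical, one alternative (a sketch, assuming you've uploaded your_script.py to the flow's namespace through the Kestra editor) is to let the task pull in namespace files:

id: transform_data
type: io.kestra.plugin.scripts.python.Commands
# Copy files stored in this flow's namespace into the task's working directory.
namespaceFiles:
  enabled: true
commands:
  - python your_script.py

This keeps the script versioned alongside the flow instead of living only on your machine.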
3. Load: Store the Data in PostgreSQL
Finally, we load the cleaned data into a PostgreSQL database:
id: load_data
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/mydb
username: user
password: pass
sql: |
  COPY my_table FROM 'clean_data.csv' WITH CSV HEADER;
Scheduling, Automating, and Backfilling Workflows
One of Kestra’s biggest strengths is workflow scheduling. Instead of running commands manually, you can schedule workflows with cron expressions:
triggers:
  - id: daily_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 8 * * *"
This will run our pipeline every day at 8 AM.
But what if we want to process data from the past? Kestra allows for backfilling, meaning we can pretend to "go back in time" and re-run our workflows for specific historical periods. This is useful for cases like filling in missing data or reprocessing past datasets.
backfill:
  start: 2023-01-01
  end: 2023-12-31
  interval: P1D
This configuration ensures that Kestra executes the workflow for each day from January 1st to December 31st, 2023, allowing us to reconstruct or update past records effortlessly.
Common Pitfalls & Fixes
🚧 Workflow Fails Without a Clear Error
👉 Check the logs: docker logs kestra
👉 Validate YAML syntax: kestra flow validate workflow.yaml
🚧 Kestra UI Won’t Load
👉 Ensure all containers are running: docker ps
👉 Restart Kestra: docker compose restart
👉 Ensure no other application is already using port 8080.
🚧 Database Connection Issues
👉 Double-check credentials in your PostgreSQL setup
👉 Test the connection manually using: psql -h localhost -U user -d mydb
Full Workflow YAML File
Here's what the full Kestra workflow YAML file would look like for our hypothetical ETL pipeline:
id: etl_pipeline
namespace: my_project

triggers:
  - id: daily_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 8 * * *"

backfill:
  start: 2023-01-01
  end: 2023-12-31
  interval: P1D

tasks:
  - id: extract_data
    type: io.kestra.plugin.core.http.Download
    uri: "https://example.com/data.csv"

  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import pandas as pd

      df = pd.read_csv('{{ outputs.extract_data.uri }}')
      # Keep only rows with a positive value
      df_clean = df[df['value'] > 0]
      df_clean.to_csv('clean_data.csv', index=False)

  - id: load_data
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    sql: |
      COPY my_table FROM 'clean_data.csv' WITH CSV HEADER;
Wrapping Up
This ETL pipeline is just a basic example of what Kestra can do. Kestra is capable of handling much more complex workflows, including event-driven pipelines, parallel task execution, dynamic scaling, and real-time processing. Whether you're automating simple data movements or orchestrating multi-step machine learning pipelines, Kestra provides the flexibility and power needed to streamline workflows efficiently.
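As a small taste of that, here's a rough sketch of parallel task execution; the flow and task ids below are placeholders rather than part of our ETL example:

id: parallel_example
namespace: demo
tasks:
  - id: run_in_parallel
    type: io.kestra.plugin.core.flow.Parallel
    # Both branches start at the same time instead of running one after the other.
    tasks:
      - id: branch_a
        type: io.kestra.plugin.core.log.Log
        message: Running branch A
      - id: branch_b
        type: io.kestra.plugin.core.log.Log
        message: Running branch B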
This was my first time exploring workflow orchestration, and Kestra was a great tool for diving into the topic. Instead of hacking together shell or Python scripts, I could define my workflows declaratively, track dependencies, and automate everything efficiently. Give Kestra a shot by checking it out here.