Bytes, Pipelines & Queries #3 – Automating Data Pipelines with Kestra

February 9, 2025 • 4 min read

Welcome to the “Bytes, Pipelines & Queries” series. In this post, I share my experience automating data pipelines with Kestra after Module 2 of the Data Engineering Zoomcamp.

I always knew data workflows needed some level of automation, but I had never really thought about how engineers actually orchestrate these pipelines. That changed when I discovered Kestra, an open-source workflow orchestration tool that simplifies scheduling, dependency management, and automation. I used it to automate an ETL (Extract, Transform, Load) pipeline.

If you've ever written a data pipeline and manually triggered scripts in the right order—or worse, written a bunch of crontabs—you’ll appreciate why workflow orchestration matters. Let's dive in.

What is Workflow Orchestration?

Imagine you have a series of steps in your data pipeline: extract data, transform it, and load it somewhere useful. If you’ve done this before as an early-career engineer like me, you probably:

  • Manually ran a script for each step 😵‍💫 and hoped nothing broke ⏳
  • Wrote a custom script to handle dependencies 🛠️

Workflow orchestration tools like Kestra eliminate all of that by letting you define workflows declaratively. You tell Kestra, in YAML, what needs to run and in what order, and it takes care of the rest. This 'simple' concept blew me away.
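
To make that concrete, here's a minimal sketch of a flow (the ids and messages are hypothetical) with two steps that run in the order they're listed, using Kestra's built-in Log task:

id: hello_pipeline
namespace: demo

tasks:
  - id: extract
    type: io.kestra.plugin.core.log.Log
    message: "Pretend we're extracting data here..."
  - id: load
    type: io.kestra.plugin.core.log.Log
    message: "...and loading it here. Kestra runs these in order, no crontab gymnastics."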

Setting Up Kestra

Getting started with Kestra is pretty straightforward, especially if you're using Docker. You can read about installing Docker here if you don't already have it. Here’s how to spin it up in a few steps:

1. Download the Docker Compose File

curl -o docker-compose.yml https://raw.githubusercontent.com/kestra-io/kestra/develop/docker-compose.yml

2. Start Kestra with Docker Compose

docker-compose up -d

This command downloads the necessary Docker images and starts Kestra along with its PostgreSQL database in detached mode.

3. Access the Kestra UI

Once the services are running, the Kestra UI will be available at http://localhost:8080 in your browser.

Building an ETL Pipeline with Kestra

Now, let's actually build something. We’ll create a workflow to:

  1. Download a dataset (CSV file)
  2. Transform the data (filter and clean it)
  3. Load it into a database (PostgreSQL)

1. Extract: Download the Data

Here’s an example of how we tell Kestra to download a file from the internet:

id: extract_data
type: io.kestra.plugin.core.http.Download
uri: "https://example.com/data.csv"

This stores the CSV in Kestra's internal storage, where the next step can pick it up through the task's output.
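
If you're curious where that file ends up, a tiny follow-up task (hypothetical id) can log the download's output URI using Kestra's templating syntax:

id: show_download
type: io.kestra.plugin.core.log.Log
message: "Downloaded file stored at {{ outputs.extract_data.uri }}"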

2. Transform: Process the Data

Kestra allows you to run Python directly within a workflow (via the Python Script task) or execute a script stored on your local machine (via the Commands task).

Option 1: Running an Inline Script

If you prefer defining the transformation logic directly within the workflow, you can use the following YAML configuration:

id: transform_data
type: io.kestra.plugin.scripts.python.Script
beforeCommands:
  - pip install pandas   # the default Python image doesn't ship with pandas
inputFiles:
  data.csv: "{{ outputs.extract_data.uri }}"   # stage the downloaded CSV in the task's working directory
outputFiles:
  - clean_data.csv                             # expose the cleaned file to later tasks
script: |
  import pandas as pd
  df = pd.read_csv('data.csv')
  df_clean = df[df['value'] > 0]
  df_clean.to_csv('clean_data.csv', index=False)

This is useful for very small data transformation scripts.

Option 2: Running an External Script

If you have a script saved on your local machine, you can reference it instead of writing it inline:

id: transform_data
type: io.kestra.plugin.scripts.python.Commands
commands:
  - python /path/to/your_script.py

💡 Ensure that /path/to/your_script.py is accessible to the Kestra worker.

This method is useful when dealing with complex transformations or when collaborating on script development outside of Kestra.
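
One way to make the script accessible, assuming you've uploaded it to the flow's namespace (say, as scripts/your_script.py) through the UI editor or the API, is to enable namespace files on the task. A rough sketch:

id: transform_data
type: io.kestra.plugin.scripts.python.Commands
namespaceFiles:
  enabled: true               # assumption: your_script.py lives in this flow's namespace files
commands:
  - python scripts/your_script.py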

3. Load: Store the Data in PostgreSQL

Finally, we load the cleaned data into a PostgreSQL database:

id: load_data
type: io.kestra.plugin.jdbc.postgresql.Query
url: jdbc:postgresql://localhost:5432/mydb
username: user
password: pass
sql: |
  COPY my_table FROM 'clean_data.csv' WITH CSV HEADER;
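
One caveat: COPY runs on the PostgreSQL server, so it expects clean_data.csv to exist on that server's filesystem. If the cleaned file only lives in Kestra's storage (as in the transform step above), the PostgreSQL plugin's CopyIn task should fit better. A sketch, assuming its format/header/table/from properties:

id: load_data
type: io.kestra.plugin.jdbc.postgresql.CopyIn
url: jdbc:postgresql://localhost:5432/mydb
username: user
password: pass
format: CSV
header: true
table: my_table
from: "{{ outputs.transform_data.outputFiles['clean_data.csv'] }}"  # assumption: the cleaned CSV exposed via outputFiles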

Scheduling, Automating, and Backfilling Workflows

One of Kestra’s biggest strengths is workflow scheduling. Instead of running commands manually, you can schedule workflows with cron expressions:

triggers:
  - id: daily_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 8 * * *"

This will run our pipeline every day at 8 AM.
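
The cron string is the only thing you need to change for other cadences; for example (same trigger shape, hypothetical ids):

triggers:
  - id: hourly_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 * * * *"   # top of every hour
  - id: weekly_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * 1"   # Mondays at 6 AM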

But what if we want to process data from the past? Kestra allows for backfilling, meaning we can pretend to "go back in time" and re-run our workflows for specific historical periods. This is useful for cases like filling in missing data or reprocessing past datasets.

backfill:
  start: 2023-01-01
  end: 2023-12-31
  interval: P1D

This configuration ensures that Kestra executes the workflow for each day from January 1st to December 31st, 2023, allowing us to reconstruct or update past records effortlessly.

Common Pitfalls & Fixes

🚧 Workflow Fails Without a Clear Error
👉 Check the logs: docker compose logs kestra
👉 Validate YAML syntax: kestra flow validate workflow.yaml

🚧 Kestra UI Won’t Load
👉 Ensure all containers are running: docker ps
👉 Restart Kestra: docker compose restart
👉 Ensure no other application is already using Kestra's port (8080).

🚧 Database Connection Issues
👉 Double-check credentials in your PostgreSQL setup
👉 Test the connection manually using: psql -h localhost -U user -d mydb

Full Workflow YAML File

Here's what the full Kestra workflow YAML file would look like for our hypothetical ETL pipeline:

id: etl_pipeline
namespace: my_project

triggers:
  - id: daily_run
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 8 * * *"
    backfill:
      start: 2023-01-01
      end: 2023-12-31
      interval: P1D

tasks:
  - id: extract_data
    type: io.kestra.plugin.core.http.Download
    uri: "https://example.com/data.csv"

  - id: transform_data
    type: io.kestra.plugin.scripts.python.Script
    beforeCommands:
      - pip install pandas
    inputFiles:
      data.csv: "{{ outputs.extract_data.uri }}"
    outputFiles:
      - clean_data.csv
    script: |
      import pandas as pd
      df = pd.read_csv('data.csv')
      df_clean = df[df['value'] > 0]
      df_clean.to_csv('clean_data.csv', index=False)

  - id: load_data
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: jdbc:postgresql://localhost:5432/mydb
    username: user
    password: pass
    sql: |
      COPY my_table FROM 'clean_data.csv' WITH CSV HEADER;

Wrapping Up

This ETL pipeline is just a basic example of what Kestra can do. Kestra is capable of handling much more complex workflows, including event-driven pipelines, parallel task execution, dynamic scaling, and real-time processing. Whether you're automating simple data movements or orchestrating multi-step machine learning pipelines, Kestra provides the flexibility and power needed to streamline workflows efficiently.

This was my first time exploring workflow orchestration, and Kestra was a great tool for delving into the topic. Instead of hacking together shell or Python scripts, I could define my workflows declaratively, track dependencies, and automate everything efficiently. Give Kestra a shot by checking it out here.