# Orchestrate Data Pipelines
Data teams use orchestration platforms like Kestra to manage complex pipelines that ingest raw data, transform it, and deliver it to data warehouses, data lakes, and user-facing applications. The orchestration engine ensures workflows run in the correct sequence, recover from failures, and scale dynamically.
## What is Data Orchestration?
Data orchestration automates the execution of interconnected tasks (ETL, Analytics, AI and ML jobs) while governing their dependencies and business logic. It focuses on how data moves between systems, teams and processes.
Kestra’s data orchestration capabilities include:
- Flexible workflow triggers — run data flows on a schedule, on external events (e.g., a new file arriving in S3/SFTP), or via API calls (see the schedule trigger sketch after this list).
- Powerful orchestration engine — control retries, parallel task runs, timeouts, SLAs, concurrency and error handling.
- Dynamic resource allocation — provision containers on-demand (e.g., AWS Fargate, GCP Batch, Azure Batch, Kubernetes) for compute-heavy tasks.
- Visibility — track logs, traces, metrics, inputs, outputs, and lineage across workflows and tasks.
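For example, here is a minimal sketch of a schedule-driven flow; the flow id, namespace, and cron expression are illustrative placeholders you would replace with your own:

```yaml
id: scheduled_ingest        # illustrative flow name
namespace: tutorial

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

triggers:
  - id: every_morning
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 6 * * *"       # run every day at 06:00
```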
## Why Use Kestra for Data Orchestration?
- Simple Declarative Syntax – Define each data pipeline in a self-contained, portable YAML configuration that includes tasks, triggers, dependencies and infrastructure requirements.
- Extensible Integrations – Connect to over 600 services via pre-built plugins. Thanks to plugins, you can avoid writing custom code for boilerplate tasks like file downloads, SQL queries, or REST API calls.
- Execution Control – Set retries, timeouts, SLAs, and concurrency limits (see the sketch after this list).
- Zero Code Changes – Run existing Python/R/SQL scripts as-is (no rewrites needed); specify dependencies via YAML configuration.
- State Management – Pass data of any size between tasks (files, variables, query results) through internal storage, or share state between workflows using the KV Store.
- Dynamic Scaling – Scale custom code with task runners. Spin up containers on cloud services (AWS ECS Fargate, Google Batch, Kubernetes) dynamically at runtime — no need for dedicated always-on workers (to scale on-premise deployments, you can use worker groups).
- Observability – Monitor flow execution states, durations, logs, inputs, outputs and resource usage in real time.
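As a hedged sketch of these execution controls (the flow id, task id, retry values, and container image below are illustrative), retries, timeouts, and task runners are declared on the task, while a concurrency limit sits at the flow level:

```yaml
id: controlled_script       # illustrative flow name
namespace: tutorial

concurrency:
  limit: 2                  # at most two executions of this flow run at once

tasks:
  - id: score
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-slim
    taskRunner:             # container is provisioned at runtime, then removed
      type: io.kestra.plugin.scripts.runner.docker.Docker
    timeout: PT10M          # fail the task if it runs longer than 10 minutes
    retry:
      type: constant        # retry up to 3 times, 30 seconds apart
      maxAttempt: 3
      interval: PT30S
    script: |
      print("your existing Python code runs here as-is")
```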
## Example: Data Engineering Pipeline
The following flow triggers data syncs in Airbyte and Fivetran and a job run in dbt Cloud. It then downloads a JSON dataset via a REST API, filters specific columns using Python, and calculates KPIs with DuckDB. Kestra dynamically provisions a Python container for the task running custom code and terminates it once the task completes:
```yaml
id: data_pipeline
namespace: tutorial
description: Process product data to calculate brand price averages

inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price

tasks:
  - id: airbyte_sync
    type: io.kestra.plugin.airbyte.connections.Sync
    url: http://localhost:8080
    connectionId: e3b1ce92-547c-436f-b1e8-23b6936c12cd
    wait: true

  - id: fivetran_sync
    type: io.kestra.plugin.fivetran.connectors.Sync
    apiKey: "{{ secret('FIVETRAN_API_KEY') }}"
    apiSecret: "{{ secret('FIVETRAN_API_SECRET') }}"
    connectorId: myConnectorId
    wait: true

  - id: dbt_job
    type: io.kestra.plugin.dbt.cloud.TriggerRun
    accountId: dbt_account
    token: "{{ secret('DBT_CLOUD_TOKEN') }}"
    jobId: abc12345
    wait: true

  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products

  # Filter columns in a disposable Python container
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-slim
    taskRunner: # spins up a container on-demand
      type: io.kestra.plugin.scripts.runner.docker.Docker
    inputFiles:
      data.json: "{{ outputs.extract.uri }}" # input from the previous task
    outputFiles:
      - "*.json"
    script: |
      import json

      filtered_data = [
          {col: product.get(col, "N/A") for col in {{ inputs.columns_to_keep }}}
          for product in json.load(open("data.json"))["products"]
      ]
      json.dump(filtered_data, open("products.json", "w"), indent=4)

  # Analyze the filtered data with DuckDB
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    fetchType: STORE
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, ROUND(AVG(price), 2) AS avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
```
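If you need to hand the aggregated results to another system, a downstream task can read the stored query result from internal storage. A minimal sketch, assuming the `STORE` fetch type exposes the result file as `outputs.query.uri` (check the DuckDB plugin's output reference for the exact property name); the `publish` task is illustrative:

```yaml
  # Illustrative task to append to the tasks list above
  - id: publish
    type: io.kestra.plugin.core.log.Log
    message: "Brand price averages stored at {{ outputs.query.uri }}"
```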
## Getting Started with Data Orchestration
- Install Kestra – Follow the quick start guide or the full installation instructions for production environments.
- Write Your Workflows – Configure your flow in YAML, declaring inputs, tasks, and triggers. Tasks can be anything — scripts, queries, remote jobs or API calls. Add `retry`, `timeout`, `concurrency` or `taskRunner` settings to scale tasks dynamically and manage data orchestration logic.
- Add Triggers – Execute flows manually, via schedules, API, flow or event triggers (e.g., S3 file uploads); see the webhook trigger sketch after this list.
- Observe and Manage – Use Kestra’s UI to inspect workflow outputs, logs, execution states, and dependencies.
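For the API-triggered case, a webhook trigger is one option. A minimal sketch, assuming a webhook key of your choosing (the key below is a placeholder; calling the generated webhook URL with that key starts an execution):

```yaml
triggers:
  - id: api_call
    type: io.kestra.plugin.core.trigger.Webhook
    key: replace-with-a-hard-to-guess-key   # placeholder key
```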
## Next Steps
- Explore plugins for databases, data ingestion and transformation tools or custom scripts in any language.
- Explore blueprints for common data workflows and data orchestration patterns.
- Explore How-to Guides for detailed examples on using Kestra for ETL, ELT, ML, and more.
- Explore Task Runners for scaling custom scripts and containers.
- Explore video tutorials on our YouTube channel.
- Join Slack to share flow examples or ask questions.
- Book a demo to discuss how Kestra can help orchestrate your data workflows.