ETL Pipelines in Kestra
Build ETL pipelines in Kestra using DuckDB, Python, Task Runners, dbt, and Spark.
This tutorial demonstrates how to build several different ETL pipelines in Kestra.
The example workflows below use an AWS access key and secret key. To learn more about these keys and how to obtain them, refer to this page. Once you have them, you can store them in the KV Store or as Secrets.
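Stored credentials can then be referenced from any task using Kestra's `secret()` (or `kv()`) expression functions. Below is a minimal sketch, assuming the keys were saved under the names used throughout this tutorial; the S3 List task serves only as an illustration:

```yaml
id: read_aws_credentials
namespace: company.team

tasks:
  - id: list_bucket
    type: io.kestra.plugin.aws.s3.List
    # Credentials stored as Secrets; swap secret(...) for kv(...)
    # if you stored them in the KV Store instead.
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    bucket: "my_bucket"
```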
Using DuckDB
DuckDB can be leveraged to transform the data directly with SQL queries.
In the example below, we fetch two CSV files, join them using the DuckDB Query task, store the result, upload the detailed orders to S3, run an aggregation on the stored result, and finally upload that aggregate to S3 as a CSV file.
```yaml
id: etl_using_duckdb
namespace: company.team

tasks:
  - id: download_orders_csv
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv file
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: download_products_csv
    type: io.kestra.plugin.core.http.Download
    description: Download products.csv file
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv

  - id: get_detailed_orders
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Perform JOIN transformation using DuckDB
    inputFiles:
      orders.csv: "{{ outputs.download_orders_csv.uri }}"
      products.csv: "{{ outputs.download_products_csv.uri }}"
    sql: |
      SELECT o.order_id, o.customer_name, o.customer_email, o.product_id,
             o.price, o.quantity, o.total,
             p.product_name, p.product_category, p.brand
      FROM read_csv_auto('{{ workingDir }}/orders.csv', header=True) o
      JOIN read_csv_auto('{{ workingDir }}/products.csv', header=True) p
      ON o.product_id = p.product_id
      ORDER BY order_id ASC;
    store: true

  - id: ion_to_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Convert the result into CSV
    from: "{{ outputs.get_detailed_orders.uri }}"

  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.ion_to_csv.uri }}"
    bucket: "my_bucket"
    key: "orders/detailed_orders"

  - id: get_orders_per_product
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Perform aggregation using DuckDB
    inputFiles:
      detailed_orders.csv: "{{ outputs.ion_to_csv.uri }}"
    sql: |
      SELECT product_id,
             COUNT(product_id) as order_count,
             SUM(quantity) as product_count,
             CAST(SUM(total) AS DECIMAL(10,2)) AS order_total
      FROM read_csv_auto('{{ workingDir }}/detailed_orders.csv', header=True)
      GROUP BY product_id
      ORDER BY product_id ASC;
    store: true

  - id: get_orders_per_product_csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Convert the result into CSV
    from: "{{ outputs.get_orders_per_product.uri }}"

  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.get_orders_per_product_csv.uri }}"
    bucket: "my_bucket"
    key: "orders/orders_per_product"
```
Similar Query tasks are available for other databases such as Snowflake and Postgres.
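For instance, if the orders and products tables already live in Postgres, the join could run there directly with the JDBC Query task. This is a minimal sketch with placeholder connection details:

```yaml
id: etl_using_postgres
namespace: company.team

tasks:
  - id: get_detailed_orders
    type: io.kestra.plugin.jdbc.postgresql.Query
    description: Join orders and products directly in Postgres
    url: jdbc:postgresql://localhost:5432/ecommerce   # placeholder connection string
    username: "{{ secret('POSTGRES_USERNAME') }}"
    password: "{{ secret('POSTGRES_PASSWORD') }}"
    sql: |
      SELECT o.order_id, o.customer_name, o.customer_email, o.product_id,
             o.price, o.quantity, o.total,
             p.product_name, p.product_category, p.brand
      FROM orders o
      JOIN products p ON o.product_id = p.product_id
      ORDER BY o.order_id ASC;
    store: true
```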
Using Python
You can also perform the ETL in Python (using pandas) and run it as a Python Script task.
The ETL performed with DuckDB above can be implemented in Python as shown in the example flow below.
```yaml
id: python_etl
namespace: company.team

tasks:
  - id: etl
    type: io.kestra.plugin.scripts.python.Script
    description: Python ETL Script
    beforeCommands:
      - pip install requests pandas
    script: |
      import io
      import requests
      import pandas as pd

      def _extract(url):
          csv_data = requests.get(url).content
          return pd.read_csv(io.StringIO(csv_data.decode('utf-8')), header=0)

      def run_etl():
          orders_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv")
          products_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv")

          # perform join transformation
          detailed_orders = orders_data.merge(products_data, how='left', left_on='product_id', right_on='product_id')
          detailed_orders.to_csv("detailed_orders.csv")

          # perform aggregation
          orders_per_product = detailed_orders.groupby('product_id').agg(
              order_count=('product_id', 'count'),
              product_count=('quantity', 'sum'),
              order_total=('total', 'sum')).sort_values('product_id')
          orders_per_product['order_total'] = orders_per_product['order_total'].apply(lambda x: float("{:.2f}".format(x)))
          orders_per_product.to_csv("orders_per_product.csv")

      if __name__ == "__main__":
          run_etl()
    outputFiles:
      - detailed_orders.csv
      - orders_per_product.csv

  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.etl.outputFiles['detailed_orders.csv'] }}"
    bucket: "my_bucket"
    key: "orders/detailed_orders"

  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.etl.outputFiles['orders_per_product.csv'] }}"
    bucket: "my_bucket"
    key: "orders/orders_per_product"
```
Using Batch Task Runners
When the Python scripts become more compute-intensive or memory-intensive, it is advisable to run them on remote batch compute resources using Batch Task Runners.
Kestra provides a variety of Batch Task Runners. Here is an example of how the ETL Python script can be run with the AWS Batch Task Runner.
```yaml
id: aws_batch_task_runner_etl
namespace: company.team

tasks:
  - id: python_etl_on_aws_task_runner
    type: io.kestra.plugin.scripts.python.Script
    description: Run python ETL script on AWS Batch Task Runner
    containerImage: python:3.11-slim
    taskRunner:
      type: io.kestra.plugin.ee.aws.runner.Batch
      region: eu-central-1
      accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
      secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
      computeEnvironmentArn: "arn:aws:batch:eu-central-1:01234567890:compute-environment/kestraFargateEnvironment"
      jobQueueArn: "arn:aws:batch:eu-central-1:01234567890:job-queue/kestraJobQueue"
      executionRoleArn: "arn:aws:iam::01234567890:role/kestraEcsTaskExecutionRole"
      taskRoleArn: arn:aws:iam::01234567890:role/ecsTaskRole
      bucket: kestra-product-de
    beforeCommands:
      - pip install requests pandas
    script: |
      import io
      import requests
      import pandas as pd

      def _extract(url):
          csv_data = requests.get(url).content
          return pd.read_csv(io.StringIO(csv_data.decode('utf-8')), header=0)

      def run_etl():
          orders_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv")
          products_data = _extract("https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv")

          # perform join transformation
          detailed_orders = orders_data.merge(products_data, how='left', left_on='product_id', right_on='product_id')
          detailed_orders.to_csv("detailed_orders.csv")

          # perform aggregation
          orders_per_product = detailed_orders.groupby('product_id').agg(
              order_count=('product_id', 'count'),
              product_count=('quantity', 'sum'),
              order_total=('total', 'sum')).sort_values('product_id')
          orders_per_product['order_total'] = orders_per_product['order_total'].apply(lambda x: float("{:.2f}".format(x)))
          orders_per_product.to_csv("orders_per_product.csv")

      if __name__ == "__main__":
          run_etl()
    outputFiles:
      - detailed_orders.csv
      - orders_per_product.csv

  - id: upload_detailed_orders_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.python_etl_on_aws_task_runner.outputFiles['detailed_orders.csv'] }}"
    bucket: "my_bucket"
    key: "orders/detailed_orders"

  - id: upload_orders_per_product_to_s3
    type: io.kestra.plugin.aws.s3.Upload
    description: Upload the resulting CSV file onto S3
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
    region: "eu-central-1"
    from: "{{ outputs.python_etl_on_aws_task_runner.outputFiles['orders_per_product.csv'] }}"
    bucket: "my_bucket"
    key: "orders/orders_per_product"
```
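The same script can be dispatched to a different backend by swapping out the `taskRunner` block, for example to a Google Cloud Batch runner. The snippet below is only a rough sketch; the plugin type and property names are assumptions and may differ in your Kestra version:

```yaml
    taskRunner:
      type: io.kestra.plugin.ee.gcp.runner.Batch
      projectId: my-gcp-project     # placeholder project
      region: europe-west1
      bucket: my-gcp-bucket         # bucket used to exchange input/output files
      serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
```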
Using dbt
We can create a similar pipeline based on an ELT model using dbt with Kestra. We can leverage Namespace Files to hold the dbt models; you can create them directly in the Kestra editor, or sync them from Git as sketched below.
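Here is a minimal sketch of the Git option, assuming the `SyncNamespaceFiles` task from the Git plugin and a placeholder repository:

```yaml
id: sync_dbt_project
namespace: kestra.engineering.bigquery.dbt

tasks:
  - id: sync_namespace_files
    type: io.kestra.plugin.git.SyncNamespaceFiles
    description: Pull the dbt project files from Git into Namespace Files
    url: https://github.com/my-org/my-dbt-project   # placeholder repository
    branch: main
    namespace: "{{ flow.namespace }}"
    gitDirectory: dbt                               # repo folder holding the dbt project
    username: my-git-username                       # placeholder credentials
    password: "{{ secret('GITHUB_ACCESS_TOKEN') }}"
```

The rest of this section assumes the files are created directly in the Kestra editor.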
In this example, we will use dbt + BigQuery to perform the ELT process: we load data fetched over HTTP from Hugging Face into BigQuery tables, perform transformations such as joins and aggregations using dbt, and then query the tables generated by the dbt transformations.
```yaml
id: dbt_transformations
namespace: kestra.engineering.bigquery.dbt

tasks:
  - id: orders_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download orders.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

  - id: products_http_download
    type: io.kestra.plugin.core.http.Download
    description: Download products.csv using HTTP Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv

  - id: create_orders_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create orders table in BigQuery
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    dataset: ecommerce
    table: orders
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: order_id
            type: INT64
          - name: customer_name
            type: STRING
          - name: customer_email
            type: STRING
          - name: product_id
            type: INT64
          - name: price
            type: FLOAT64
          - name: quantity
            type: INT64
          - name: total
            type: FLOAT64

  - id: create_products_table
    type: io.kestra.plugin.gcp.bigquery.CreateTable
    description: Create products table in BigQuery
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    dataset: ecommerce
    table: products
    tableDefinition:
      type: TABLE
      schema:
        fields:
          - name: product_id
            type: INT64
          - name: product_name
            type: STRING
          - name: product_category
            type: STRING
          - name: brand
            type: STRING

  - id: load_orders_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load orders table with data from orders.csv
    from: "{{ outputs.orders_http_download.uri }}"
    projectId: <gcp_project_id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp_project_id>.ecommerce.orders"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1

  - id: load_products_table
    type: io.kestra.plugin.gcp.bigquery.Load
    description: Load products table with data from products.csv
    from: "{{ outputs.products_http_download.uri }}"
    projectId: <gcp_project_id>
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    destinationTable: "<gcp_project_id>.ecommerce.products"
    format: CSV
    csvOptions:
      fieldDelimiter: ","
      skipLeadingRows: 1

  - id: dbt
    type: io.kestra.plugin.dbt.cli.DbtCLI
    description: Use dbt build to perform the dbt transformations
    inputFiles:
      sa.json: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: ghcr.io/kestra-io/dbt-bigquery:latest
    namespaceFiles:
      enabled: true
    profiles: |
      bq_dbt_project:
        outputs:
          dev:
            type: bigquery
            method: service-account
            dataset: ecommerce
            project: <gcp_project_id>
            keyfile: sa.json
            location: US
            priority: interactive
            threads: 16
            timeout_seconds: 300
            fixed_retries: 1
        target: dev
    commands:
      - dbt deps
      - dbt build

  - id: query_detailed_orders
    type: io.kestra.plugin.gcp.bigquery.Query
    description: Query the newly generated detailed_orders BigQuery table
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    sql: |
      SELECT * FROM <gcp_project_id>.ecommerce.detailed_orders
    store: true

  - id: query_orders_per_product
    type: io.kestra.plugin.gcp.bigquery.Query
    description: Query the newly generated orders_per_product BigQuery table
    serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT_JSON') }}"
    projectId: <gcp_project_id>
    sql: |
      SELECT * FROM <gcp_project_id>.ecommerce.orders_per_product
    store: true
```
Here are the files that you should create in the Kestra editor.

First, create the `dbt_project.yml` file and put the following contents into it:
```yaml
name: 'bq_dbt_project'
version: '1.0.0'
config-version: 2

profile: 'bq_dbt_project'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  bq_dbt_project:
    example:
      +materialized: view
```
Next, create a `models` folder. All the upcoming files will be created under the `models` folder.

We will create `sources.yml`, which defines the source tables that will be referenced in other models:
```yaml
version: 2

sources:
  - name: ecommerce
    database: <gcp_project_id>
    schema: ecommerce
    tables:
      - name: orders
      - name: products
```
Next, we will create two files, `stg_orders.sql` and `stg_products.sql`, which will materialize as views on top of the source tables. The contents of these files are, respectively:

`stg_orders.sql`:
```sql
{{ config(materialized="view") }}

select
    order_id,
    customer_name,
    customer_email,
    product_id,
    price,
    quantity,
    total
from {{ source('ecommerce', 'orders') }}
```
`stg_products.sql`:

```sql
{{ config(materialized="view") }}

select
    product_id,
    product_name,
    product_category,
    brand
from {{ source('ecommerce', 'products') }}
```
Next, we will create `detailed_orders.sql`, which will create the `detailed_orders` table. This model joins the `stg_orders` and `stg_products` views on `product_id`. The contents of the `detailed_orders.sql` file will be as follows:
```sql
{{ config(materialized="table") }}

select
    o.order_id,
    o.customer_name,
    o.customer_email,
    o.product_id,
    p.product_name,
    p.product_category,
    p.brand,
    o.price,
    o.quantity,
    o.total
from {{ ref('stg_orders') }} o
join {{ ref('stg_products') }} p
    on o.product_id = p.product_id
```
Next, we will create `orders_per_product.sql`, which will create the `orders_per_product` table. This model demonstrates an aggregation performed on the `detailed_orders` table. The contents of the `orders_per_product.sql` file will be as follows:
```sql
{{ config(materialized="table") }}

select
    product_id,
    COUNT(product_id) as order_count,
    SUM(quantity) as product_count,
    SUM(total) AS order_total
from {{ ref('detailed_orders') }}
group by product_id
order by product_id asc
```
With this, all the dbt models are ready, and you can now execute the flow. The flow will generate the `detailed_orders` and `orders_per_product` tables. You can view the contents of these tables in the Outputs of the last two tasks.
Using Spark
We can perform the same ETL process using Spark.
The flow for performing the same transformation using Spark will look as follows:
```yaml
id: spark_python_submit
namespace: kestra.engineering.spark

tasks:
  - id: python_submit
    type: io.kestra.plugin.spark.PythonSubmit
    runner: DOCKER
    docker:
      networkMode: host
      user: root
    master: spark://localhost:7077
    args:
      - "10"
    mainScript: |
      from pyspark.sql import SparkSession
      from pyspark import SparkFiles

      orders_url = "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv"
      products_url = "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv"

      if __name__ == "__main__":
          spark = SparkSession.builder.appName("EcommerceApp").getOrCreate()

          # Distribute the CSV files to the Spark workers
          spark.sparkContext.addFile(orders_url)
          spark.sparkContext.addFile(products_url)

          # Create orders dataframe based on orders.csv
          orders_df = spark.read.csv("file://" + SparkFiles.get("orders.csv"), inferSchema=True, header=True)

          # Create products dataframe based on products.csv
          products_df = spark.read.csv("file://" + SparkFiles.get("products.csv"), inferSchema=True, header=True)

          # Create detailed_orders by joining orders_df and products_df
          detailed_orders_df = orders_df.join(products_df, orders_df.product_id == products_df.product_id, "left")

          # Print the contents of detailed_orders_df
          detailed_orders_df.show(10)

          spark.stop()
```