Data storage and processing

Manage data processed by tasks.

Kestra’s primary purpose is to orchestrate data processing via tasks, so data is central to each flow’s execution.

Depending on the task, data can be stored inside the execution context or inside Kestra’s internal storage. You can also manually store data inside Kestra’s KV store by using dedicated tasks.

Some tasks give you the choice of where you want to store the data, usually using a fetchType property or the three fetch/fetchOne/store properties.

For example, using the DynamoDB Query task:

id: query
type: io.kestra.plugin.aws.dynamodb.Query
tableName: persons
keyConditionExpression: id = :id
expressionAttributeValues:
  :id: "1"
fetchType: FETCH

The fetchType property can have four values:

  • FETCH_ONE: fetches the first row and sets it in a task output attribute (the row attribute for DynamoDB); the data is stored inside the execution context.
  • FETCH: fetches all rows and sets them in a task output attribute (the rows attribute for DynamoDB); the data is stored inside the execution context.
  • STORE: stores all rows inside Kestra’s internal storage. The internal storage returns a URI, usually set in the task output attribute uri, that can be used to retrieve the file from the internal storage.
  • NONE: does nothing.

The three fetch/fetchOne/store properties offer the same choices, but through three separate task properties instead of a single one.
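As a rough sketch of the three-property style, the flow below sets fetchOne on a BigQuery Query task and logs the resulting row. The flow id and the SQL statement are made up for illustration, and the sketch assumes that the task exposes a boolean fetchOne property and a row output attribute; check the plugin documentation for your version, since some tasks have moved to fetchType.

```yaml
id: fetch_one_example            # hypothetical flow id
namespace: company.team
tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: SELECT 1 AS answer      # illustrative query
    fetchOne: true               # assumed boolean property, same intent as fetchType: FETCH_ONE
  - id: log_row
    type: io.kestra.plugin.core.log.Log
    # assumes the fetched row is exposed as the "row" output attribute
    message: "{{ outputs.query.row }}"
```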

Storing data

Storing data inside the flow execution context

Data can be stored as variables inside the flow execution context. This can be convenient for sharing data between tasks.

To do so, tasks store data as output attributes that are then available inside the flow via Pebble expressions like {{outputs.taskName.attributeName}}.
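A minimal sketch of this mechanism: one task produces an output and the next one reads it through a Pebble expression. The flow and task ids are hypothetical; the Return task is used here only because it exposes a single value output.

```yaml
id: share_outputs                # hypothetical flow id
namespace: company.team
tasks:
  - id: produce
    type: io.kestra.plugin.core.debug.Return
    format: "hello from produce"
  - id: consume
    type: io.kestra.plugin.core.log.Log
    # reads the "value" output attribute of the task named "produce"
    message: "{{ outputs.produce.value }}"
```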

Be careful: if the data is large, storing it this way increases the size of the flow execution context, which can slow down the execution and increase the size of the execution stored inside Kestra’s repository.

Storing data inside the internal storage

Kestra has an internal storage that can store data of any size. By default, the internal storage uses the host filesystem, but plugins exist to use other implementations like Amazon S3, Google Cloud Storage, or Microsoft Azure Blob Storage. See internal storage configuration.

When using the internal storage, data is, by default, stored using Amazon Ion format.

Tasks that can store data inside the internal storage usually have an output attribute named uri that can be used to access the file in subsequent tasks.

The following example uses the DynamoDB Query task to query a table and the FTP Upload task to send the retrieved rows to an external FTP server.

tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE
  - id: upload
    type: io.kestra.plugin.fs.ftp.Upload
    host: localhost
    port: 80
    from: "{{ outputs.query.uri }}"
    to: "/upload/file.ion"

If you need to access data from the internal storage, you can use the read() function to read the file’s content as a string.

Dedicated tasks allow managing the files stored inside the internal storage:

  • Concat: concatenate multiple files.
  • Delete: delete a file.
  • Size: get the size of a file.
  • Split: split a file into multiple files depending on the size of the file or the number of rows.
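The fragment below sketches how two of these tasks might be chained after a task that stores its result in the internal storage. It assumes a preceding task named query with a uri output (as in the DynamoDB example above), and it assumes the property names from, rows, and uri; treat them as assumptions to verify against the task documentation.

```yaml
tasks:
  - id: split
    type: io.kestra.plugin.core.storage.Split
    from: "{{ outputs.query.uri }}"   # file produced by a previous task (assumed id: query)
    rows: 1000                        # assumed property: maximum rows per output file
  - id: size
    type: io.kestra.plugin.core.storage.Size
    uri: "{{ outputs.query.uri }}"    # assumed property: file whose size is measured
```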

Storing data inside the KV store

Dedicated tasks can store data inside Kestra’s KV store. The KV store transparently uses Kestra’s internal storage as its backend store.

The KV store allows storing data that will be shared by all executions of the same namespace. You can think of it as a key/value store dedicated to a namespace.

The following tasks are available:

  • Set: set data in key/value pair.
  • Get: get data from key/value pair.
  • Delete: delete a key/value pair.

Example:

tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: name
    value: John Doe
  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: name

In the next example, the flow uses Set, Get and Delete on the data:

Example Flow
id: kv_store_example
namespace: company.team
tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: user_name
    value: John Doe
  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: user_name
  - id: log_state
    type: io.kestra.plugin.core.log.Log
    message: "{{ kv('user_name') }}"
  - id: set_new_data
    type: io.kestra.plugin.core.kv.Set
    key: user_name
    value: Bob Smith
  - id: get_new_data
    type: io.kestra.plugin.core.kv.Get
    key: user_name
  - id: log_new_data
    type: io.kestra.plugin.core.log.Log
    message: "{{ kv('user_name') }}"
  - id: delete_data
    type: io.kestra.plugin.core.kv.Delete
    key: user_name
  - id: get_deleted_data
    type: io.kestra.plugin.core.kv.Get
    description: You will not get any data as the corresponding key is deleted in the earlier task.
    key: user_name

After a Set task writes a new value for user_name, a second Get task (get_new_data) is needed to retrieve the most up-to-date value, and the Log task that follows prints it. The same applies to the Delete task: to show that the key has been deleted, the get_deleted_data task tries to get the data for the key deleted by the delete_data task and returns nothing.

Processing data

For basic data processing, you can leverage Kestra’s Pebble templating engine.
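As a small illustration, the flow below applies two Pebble filters inside a log message: upper to change the case of an input and date to format the current timestamp. The flow id, input name, and default value are hypothetical.

```yaml
id: pebble_basics                # hypothetical flow id
namespace: company.team
inputs:
  - id: name
    type: STRING
    defaults: john doe
tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    # "upper" upper-cases the input; "date" formats the timestamp returned by now()
    message: "Hello {{ inputs.name | upper }}, today is {{ now() | date('yyyy-MM-dd') }}"
```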

For more complex data transformations, Kestra offers various data processing plugins including transform tasks or custom scripts.

Converting files

Files from the internal storage can be converted from/to the Ion format to/from another format using the Serdes plugin.

The following formats are currently available: Avro, CSV, JSON, XML, and Parquet.

Each format offers two tasks: one reads an Ion serialized data file and writes it in the target format (for example, IonToCsv), and one reads a file in that format and writes it as an Ion serialized data file (for example, CsvToIon).

For example, to convert an Ion file to CSV, then back to Ion:

tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE
  - id: convertToCsv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"
  - id: convertBackToIon
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{outputs.convertToCsv.uri}}"

Processing data using scripts

Kestra can launch Python, R, Node.js, Shell, PowerShell, and Go scripts. Depending on the runner, they can run directly in a local process on the host or inside Docker containers.

Those script tasks are available in the Scripts Plugin. Below is documentation for each of them:

  • The Python task runs a Python script in a Docker container or in a local process.
  • The Node task runs a Node.js script in a Docker container or in a local process.
  • The R task runs an R script in a Docker container or in a local process.
  • The Shell task executes a single Shell command, or a list of commands that you provide.
  • The PowerShell task executes a single PowerShell command, or a list of commands that you provide.
  • The Go (Script) task executes a single multi-line script, while the Go (Commands) task executes a list of commands that you provide.

The following example queries the BigQuery public dataset with Wikipedia page views to find the top 10 pages, converts the result to CSV, and uses the CSV file inside a Python task for further transformations using Pandas.

id: wikipedia-top-ten-python-panda
namespace: company.team
description: analyze top 10 Wikipedia pages
tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
    projectId: geller
    serviceAccount: "{{envs.gcp_creds}}"
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
    tasks:
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          from kestra import Kestra
          df = pd.read_csv("data.csv")
          views = df['views'].sum()
          Kestra.outputs({'views': int(views)})

Kestra offers several plugins for ingesting and transforming data — check the Plugin list for more details.

Make sure to also check:

  1. The Script documentation for a detailed overview of how to work with Python, R, Node.js, Shell and PowerShell scripts, and how to integrate them with Git and Docker.
  2. The Blueprints catalog — simply search for the relevant language (e.g., Python, R, Rust) or use case (ETL, Git, dbt, etc.) to find the relevant examples.

Processing data using file transform

Kestra can process data row by row using file transform tasks. The transformation is done with a small script written in Python, JavaScript, or Groovy.

The following example queries the BigQuery public dataset for Wikipedia pages, converts the result row by row with a JavaScript FileTransform task, and writes it to a CSV file.

id: wikipedia-top-ten-file-transform
namespace: company.team
description: A flow that loads wikipedia top 10 EN pages
tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
  - id: file-transform
    # the script below is JavaScript, so the JavaScript FileTransform task is used
    type: io.kestra.plugin.graalvm.js.FileTransform
    from: "{{outputs['query-top-ten'].uri}}"
    script: |
      logger.info('row: {}', row)
      if (row['title'] === 'Main_Page' || row['title'] === 'Special:Search' || row['title'] === '-') {
        // remove un-needed row
        row = null
      } else {
        // add a 'time' column
        row['time'] = String(row['date']).substring(11)
        // modify the 'date' column to only keep the date part
        row['date'] = String(row['date']).substring(0, 10)
      }
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs['file-transform'].uri}}"

Purging data


The PurgeExecution task can purge all the files stored inside the internal storage by a flow execution. It can be used at the end of a flow to purge all its generated files.

tasks:
  - id: "purge-execution"
    type: "io.kestra.plugin.core.storage.PurgeExecution"

The execution context itself is not available after the end of the execution and is automatically deleted from Kestra’s repository after a retention period (seven days by default) that can be changed; see configurations.

The Purge task can also be used to purge storages, logs, and executions of previous executions. For example, this flow runs every day and purges everything older than one month:

id: purge
namespace: company.team
tasks:
  - id: "purge"
    type: "io.kestra.plugin.core.storage.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"
triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"

FAQ

Internal Storage FAQ

How to read a file from the internal storage as a string?

The ‘read’ function expects an argument ‘path’ that is a path to a namespace file or an internal storage URI. Note that when using inputs, outputs or trigger variables, you don’t need any extra quotation marks. Here is how you can use such variables along with the ‘read’ function:

  • {{ read(inputs.file) }} for a FILE-type input variable named file
  • {{ read(outputs.mytaskid.uri) }} for an output uri from a task named mytaskid
  • {{ read(trigger.uri) }} for a uri of many triggers incl. Kafka, AWS SQS, GCP PubSub, etc.
  • {{ read(trigger.objects | jq('.[].uri')) }} for a uri of a trigger that returns a list of detected objects, e.g. AWS S3, GCP GCS, etc.

Note that the read function can only read files within the same execution. If you try to read a file from a previous execution, you will get an Unauthorized error.

Example using a FILE-type inputs variable

id: read_file_as_string
namespace: company.team
inputs:
  - id: file
    type: FILE
tasks:
  - id: log_internal_storage_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.file }}"
  - id: log_file_content
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"

Example with the ForEachItem task reading file's content as a string

When using the ForEachItem task, you can use the read() function to read the content of a file as a string. This is especially useful when you want to pass the content of a file as a raw string as an input to a subflow.

Below is a simple subflow example that uses a string input:

id: subflow_raw_string_input
namespace: company.team
inputs:
  - id: string_input
    type: STRING
    defaults: hey there
tasks:
  - id: for_each_item
    type: io.kestra.plugin.core.debug.Return
    format: "{{ inputs.string_input }}"

Because the ForEachItem task splits the items file into batches of smaller files (one file per row by default), you can use the read() function to read the content of that file for a given batch as a string value and pass it as an input to that subflow shown above.

id: parent_flow
namespace: company.team
tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true
  - id: each_raw
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    namespace: company.team
    flowId: subflow_raw_string_input
    inputs:
      string_input: "{{ read(taskrun.items) }}"

How to read a Namespace File as a string?

So far, you’ve seen how to read a file from the internal storage as a string. You can use the same read() function to read a Namespace File as a string. This is especially useful when you want to execute a Python script or a long SQL query stored in a dedicated SQL file.

The read() function takes the absolute path to the file you want to read. The path must point to a file stored in the same namespace as the flow you are executing.

Below is a simple example showing how you can read a file named hello.py stored in the scripts directory of the company.team namespace:

id: hello
namespace: company.team
tasks:
  - id: my_python_script
    type: io.kestra.plugin.scripts.python.Script
    script: "{{ read('scripts/hello.py') }}"

The same syntax applies to SQL queries, custom scripts, and many more. Check the Namespace Files documentation for more details.
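For instance, a SQL query kept in a Namespace File could be passed to a query task as sketched below. The flow id and the file path queries/orders.sql are hypothetical; the file is assumed to exist in the company.team namespace.

```yaml
id: run_sql_file                 # hypothetical flow id
namespace: company.team
tasks:
  - id: run_query
    type: io.kestra.plugin.jdbc.duckdb.Query
    # hypothetical Namespace File holding the SQL statement
    sql: "{{ read('queries/orders.sql') }}"
    store: true
```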


How to read a file from the internal storage as a JSON object?

You can use the Pebble function {{ fromJson(myvar) }} and a {{ myvar | toJson }} filter to process JSON data.

The fromJson() function

The function converts a string to a JSON object. For example, the following Pebble expression converts the string {"foo": [42, 43, 44]} to a JSON object and then returns the first value of the foo key, which is 42:

{{ fromJson('{"foo": [42, 43, 44]}').foo[0] }}

You can use the read() function to read the content of a file as a string and then apply the fromJson() function to convert it to a JSON object. Afterwards, you can read the value of a specific key in that JSON object. The following flow applies this to a JSON file downloaded over HTTP:

id: extract_json
namespace: company.team
tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/json/app_events.json
  - id: read_as_string
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.extract.uri) }}"
  - id: read_as_json
    type: io.kestra.plugin.core.log.Log
    message: "{{ fromJson(read(outputs.extract.uri)) }}"
  - id: parse_json_elements
    type: io.kestra.plugin.core.log.Log
    message: "{{ fromJson(read(outputs.extract.uri)) | jq('map(.detail | fromjson | .message)') | first }}"

The above flow downloads a JSON file over HTTP, reads its content as a string, and converts it to a JSON object. In the last task, it parses the JSON object and returns the value of a nested key.

The toJson filter

You can use the toJson filter to convert any variable to a JSON string. You can think of it as the reverse of what the fromJson() function does.

The example below shows how you can convert a list of numbers to a JSON string using the | toJson filter:

{{ [1, 2, 3] | toJson }}
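To try both directions in one place, a flow along the following lines could log the result of each expression. It is only a sketch: the flow and task ids are hypothetical, and it uses the fromJson() function and toJson filter named at the start of this section.

```yaml
id: json_round_trip              # hypothetical flow id
namespace: company.team
tasks:
  - id: parse
    type: io.kestra.plugin.core.log.Log
    # string -> JSON object, then read the first element of "foo"
    message: "{{ fromJson('{\"foo\": [42, 43, 44]}').foo[0] }}"
  - id: serialize
    type: io.kestra.plugin.core.log.Log
    # list -> JSON string
    message: "{{ [1, 2, 3] | toJson }}"
```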
