Data storage and processing
Manage data processed by tasks.
Kestra’s primary purpose is to orchestrate data processing via tasks, so data is central to each flow’s execution.
Depending on the task, data can be stored inside the execution context or inside Kestra’s internal storage. You can also manually store data inside Kestra’s KV store by using dedicated tasks.
Some tasks give you the choice of where you want to store the data, usually using a fetchType property or the three fetch/fetchOne/store properties.
For example, using the DynamoDB Query task:
id: query
type: io.kestra.plugin.aws.dynamodb.Query
tableName: persons
keyConditionExpression: id = :id
expressionAttributeValues:
  :id: "1"
fetchType: FETCH
The fetchType property can have four values:
- FETCH_ONE: fetches the first row and sets it in a task output attribute (the row attribute for DynamoDB); the data is stored inside the execution context.
- FETCH: fetches all rows and sets them in a task output attribute (the rows attribute for DynamoDB); the data is stored inside the execution context.
- STORE: stores all rows inside Kestra’s internal storage. The internal storage returns a URI, usually set in the task output attribute uri, that can be used to retrieve the file from the internal storage.
- NONE: does nothing.
The three fetch/fetchOne/store properties do the same but using three different task properties instead of a single one.
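The four modes described above can be modeled in a few lines of plain Python. This is a toy sketch, not Kestra code: the apply_fetch_type function, the FetchType enum, the JSON-lines file, and the file:// URI are all assumptions made for illustration (Kestra stores Ion files and returns its own internal storage URIs).

```python
import json
import tempfile
from enum import Enum
from pathlib import Path


class FetchType(Enum):
    FETCH_ONE = "FETCH_ONE"
    FETCH = "FETCH"
    STORE = "STORE"
    NONE = "NONE"


def apply_fetch_type(rows, fetch_type, storage_dir):
    """Return the outputs a task would expose for a fetch type (toy model)."""
    if fetch_type is FetchType.FETCH_ONE:
        # Only the first row is kept, inline in the outputs.
        return {"row": rows[0] if rows else None}
    if fetch_type is FetchType.FETCH:
        # All rows are kept inline in the outputs.
        return {"rows": list(rows)}
    if fetch_type is FetchType.STORE:
        # Rows go to a file; only the file location is kept in the outputs.
        path = Path(storage_dir).resolve() / "rows.jsonl"
        with path.open("w", encoding="utf-8") as handle:
            for row in rows:
                handle.write(json.dumps(row) + "\n")
        return {"uri": path.as_uri()}
    # NONE: nothing is exposed.
    return {}


# Made-up sample data for the demonstration.
sample_rows = [{"id": "1", "name": "Ada"}, {"id": "2", "name": "Grace"}]

with tempfile.TemporaryDirectory() as storage_dir:
    for fetch_type in FetchType:
        print(fetch_type.name, apply_fetch_type(sample_rows, fetch_type, storage_dir))
```

The point of the model is the shape of the outputs: FETCH_ONE and FETCH carry the data itself, while STORE carries only a reference to it.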
Storing data
Storing data inside the flow execution context
Data can be stored as variables inside the flow execution context. This can be convenient for sharing data between tasks.
To do so, tasks store data as output attributes that are then available inside the flow via Pebble expressions like {{outputs.taskName.attributeName}}.
Be careful: if the data is large, it increases the size of the flow execution context, which can lead to slow execution and increase the size of the execution storage inside Kestra’s repository.
Depending on the Kestra internal queue and repository implementation, there can be a hard limit on the size of the flow execution context as it is stored as a single row/message. Usually, this limit is around 1MB, so it is important to avoid storing large amounts of data inside the flow execution context.
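To get a feel for how quickly output data approaches such a limit, the following sketch estimates the serialized size of a value. It is only an approximation under stated assumptions: the 1,000,000-byte constant stands in for the "around 1MB" figure above, compact JSON stands in for whatever serialization the queue or repository really uses, and the helper names and the 10% budget are hypothetical.

```python
import json

# Assumption: roughly 1 MB, the typical limit mentioned above. The real limit
# depends on the queue and repository implementation.
CONTEXT_LIMIT_BYTES = 1_000_000


def serialized_size(value):
    """Size in bytes of the value once serialized as compact UTF-8 JSON."""
    return len(json.dumps(value, separators=(",", ":")).encode("utf-8"))


def should_use_internal_storage(value, budget_ratio=0.1):
    """True when a single value would exceed the given share of the limit."""
    return serialized_size(value) > CONTEXT_LIMIT_BYTES * budget_ratio


# Made-up sample values: one small row and one large result set.
small = {"id": "1", "name": "Ada"}
large = [{"id": str(i), "payload": "x" * 100} for i in range(5_000)]

print(serialized_size(small), should_use_internal_storage(small))
print(serialized_size(large), should_use_internal_storage(large))
```

A single row is a few dozen bytes, while a few thousand modest rows already consume a large share of the budget, which is why result sets are better stored in the internal storage.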
Storing data inside the internal storage
Kestra has an internal storage that can store data of any size. By default, the internal storage uses the host filesystem, but plugins exist to use other implementations like Amazon S3, Google Cloud Storage, or Microsoft Azure Blob Storage. See internal storage configuration.
When using the internal storage, data is, by default, stored using Amazon Ion format.
Tasks that can store data inside the internal storage usually have an output attribute named uri that can be used to access this file in following tasks.
The following example uses the DynamoDB Query task to query a table and the FTP Upload task to send the retrieved rows to an external FTP server.
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE

  - id: upload
    type: io.kestra.plugin.fs.ftp.Upload
    host: localhost
    port: 80
    from: "{{ outputs.query.uri }}"
    to: "/upload/file.ion"
If you need to access data from the internal storage, you can use the read() function to read the file’s content as a string.
Dedicated tasks allow managing the files stored inside the internal storage:
- Concat: concat multiple files.
- Delete: delete a file.
- Size: get the size of a file.
- Split: split a file into multiple files depending on the size of the file or the number of rows.
This should be the main method for storing and carrying large data from task to task. For example, if you know that an HTTP request returns a heavy payload, consider using the HTTP Download task along with a Serdes task instead of carrying raw data in the flow execution context.
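To make the row-based splitting performed by a task such as Split (listed above) more concrete, here is a minimal sketch in plain Python. It is not the task's implementation; the split_rows function and its rows_per_file parameter are hypothetical names, and the rows are made up.

```python
from itertools import islice


def split_rows(lines, rows_per_file):
    """Yield successive batches of at most rows_per_file lines."""
    if rows_per_file < 1:
        raise ValueError("rows_per_file must be at least 1")
    iterator = iter(lines)
    while True:
        # Take the next slice lazily so large inputs are never fully copied.
        batch = list(islice(iterator, rows_per_file))
        if not batch:
            return
        yield batch


rows = [f"row-{i}" for i in range(7)]  # made-up rows
batches = list(split_rows(rows, rows_per_file=3))
for index, batch in enumerate(batches):
    print(index, batch)
```

Each batch would become its own file, so downstream tasks can process the pieces independently.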
Storing data inside the KV store
Dedicated tasks can store data inside Kestra’s KV store. The KV store transparently uses Kestra’s internal storage as its backend store.
The KV store allows storing data that will be shared by all executions of the same namespace. You can think of it as a key/value store dedicated to a namespace.
The following tasks are available:
- Set: set data in key/value pair.
- Get: get data from key/value pair.
- Delete: delete a key/value pair.
Example:
tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: name
    value: John Doe

  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: name
In the next example, the flow uses Set, Get and Delete on the data:
Example Flow
id: kv_store_example
namespace: company.team

tasks:
  - id: set_data
    type: io.kestra.plugin.core.kv.Set
    key: user_name
    value: John Doe

  - id: get_data
    type: io.kestra.plugin.core.kv.Get
    key: user_name

  - id: log_state
    type: io.kestra.plugin.core.log.Log
    message: "{{ kv('user_name') }}"

  - id: set_new_data
    type: io.kestra.plugin.core.kv.Set
    key: user_name
    value: Bob Smith

  - id: get_new_data
    type: io.kestra.plugin.core.kv.Get
    key: user_name

  - id: log_new_data
    type: io.kestra.plugin.core.log.Log
    message: "{{ kv('user_name') }}"

  - id: delete_data
    type: io.kestra.plugin.core.kv.Delete
    key: user_name

  - id: get_deleted_data
    type: io.kestra.plugin.core.kv.Get
    description: You will not get any data as the corresponding key is deleted in the earlier task.
    key: user_name
When we Set a new value for user_name, we have to use another Get task to fetch the most up-to-date value, and then reference that Get task’s id in the log task underneath to get the latest value. The same applies to the Delete task: to show that the key has been deleted, the get_deleted_data task tries to read the key removed by the delete_data task.
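The Set, Get, and Delete sequence of the flow can be traced with a small in-memory stand-in. This is purely a toy model for reasoning about the behavior: the ToyKVStore class is hypothetical, and Kestra's KV store is backed by the internal storage rather than a Python dictionary.

```python
class ToyKVStore:
    """In-memory stand-in for a namespace-scoped key/value store (illustration only)."""

    def __init__(self):
        self._data = {}

    def set(self, namespace, key, value):
        # A later set on the same key overwrites the previous value.
        self._data[(namespace, key)] = value

    def get(self, namespace, key):
        # Returns None when the key is missing, for example after a delete.
        return self._data.get((namespace, key))

    def delete(self, namespace, key):
        # Returns True only if the key existed before the call.
        existed = (namespace, key) in self._data
        self._data.pop((namespace, key), None)
        return existed


store = ToyKVStore()
store.set("company.team", "user_name", "John Doe")
first = store.get("company.team", "user_name")
store.set("company.team", "user_name", "Bob Smith")
second = store.get("company.team", "user_name")
deleted = store.delete("company.team", "user_name")
after_delete = store.get("company.team", "user_name")
# Keys are scoped to a namespace, so another namespace never sees this key.
other_namespace = store.get("company.other", "user_name")
print(first, second, deleted, after_delete, other_namespace)
```

Each read reflects the store's state at the moment of the call, which is why the flow needs a fresh Get after every Set or Delete.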
Processing data
For basic data processing, you can leverage Kestra’s Pebble templating engine.
For more complex data transformations, Kestra offers various data processing plugins including transform tasks or custom scripts.
Converting files
Files from the internal storage can be converted from/to the Ion format to/from another format using the Serdes plugin.
The following formats are currently available: Avro, CSV, JSON, XML, and Parquet.
Each format offers a reader to read an Ion serialized data file and write it in the target format and a writer to read a file in a specific format and write it as an Ion serialized data file.
For example, to convert an Ion file to CSV, then back to Ion:
tasks:
  - id: query
    type: io.kestra.plugin.aws.dynamodb.Query
    tableName: persons
    keyConditionExpression: id = :id
    expressionAttributeValues:
      :id: "1"
    fetchType: STORE

  - id: convertToCsv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"

  - id: convertBackToIon
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{outputs.convertToCsv.uri}}"
Processing data using scripts
Kestra can launch Python, R, Node.js, Shell, PowerShell, and Go scripts. Depending on the runner, they can run directly in a local process on the host or inside Docker containers.
Those script tasks are available in the Scripts Plugin. Below is documentation for each of them:
- The Python task runs a Python script in a Docker container or in a local process.
- The Node task runs a Node.js script in a Docker container or in a local process.
- The R task runs an R script in a Docker container or in a local process.
- The Shell task executes a single Shell command, or a list of commands that you provide.
- The PowerShell task executes a single PowerShell command, or a list of commands that you provide.
- The Go (Script) task executes a single multi-line script, while the Go (Commands) task executes a list of commands that you provide.
The following example queries the BigQuery public dataset of Wikipedia page views to find the top 10 pages, converts the result to CSV, and uses the CSV file inside a Python task for further transformations with Pandas.
id: wikipedia-top-ten-python-panda
namespace: company.team
description: analyze top 10 Wikipedia pages

tasks:
  - id: query
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views
      FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
    projectId: geller
    serviceAccount: "{{envs.gcp_creds}}"

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs.query.uri}}"

  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
    tasks:
      - id: pandas
        type: io.kestra.plugin.scripts.python.Script
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import pandas as pd
          from kestra import Kestra

          df = pd.read_csv("data.csv")
          views = df['views'].sum()
          Kestra.outputs({'views': int(views)})
Kestra offers several plugins for ingesting and transforming data — check the Plugin list for more details.
Make sure to also check:
- The Script documentation for a detailed overview of how to work with Python, R, Node.js, Shell and PowerShell scripts, and how to integrate them with Git and Docker.
- The Blueprints catalog — simply search for the relevant language (e.g., Python, R, Rust) or use case (ETL, Git, dbt, etc.) to find the relevant examples.
Processing data using file transform
Kestra can process data row by row using file transform tasks. The transformation is done with a small script written in Python, JavaScript, or Groovy.
- The GraalVM Python FileTransform task allows transforming rows with Python.
- The GraalVM JavaScript FileTransform task allows transforming rows with JavaScript.
- The Groovy Script task allows running scripts with Groovy.
The following example queries the BigQuery public dataset for Wikipedia pages, converts the result row by row with the GraalVM Python FileTransform task, and writes it to a CSV file.
id: wikipedia-top-ten-file-transform
namespace: company.team
description: A flow that loads wikipedia top 10 EN pages

tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      SELECT DATETIME(datehour) as date, title, views
      FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date() and wiki = 'en'
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true

  - id: file-transform
    type: io.kestra.plugin.graalvm.python.FileTransform
    from: "{{outputs['query-top-ten'].uri}}"
    script: |
      logger.info('row: {}', row)

      if row['title'] in ('Main_Page', 'Special:Search', '-'):
        # remove un-needed row
        row = None
      else:
        # add a 'time' column
        row['time'] = str(row['date'])[11:]
        # modify the 'date' column to only keep the date part
        row['date'] = str(row['date'])[:10]

  - id: write-csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    from: "{{outputs['file-transform'].uri}}"
The script can access a logger to log messages. Each row is available in a row variable where each column is accessible using the dictionary notation row['columnName'].
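The row-by-row contract can be tried out locally with a plain Python emulation. The sketch below mirrors the filtering and column logic of the script above but is not how the FileTransform task is implemented: the transform and transform_all functions are hypothetical, the sample rows are made up, and the date is assumed to render as an ISO-like string such as 2023-05-01T10:00:00.

```python
def transform(row):
    """Return the transformed row, or None to drop it."""
    if row["title"] in ("Main_Page", "Special:Search", "-"):
        # Rows that are not real articles are removed.
        return None
    date_text = str(row["date"])
    updated = dict(row)  # copy so the input row is left untouched
    # Characters from index 11 onward hold the time part (assumed format).
    updated["time"] = date_text[11:]
    # The first 10 characters hold the date part.
    updated["date"] = date_text[:10]
    return updated


def transform_all(rows):
    """Apply transform to every row and keep only the rows that survive."""
    results = []
    for row in rows:
        new_row = transform(row)
        if new_row is not None:
            results.append(new_row)
    return results


# Made-up sample rows.
sample = [
    {"date": "2023-05-01T10:00:00", "title": "Main_Page", "views": 100},
    {"date": "2023-05-01T10:00:00", "title": "Python", "views": 42},
]
print(transform_all(sample))
```

Setting the row to a null value drops it from the output, while any other change to the row is written to the resulting file.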
Purging data
The PurgeExecution task can purge all the files stored inside the internal storage by a flow execution. It can be used at the end of a flow to purge all its generated files.
tasks:
  - id: "purge-execution"
    type: "io.kestra.plugin.core.storage.PurgeExecution"
The execution context itself is not available after the end of the execution and is automatically deleted from Kestra’s repository after a retention period (seven days by default) that can be changed; see configurations.
The Purge task can also be used to purge storages, logs, and executions of previous executions. For example, this flow runs every day and purges everything older than one month:
id: purge
namespace: company.team

tasks:
  - id: "purge"
    type: "io.kestra.plugin.core.storage.Purge"
    endDate: "{{ now() | dateAdd(-1, 'MONTHS') }}"

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 0 * * *"
FAQ
Internal Storage FAQ
How to read a file from the internal storage as a string?
The ‘read’ function expects an argument ‘path’ that is a path to a namespace file or an internal storage URI. Note that when using inputs, outputs or trigger variables, you don’t need any extra quotation marks. Here is how you can use such variables along with the ‘read’ function:
- {{ read(inputs.file) }} for a FILE-type input variable named file
- {{ read(outputs.mytaskid.uri) }} for an output uri from a task named mytaskid
- {{ read(trigger.uri) }} for a uri of many triggers incl. Kafka, AWS SQS, GCP PubSub, etc.
- {{ read(trigger.objects | jq('.[].uri')) }} for a uri of a trigger that returns a list of detected objects, e.g. AWS S3, GCP GCS, etc.
Note that the read function can only read files within the same execution. If you try to read a file from a previous execution, you will get an Unauthorized error.
Example using a FILE-type inputs variable
id: read_file_as_string
namespace: company.team

inputs:
  - id: file
    type: FILE

tasks:
  - id: log_internal_storage_uri
    type: io.kestra.plugin.core.log.Log
    message: "{{ inputs.file }}"

  - id: log_file_content
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(inputs.file) }}"
Example with the ForEachItem task reading file's content as a string
When using the ForEachItem task, you can use the read() function to read the content of a file as a string. This is especially useful when you want to pass the content of a file as a raw string as an input to a subflow.
Below is a simple subflow example that uses a string input:
id: subflow_raw_string_input
namespace: company.team

inputs:
  - id: string_input
    type: STRING
    defaults: hey there

tasks:
  - id: for_each_item
    type: io.kestra.plugin.core.debug.Return
    format: "{{ inputs.string_input }}"
Because the ForEachItem task splits the items file into batches of smaller files (one file per row by default), you can use the read() function to read the content of that file for a given batch as a string value and pass it as an input to the subflow shown above.
id: parent_flow
namespace: company.team

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  - id: each_raw
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    namespace: company.team
    flowId: subflow_raw_string_input
    inputs:
      string_input: "{{ read(taskrun.items) }}"
How to read a Namespace File as a string?
So far, you’ve seen how to read a file from the internal storage as a string. You can use the same read() function to read a Namespace File as a string. This is especially useful when you want to execute a Python script or a long SQL query stored in a dedicated SQL file.
The read() function takes the absolute path to the file you want to read. The path must point to a file stored in the same namespace as the flow you are executing.
Below is a simple example showing how you can read a file named hello.py stored in the scripts directory of the company.team namespace:
id: hello
namespace: company.team

tasks:
  - id: my_python_script
    type: io.kestra.plugin.scripts.python.Script
    script: "{{ read('scripts/hello.py') }}"
The same syntax applies to SQL queries, custom scripts, and many more. Check the Namespace Files documentation for more details.
How to read a file from the internal storage as a JSON object?
You can use the Pebble function {{ fromJson(myvar) }} and a {{ myvar | toJson }} filter to process JSON data.
The fromJson() function
The function converts a string to a JSON object. For example, the following Pebble expression converts the string {"foo": [42, 43, 44]} to a JSON object and then returns the first value of the foo key, which is 42:
{{ fromJson('{"foo": [42, 43, 44]}').foo[0] }}
You can use the read() function to read the content of a file as a string and then apply the fromJson() function to convert it to a JSON object. Afterwards, you can read the value of a specific key in that JSON object. For example, the following flow downloads a JSON file, reads its content with read(), and converts it with fromJson():
id: extract_json
namespace: company.team

tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/raw/main/json/app_events.json

  - id: read_as_string
    type: io.kestra.plugin.core.log.Log
    message: "{{ read(outputs.extract.uri) }}"

  - id: read_as_json
    type: io.kestra.plugin.core.log.Log
    message: "{{ fromJson(read(outputs.extract.uri)) }}"

  - id: parse_json_elements
    type: io.kestra.plugin.core.log.Log
    message: "{{ fromJson(read(outputs.extract.uri)) | jq('map(.detail | fromjson | .message)') | first }}"
The above flow downloads a JSON file via an HTTP Request, reads its content as a string, converts it to a JSON object, and then in another task, it parses the JSON object and returns the value of a nested key.
The toJson filter
You can use the toJson filter to convert any variable to a JSON string. You can think of it as the reverse of what the fromJson() function does.
The example below shows how you can convert a list of numbers to a JSON string '[1, 2, 3]' using the | toJson filter:
{{ [1, 2, 3] | toJson }}
You would typically never use the | toJson filter in combination with the read() function. Anytime you need to read a file’s content and then convert it to a JSON object, use a combination of the read() function and the fromJson() function instead.
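For readers more familiar with Python, the string-to-object and object-to-string conversions described above correspond roughly to json.loads and json.dumps from the standard library. The sketch below is only an analogy, not how Pebble evaluates these expressions; the variable names are made up.

```python
import json

# String to object: comparable to parsing a JSON string in a Pebble expression.
text = '{"foo": [42, 43, 44]}'
parsed = json.loads(text)
first_value = parsed["foo"][0]
print(first_value)  # 42

# Object to string: the reverse direction.
as_text = json.dumps([1, 2, 3])
print(as_text)  # [1, 2, 3]

# Converting to a string and back yields an equal object.
round_trip = json.loads(json.dumps(parsed))
print(round_trip == parsed)  # True
```

The round trip shows why the two operations are described as the reverse of each other.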