Flowable Tasks
Control your orchestration logic.
Flowable tasks control orchestration logic — running tasks or subflows in parallel, creating loops, and handling conditional branching. They do not run heavy operations; those are handled by workers.
Flowable tasks use expressions from the execution context to determine which tasks run next. For example, you can use the outputs of a previous task in a Switch task to decide which task to run next.
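For illustration, here is a minimal sketch of that pattern; the flow id, task ids, and the returned value are hypothetical, not taken from the examples below:

id: switch_on_output
namespace: company.team

tasks:
  - id: check
    type: io.kestra.plugin.core.debug.Return
    format: "ok" # the value the Switch task branches on

  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ outputs.check.value }}" # use the previous task's output
    cases:
      ok:
        - id: handle_ok
          type: io.kestra.plugin.core.log.Log
          message: "The check task returned ok"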
Sequential
This task runs tasks sequentially and is typically used to group them.
id: sequential
namespace: company.team

tasks:
  - id: sequential
    type: io.kestra.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You can access the output of a sibling task using the syntax {{ outputs.sibling.value }}.
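For instance, a hypothetical task appended to the Sequential group above could log the value returned by the 2nd task (a sketch, not part of the original example):

      - id: print_previous
        type: io.kestra.plugin.core.log.Log
        message: "The 2nd task returned: {{ outputs['2nd'].value }}"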
For more details on capabilities, check out the Sequential Task documentation.
Parallel
This task runs tasks in parallel, making it convenient to process many tasks simultaneously.
id: parallel
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

      - id: 2nd
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.id }}"

  - id: last
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
You cannot access the output of a sibling task because the tasks run in parallel.
For more task details, refer to the Parallel Task documentation.
Switch
This task conditionally runs tasks based on the value of a contextual variable.
In the following example, an input is used to decide which task to run next.
id: switch
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: decision
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: io.kestra.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: io.kestra.plugin.core.log.Log
          message: "This is false"
For more plugin details, refer to the Switch Task documentation.
If
This task processes a set of tasks conditionally, depending on whether a condition is met.
The condition must evaluate to a boolean. Values such as 0, -0, null, and '' evaluate to false; all other values evaluate to true.
The else branch is optional.
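As a minimal sketch of these truthiness rules (the flow and task ids are illustrative), an empty string produced by a previous task makes the condition evaluate to false, so the then branch is skipped:

id: if_truthiness
namespace: company.team

tasks:
  - id: maybe_empty
    type: io.kestra.plugin.core.debug.Return
    format: "" # returns an empty string, which evaluates to false

  - id: check
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.maybe_empty.value }}"
    then:
      - id: not_empty
        type: io.kestra.plugin.core.log.Log
        message: "Got a non-empty value"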
In the following example, an input is used to decide which task to run next.
id: if_condition
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: if
    type: io.kestra.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when_true
        type: io.kestra.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when_false
        type: io.kestra.plugin.core.log.Log
        message: "This is false"
For more details, check out the If Task documentation.
ForEach
This task executes a group of tasks for each value in the list.
In the following example, the list of values is static, but it could also be generated from a previous task's output, starting any number of subtasks.
id: foreach_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["value 1", "value 2", "value 3"]
    tasks:
      - id: before_if
        type: io.kestra.plugin.core.debug.Return
        format: "Before if {{ taskrun.value }}"

      - id: if
        type: io.kestra.plugin.core.flow.If
        condition: '{{ taskrun.value == "value 2" }}'
        then:
          - id: after_if
            type: io.kestra.plugin.core.debug.Return
            format: "After if {{ parent.taskrun.value }}"
In this execution, you can access:
- The iteration value, i.e., the index of the loop (starting at 0), using the syntax {{ taskrun.iteration }}
- The output of a sibling task using the syntax {{ outputs.sibling[taskrun.value].value }}, as shown in the sketch below
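Here is a minimal sketch illustrating both expressions; the flow id, task ids, and values are illustrative:

id: foreach_outputs_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: ["a", "b", "c"]
    tasks:
      - id: sibling
        type: io.kestra.plugin.core.debug.Return
        format: "iteration {{ taskrun.iteration }} for {{ taskrun.value }}"

      - id: use_sibling
        type: io.kestra.plugin.core.log.Log
        message: "{{ outputs.sibling[taskrun.value].value }}" # sibling outputs are keyed by the iteration value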
This example shows how to run tasks in parallel for each value in the list. All child tasks of the parallel task run in parallel. However, due to the concurrencyLimit property set to 2, only two parallel task groups run at any given time.
id: parallel_tasks_example
namespace: company.team

tasks:
  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    values: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    concurrencyLimit: 2
    tasks:
      - id: parallel
        type: io.kestra.plugin.core.flow.Parallel
        tasks:
          - id: log
            type: io.kestra.plugin.core.log.Log
            message: Processing {{ parent.taskrun.value }}

          - id: shell
            type: io.kestra.plugin.scripts.shell.Commands
            commands:
              - sleep {{ parent.taskrun.value }}
For more information on handling outputs generated from ForEach, check out this dedicated loop how-to guide.
For processing items, or forwarding processing to a subflow, ForEachItem is better suited.
For more details, refer to the ForEach Task documentation.
ForEachItem
This task iterates over a list of items and runs a subflow for each item, or for each batch of items.
- id: each
  type: io.kestra.plugin.core.flow.ForEachItem
  items: "{{ inputs.file }}" # could also be an output variable, e.g. {{ outputs.extract.uri }}
  inputs:
    file: "{{ taskrun.items }}" # items of the batch
  batch:
    rows: 4
  namespace: company.team
  flowId: subflow
  revision: 1 # optional (default: latest)
  wait: true # wait for the subflow execution
  transmitFailed: true # fail the task run if the subflow execution fails
  labels: # optional labels to pass to the subflow to be executed
    key: value
This executes the subflow company.team.subflow for each batch of items.
To pass the batch of items to a subflow, you can use inputs. The example above uses an input of FILE type called file that takes the URI of an internal storage file containing the batch of items.
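The subflow itself is not shown in the example above; here is a minimal sketch of what it could declare, with a hypothetical processing task:

id: subflow
namespace: company.team

inputs:
  - id: file
    type: FILE # receives the URI of the internal storage file holding the batch

tasks:
  - id: process_batch
    type: io.kestra.plugin.core.log.Log
    message: "Processing batch file {{ inputs.file }}"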
The next example shows how to access the outputs from each executed subflow. ForEachItem automatically merges the URIs of the outputs from each subflow into a single file; the URI of this file is available through the subflowOutputs output.
id: for_each_item
namespace: company.team

tasks:
  - id: generate
    type: io.kestra.plugin.scripts.shell.Script
    script: |
      for i in $(seq 1 10); do echo "$i" >> data; done
    outputFiles:
      - data

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.generate.outputFiles.data }}"
    batch:
      rows: 4
    wait: true
    flowId: my_subflow
    namespace: company.team
    inputs:
      value: "{{ taskrun.items }}"

  - id: for_each_outputs
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.for_each_item_merge.subflowOutputs }}" # log the URI of the file containing the URIs of the outputs from each subflow
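For subflowOutputs to contain anything, each subflow execution must declare flow-level outputs. A minimal sketch of what my_subflow could look like, assuming a single illustrative processing task and output:

id: my_subflow
namespace: company.team

inputs:
  - id: value
    type: FILE # the batch of items passed by ForEachItem

tasks:
  - id: process
    type: io.kestra.plugin.core.debug.Return
    format: "processed {{ inputs.value }}"

outputs:
  - id: result
    type: STRING
    value: "{{ outputs.process.value }}" # merged by ForEachItem into subflowOutputs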
For more details, refer to the ForEachItem Task documentation.
ForEach vs ForEachItem
Both ForEach and ForEachItem are similar, but there are specific use cases that suit one over the other:
- ForEach generates a lot of Task Runs, which can impact performance.
- ForEachItem generates separate executions using Subflows for the group of tasks, which scales better for larger datasets.
Read more about performance optimization in our best practices guides.
AllowFailure
This task allows child tasks to fail.
If any child task fails:
- The AllowFailure task is marked with status WARNING.
- All child tasks inside AllowFailure stop immediately.
- The execution continues for all other tasks.
- At the end, the execution as a whole is marked with status WARNING.
In the following example:
- allow_failure will be labelled as WARNING.
- ko will be labelled as FAILED.
- next will not be run.
- end will be run and labelled as SUCCESS.
id: each
namespace: company.team

tasks:
  - id: allow_failure
    type: io.kestra.plugin.core.flow.AllowFailure
    tasks:
      - id: ko
        type: io.kestra.plugin.core.execution.Fail

      - id: next
        type: io.kestra.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.startDate }}"

  - id: end
    type: io.kestra.plugin.core.debug.Return
    format: "{{ task.id }} > {{ taskrun.startDate }}"
For more details, refer to the AllowFailure Task documentation.
Fail
This task fails the flow; it can be used with or without conditions.
Without a condition, it can be used, for example, to fail on a specific switch value.
id: fail_on_switch
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: switch
    type: io.kestra.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      case1:
        - id: case1
          type: io.kestra.plugin.core.log.Log
          message: Case 1
      case2:
        - id: case2
          type: io.kestra.plugin.core.log.Log
          message: Case 2
      notexist:
        - id: fail
          type: io.kestra.plugin.core.execution.Fail
      default:
        - id: default
          type: io.kestra.plugin.core.log.Log
          message: default
With a condition, it can be used, for example, to validate inputs.
id: fail_on_condition
namespace: company.team

inputs:
  - id: param
    type: STRING
    required: true

tasks:
  - id: before
    type: io.kestra.plugin.core.log.Log
    message: "I'm before the fail on condition"

  - id: fail
    type: io.kestra.plugin.core.execution.Fail
    condition: "{{ inputs.param == 'fail' }}"

  - id: after
    type: io.kestra.plugin.core.log.Log
    message: "I'm after the fail on condition"
For more information, refer to the Fail Task documentation.
Subflow
This task triggers another flow. This enables you to decouple the first flow from the second and monitor each flow individually.
You can pass flow outputs as inputs to the triggered subflow (those must be declared in the subflow).
id: subflow
namespace: company.team

tasks:
  - id: "subflow"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: my-subflow
    inputs:
      file: "{{ inputs.myFile }}"
      store: 12
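A minimal sketch of what my-subflow could look like, declaring the file and store inputs passed above; the input types and the logging task are assumptions for illustration:

id: my-subflow
namespace: company.team

inputs:
  - id: file
    type: STRING
  - id: store
    type: INT

tasks:
  - id: log_inputs
    type: io.kestra.plugin.core.log.Log
    message: "Received file={{ inputs.file }} and store={{ inputs.store }}"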
For more details, refer to the Subflow Task documentation.
WorkingDirectory
By default, Kestra launches each task in a new working directory, possibly on different workers if multiple ones exist.
The example below runs all tasks nested under the WorkingDirectory task sequentially in the same directory, allowing downstream tasks to reuse output files from previous ones. To share a working directory, all tasks nested under the WorkingDirectory task are launched on the same worker.
This task can be particularly useful for compute-intensive file system operations.
id: working_dir_flow
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: first
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "{{ taskrun.id }}" > {{ workingDir }}/stay.txt'

      - id: second
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - |
            echo '::{"outputs": {"stay":"'$(cat {{ workingDir }}/stay.txt)'"}}::'
This task can also cache files inside the working directory, for example, to cache script dependencies such as the node_modules directory of a Node.js Script task.
id: node_with_cache
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    cache:
      patterns:
        - node_modules/**
      ttl: PT1H
    tasks:
      - id: script
        type: io.kestra.plugin.scripts.node.Script
        beforeCommands:
          - npm install colors
        script: |
          const colors = require("colors");
          console.log(colors.red("Hello"));
This task can also fetch files from namespace files and make them available to all child tasks.
id: node_with_cache
namespace: company.team

tasks:
  - id: working_dir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    namespaceFiles:
      enabled: true
      include:
        - dir1/*.*
      exclude:
        - dir2/*.*
    tasks:
      - id: shell
        type: io.kestra.plugin.scripts.shell.Commands
        commands:
          - cat dir1/file1.txt
Pause
Kestra flows run until all tasks complete, but sometimes you need to:
- Add a manual validation before continuing the execution
- Wait for some duration before continuing the execution
For this, you can use the Pause task.
In the following example, the validation task pauses until it is manually resumed, while the wait task pauses for 5 minutes.
id: pause
namespace: company.team

tasks:
  - id: validation
    type: io.kestra.plugin.core.flow.Pause
    tasks:
      - id: ok
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "started after manual validation"'

  - id: wait
    type: io.kestra.plugin.core.flow.Pause
    delay: PT5M
    tasks:
      - id: waited
        type: io.kestra.plugin.scripts.shell.Commands
        taskRunner:
          type: io.kestra.plugin.core.runner.Process
        commands:
          - 'echo "start after 5 minutes"'
A Pause task without a delay waits indefinitely until its state is changed to Running. To resume it, go to the Gantt tab of the Execution page, click the paused task, select Change status from the contextual menu, and choose Mark as RUNNING. The task then runs to completion. For more details, refer to the Pause Task documentation.
DAG
This task allows defining dependencies between tasks by creating a directed acyclic graph (DAG). Instead of an explicit DAG structure, this task defines dependencies for each task using the dependsOn property. This way, you set dependencies implicitly for each task, and Kestra figures out the overall flow structure.
id: dag
namespace: company.team

tasks:
  - id: dag
    description: "my task"
    type: io.kestra.plugin.core.flow.Dag
    tasks:
      - task:
          id: task1
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 1
      - task:
          id: task2
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 2
        dependsOn:
          - task1
      - task:
          id: task3
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 3
        dependsOn:
          - task1
      - task:
          id: task4
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 4
        dependsOn:
          - task2
      - task:
          id: task5
          type: io.kestra.plugin.core.log.Log
          message: I'm the task 5
        dependsOn:
          - task4
          - task3
For more details, refer to the Dag Task documentation.
Template (deprecated)
Templates are lists of tasks that can be shared between flows. You can define a template and call it from other flows, allowing them to share a list of tasks and keep these tasks updated without changing your flow.
The following example uses the Template task to include a template in the flow.
id: template
namespace: company.team

tasks:
  - id: template
    type: io.kestra.plugin.core.flow.Template
    namespace: company.team
    templateId: template