yaml
type: "io.kestra.plugin.core.flow.ForEachItem"
Examples
yaml
# Example flow: query a remote CSV with DuckDB, then run the `orders`
# subflow once per batch of one row from the stored query result.
id: orders_parallel
namespace: company.team

tasks:
  # Loads the httpfs extension and reads the CSV over HTTPS.
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  # Splits the file referenced by the `extract` output into batches and
  # starts one execution of `company.team.orders` per batch.
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: orders
    wait: true  # wait for the subflow execution
    transmitFailed: true  # fail the task run if the subflow execution fails
    inputs:
      order: "{{ taskrun.items }}"  # special variable that contains the items of the batch
yaml
# Example flow: download a JSON document, convert it to JSON-L via Ion,
# then run the `mysubflow` subflow once per batch of one row.
id: iterate_over_json
namespace: company.team

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Download
    uri: "https://api.restful-api.dev/objects"
    contentType: application/json
    method: GET
    failOnEmptyResponse: true
    timeout: PT15S

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.download.uri }}"
    newLine: false  # regular json

  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true  # JSON-L

  # Iterates over the JSON-L file produced by `ion_to_jsonl`.
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 1
    namespace: company.team
    flowId: mysubflow
    wait: true
    transmitFailed: true
    inputs:
      # Reads the batch file and parses its content before passing it on.
      json: "{{ json(read(taskrun.items)) }}"
yaml
# Example flow: started by an S3 trigger, loops over the URIs of the
# detected objects and, for each one, runs the `process_batch` subflow once
# per batch of one row.
id: process_files
namespace: company.team

tasks:
  # Outer loop: one iteration per object URI taken from the trigger output.
  - id: loop_over_files
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].uri') }}"
    tasks:
      # Inner task: nested under the ForEach, so it refers to the current
      # loop value through `parent.taskrun.value`.
      - id: subflow_per_batch
        type: io.kestra.plugin.core.flow.ForEachItem
        items: "{{ trigger.uris[parent.taskrun.value] }}"
        batch:
          rows: 1
        flowId: process_batch
        namespace: company.team
        wait: true
        transmitFailed: true
        inputs:
          data: "{{ taskrun.items }}"

triggers:
  - id: s3
    type: io.kestra.plugin.aws.s3.Trigger
    interval: "PT1S"
    # Placeholders: replace with real credentials.
    accessKeyId: "<access-key>"
    secretKeyId: "<secret-key>"
    region: "us-east-1"
    bucket: "my_bucket"
    prefix: "sub-dir"
    action: NONE
Properties
flowId *Required string
Min length
1
items *Required string
Min length
1
namespace *Required string
Min length
1
batch Non-dynamic ForEachItem-Batch
Default
{
"rows": "1",
"separator": "\n"
}
inheritLabels Non-dynamic boolean
Default
false
inputs object
labels array | object
restartBehavior Non-dynamic string
Default
RETRY_FAILED
Possible Values
NEW_EXECUTION
RETRY_FAILED
revision Non-dynamic integer
scheduleDate string
Format
date-time
transmitFailed Non-dynamic boolean
Default
true
wait Non-dynamic boolean
Default
true
Definitions
io.kestra.plugin.core.flow.ForEachItem-Batch
bytes string
partitions integer | string
rows integer | string
Default
1
separator string
Default
\n