ForEachItem

yaml
# Task type identifier, spelled with the same casing the flow examples in this file use.
type: "io.kestra.plugin.core.flow.ForEachItem"
yaml
# Flow: query the orders CSV with DuckDB, then launch the `orders` subflow
# once per batch of one row from the query result.
id: orders_parallel
namespace: company.team

tasks:
  # Installs and loads httpfs, then selects every row of the remote orders CSV.
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    # NOTE(review): presumably this is what makes `outputs.extract.uri`
    # available to the next task -- confirm against the plugin docs.
    store: true
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);

  # Fans the extracted file out to the subflow, one batch at a time.
  - id: each
    type: io.kestra.plugin.core.flow.ForEachItem
    # Subflow to execute for every batch.
    namespace: company.team
    flowId: orders
    # File to iterate over, and the number of rows per batch.
    items: '{{ outputs.extract.uri }}'
    batch:
      rows: 1
    # Wait for the subflow execution.
    wait: true
    # Fail the task run if the subflow execution fails.
    transmitFailed: true
    inputs:
      # `taskrun.items` is a special variable that contains the items of the batch.
      order: '{{ taskrun.items }}'

yaml
# Flow: download a JSON payload, pass it through the JsonToIon and IonToJson
# tasks to obtain JSON-L, then launch `mysubflow` once per batch of one row.
id: iterate_over_json
namespace: company.team

tasks:
  # HTTP GET of the source document; fails on an empty response.
  - id: download
    type: io.kestra.plugin.core.http.Download
    method: GET
    uri: 'https://api.restful-api.dev/objects'
    contentType: application/json
    timeout: PT15S
    failOnEmptyResponse: true

  # Input side of the conversion: the downloaded file is regular JSON.
  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    newLine: false
    from: '{{ outputs.download.uri }}'

  # Output side of the conversion: written as JSON-L.
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: '{{ outputs.json_to_ion.uri }}'

  # Fans the JSON-L file out to the subflow, one batch at a time.
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    # Subflow to execute for every batch.
    namespace: company.team
    flowId: mysubflow
    # File to iterate over, and the number of rows per batch.
    items: '{{ outputs.ion_to_jsonl.uri }}'
    batch:
      rows: 1
    wait: true
    transmitFailed: true
    inputs:
      # Applies read() and then json() to the batch file before passing it on.
      json: '{{ json(read(taskrun.items)) }}'

yaml
# Flow: started by an S3 trigger; loops over the URIs of the trigger's objects
# and, for each one, launches `process_batch` once per batch of one row.
id: process_files
namespace: company.team

tasks:
  # Outer loop: one iteration per `.uri` extracted from `trigger.objects`.
  - id: loop_over_files
    type: io.kestra.plugin.core.flow.ForEach
    values: "{{ trigger.objects | jq('.[].uri') }}"
    tasks:
      # Inner fan-out: splits the current file into batches for the subflow.
      - id: subflow_per_batch
        type: io.kestra.plugin.core.flow.ForEachItem
        # Subflow to execute for every batch.
        namespace: company.team
        flowId: process_batch
        # File looked up in `trigger.uris` using the outer loop's current value.
        items: '{{ trigger.uris[parent.taskrun.value] }}'
        batch:
          rows: 1
        wait: true
        transmitFailed: true
        inputs:
          data: '{{ taskrun.items }}'

triggers:
  # Watches `my_bucket` under the `sub-dir` prefix.
  - id: s3
    type: io.kestra.plugin.aws.s3.Trigger
    region: 'us-east-1'
    bucket: 'my_bucket'
    prefix: 'sub-dir'
    # Placeholder credentials: replace before use.
    accessKeyId: '<access-key>'
    secretKeyId: '<secret-key>'
    # NOTE(review): a 1-second interval combined with `action: NONE` looks
    # aggressive for anything beyond a demo -- confirm the intended behaviour.
    interval: 'PT1S'
    action: NONE
Properties
Min length 1
Min length 1
Min length 1
Default { "rows": "1", "separator": "\n" }
SubType
SubType
Default false
Default RETRY_FAILED
Possible Values
NEW_EXECUTION, RETRY_FAILED
Format date-time
Default true
Default true
Default 1
Default \n