Consume

yaml
type: "io.kestra.plugin.kafka.Consume"
yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO

yaml
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING

yaml
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    from: "{{ outputs.consume.uri }}"
Properties
SubType string
Default STRING
Possible Values
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
Format duration
SubType integer
Default PT5S
Format duration
SubType string
Default {}
Default STRING
Possible Values
STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON
Format uri
Default SKIPPED
Possible Values
SKIPPED, DLQ, STORE
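
The defaults listed above suggest that a flow can leave several settings out. The following is a minimal sketch, not an authoritative configuration. It assumes that the two STRING defaults belong to keyDeserializer and valueDeserializer and that the PT5S default belongs to pollDuration, since the property names are missing from the list. The flow id is hypothetical; the topic and broker address are reused from the first example.

```yaml
id: kafka_consume_defaults   # hypothetical flow id, for illustration only
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    # keyDeserializer and valueDeserializer omitted: assumed to fall back to STRING
    # pollDuration omitted: assumed to fall back to PT5S
    maxRecords: 10
```

If the assumptions hold, this behaves like the earlier examples with both deserializers set to STRING, and it stops after at most 10 records.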