Consume
yaml
# Task type identifier, spelled as in the flow examples on this page.
type: "io.kestra.plugin.kafka.Consume"
Examples
yaml
# Flow with a single Consume task reading the `test_kestra` topic,
# using a STRING key deserializer and an AVRO value deserializer.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: test_kestra
    # Kafka client properties.
    properties:
      bootstrap.servers: localhost:9092
    # Serde configuration: schema registry URL.
    serdeProperties:
      schema.registry.url: http://localhost:8085
    keyDeserializer: STRING
    valueDeserializer: AVRO
yaml
# Flow with a single Consume task that connects with `security.protocol: SSL`
# and reads from a list of topics, deserializing keys and values as STRING.
id: kafka_consume
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    # Kafka client properties. The password, keystore and truststore values
    # below are placeholders; replace them with your own and avoid committing
    # real credentials inline.
    properties:
      security.protocol: SSL
      bootstrap.servers: localhost:19092
      ssl.key.password: my-ssl-password
      ssl.keystore.type: PKCS12
      ssl.keystore.location: my-base64-encoded-keystore
      ssl.keystore.password: my-ssl-password
      ssl.truststore.location: my-base64-encoded-truststore
      ssl.truststore.password: my-ssl-password
    # `topic` is given here as a list rather than a single string.
    topic:
      - kestra_workerinstance
    keyDeserializer: STRING
    valueDeserializer: STRING
yaml
# Flow with two tasks: `consume` reads the `topic_test` topic with a JSON
# value deserializer, then `write_json` runs IonToJson on the URI output
# of the `consume` task.
id: consume-kafka-messages
namespace: company.team

tasks:
  - id: consume
    type: io.kestra.plugin.kafka.Consume
    topic: topic_test
    # Kafka client properties.
    properties:
      bootstrap.servers: localhost:9093
      auto.offset.reset: earliest
    # Task-level settings: an ISO-8601 duration (20 seconds) and a record cap.
    pollDuration: PT20S
    maxRecords: 50
    keyDeserializer: STRING
    valueDeserializer: JSON

  - id: write_json
    type: io.kestra.plugin.serdes.json.IonToJson
    newLine: true
    # Quoted so the template expression is not parsed as a YAML flow mapping.
    from: "{{ outputs.consume.uri }}"
Properties
properties *Required object
SubType string
groupId string
keyDeserializer string
Default
STRING
Possible Values
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
maxDuration string
Format
duration
maxRecords integer | string
onSerdeError Non-dynamic KafkaConsumerInterface-OnSerdeError
partitions array
SubType integer
pollDuration string
Default
PT5S
Format
duration
serdeProperties object
SubType string
Default
{}
since string
topic Non-dynamic object
topicPattern string
valueDeserializer string
Default
STRING
Possible Values
STRING
INTEGER
FLOAT
DOUBLE
LONG
SHORT
BYTE_ARRAY
BYTE_BUFFER
BYTES
UUID
VOID
AVRO
JSON
Outputs
messagesCount integer
uri string
Format
uri
Definitions
io.kestra.plugin.kafka.KafkaConsumerInterface-OnSerdeError
topic string
type string
Default
SKIPPED
Possible Values
SKIPPED
DLQ
STORE