Kafka Connector

Kafka sources in mappings

To read messages from a Kafka topic, configure a Kafka object as the Source transformation in a mapping.
Specify the name and description of the Kafka source. Configure the source and advanced properties for the source object.
The following table describes the source properties that you can configure for a Kafka source:
Property
Description
Connection
Name of the active Kafka source connection.
Source Type
Type of the Kafka source objects available.
Select Single Object. You cannot read data from multiple objects.
To define a parameter, select Parameter as the source type, and then specify the parameter in the Parameter property.
When you define a parameter, you must import the Kafka source in Binary or JSON format.
Object
Name of the Kafka source object based on the source type selected.
Parameter
Select a parameter for the source object, or click New Parameter to define a new parameter for the source object.
The Parameter property appears only if you select Parameter as the source type.
Format
The file format to read data from a Kafka topic.
You can select from the following file format types:
  • None. Reads data from the Kafka topics in binary format.
  • Avro. Reads data from the Kafka topics in Avro format that use the binary encoding type.
  • JSON. Reads data from the Kafka topics in JSON format.
  • Discover Structure². Determines the underlying patterns in a sample file and auto-generates a model for files with the same data and structure.
To configure the formatting options for the file, click Formatting Options. For more information about format options, see Formatting options for Kafka topics.
You can read Snappy compressed records from the broker when you run the mapping in advanced mode.
Preview Data
Preview the data of the Kafka source object.
When you preview data, Data Integration displays the first 10 rows.
By default, Data Integration displays the fields in native order. To display the fields in alphabetical order, enable the Display source fields in alphabetical order option.
Intelligent Structure Model²
Applicable to the Discover Structure format type. Select an intelligent structure model that determines the underlying patterns and structure of the input and can be used to transform, parse, and generate output groups.
Select one of the following options to associate a model with the transformation:
  • Select. Select an existing model.
  • New. Create a new model. Select Design New to create the model. Select Auto-generate from sample file for Intelligent Structure Discovery to generate a model based on sample input that you select.
For more information, see Components.
¹ Doesn't apply to mappings in advanced mode.
² Applies only to mappings in advanced mode.
The following table describes the advanced properties that you can configure for a Kafka source:
Property
Description
Start position offset
The offset position from which Kafka Connector starts reading messages from a Kafka topic.
You can select one of the following options:
  • Custom. Read messages from a specific time.
  • Earliest. Read all the messages available on the Kafka topic from the beginning.
  • Latest¹. Read messages received by the Kafka topic after the mapping has been deployed.
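In plain Kafka consumer terms, Earliest and Latest correspond to the standard auto.offset.reset behaviors, while Custom requires resolving a timestamp to per-partition offsets. A minimal sketch of that mapping (a hypothetical helper for illustration, not the connector's internals):

```python
def offset_reset_policy(option: str) -> str:
    """Translate the Start position offset option to the equivalent
    Kafka consumer auto.offset.reset policy.

    'Custom' has no direct reset-policy equivalent: the consumer must
    instead look up offsets by timestamp (Kafka's offsets-for-times
    lookup) and seek each partition explicitly.
    """
    mapping = {"Earliest": "earliest", "Latest": "latest"}
    if option == "Custom":
        raise ValueError("Custom requires a timestamp-to-offset lookup, "
                         "not a reset policy")
    return mapping[option]
```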
Connector Mode
Specifies the mode to read data from the Kafka source.
You can select one of the following modes:
  • Batch. The Secure Agent reads the messages available in a Kafka topic based on the position offset you specify. After the Secure Agent reads the data, the mapping terminates. When you configure a mapping for recovery, the Secure Agent reads data from the last checkpoint and stops reading at the offset where the mapping recovery started.
  • Realtime¹. The Secure Agent reads the messages available in a Kafka topic in real-time. The Secure Agent continues to read messages from the Kafka topic and does not terminate the mapping. You must manually terminate the mapping task.
Custom Start Position Timestamp
The time in GMT from when the Secure Agent starts reading Kafka messages from a Kafka topic.
Specify the time in the following format:
yyyy-mm-dd hh:mm:ss[.fff]
The milliseconds are optional.
This property applies only when you set the Start position offset property to Custom.
You can parameterize this property using an in-out or input parameter in a mapping in advanced mode.
Specify the parameter in the following format:
  • In-out parameter: $$<parameter_name>
  • Input parameter: $<parameter_name>$
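Because Kafka resolves timestamp-based offsets in epoch milliseconds, a GMT timestamp in this format can be converted as sketched below (a plain-Python illustration, not the connector's code; the fractional part is optional, as noted above):

```python
from datetime import datetime, timezone

def gmt_timestamp_to_epoch_ms(value: str) -> int:
    """Parse 'yyyy-mm-dd hh:mm:ss[.fff]' as GMT and return epoch milliseconds."""
    # %f accepts the optional millisecond suffix; pick the format accordingly.
    fmt = "%Y-%m-%d %H:%M:%S.%f" if "." in value else "%Y-%m-%d %H:%M:%S"
    dt = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)
```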
Topic Pattern
A regular expression pattern for the topic name that you want to read from.
Use the regular expression syntax guidelines to specify the pattern.
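As an illustration of how such a pattern selects topics (the topic names below are hypothetical), a pattern like orders\..* matches every topic under an orders. prefix:

```python
import re

# Hypothetical topic names used only for illustration.
topics = ["orders.us", "orders.eu", "payments.us"]
pattern = re.compile(r"orders\..*")

# Full-string matching mirrors how a topic pattern selects whole topic names.
matched = [t for t in topics if pattern.fullmatch(t)]
print(matched)  # ['orders.us', 'orders.eu']
```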
Polling Interval
The time interval, in milliseconds, that the agent waits before retrieving records from the Kafka broker.
You can enter a positive integer value. The default is 1000 milliseconds.
The polling interval applies only to the mapping in which you set the value.
To set the polling interval for all mappings within the organization, set the pollIntervalInMilliseconds property as a JVM option under the DTM type in the Secure Agent properties.
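Assuming the standard -D system-property syntax for JVM options, the org-wide setting might look like the following entry under the DTM type in the Secure Agent properties (verify the exact option slot in your agent configuration):

```
-DpollIntervalInMilliseconds=2000
```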
Consumer Configuration Properties
The configuration properties for the Kafka consumer.
These properties override the parameters you specified in the Additional Connection Properties or Additional Security Properties fields in the Kafka connection.
For example, you can configure the group.id consumer property at the source to enable incremental load when you read from Kafka topics in a mapping in advanced mode. For more information about incremental load and how to configure the group.id consumer property, see Configure incremental load to read from Kafka topics.
For more information about Kafka consumer configuration properties, see the Kafka documentation.
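The override behavior can be pictured as a simple map merge: connection-level properties form the base, and source-level Consumer Configuration Properties win on conflict. A sketch with hypothetical values:

```python
# Hypothetical property values for illustration only.
connection_props = {"max.poll.records": "500", "group.id": "conn-default"}
source_props = {"group.id": "incremental-orders"}  # e.g. set for incremental load

# Source-level consumer configuration overrides connection-level settings.
effective = {**connection_props, **source_props}
print(effective["group.id"])  # incremental-orders
```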
¹ Doesn't apply to mappings in advanced mode.
You can set the tracing level in the advanced properties to determine the amount of detail that logs contain for a mapping. Tracing level is not applicable to mappings in advanced mode.
The following table describes the tracing levels that you can configure:
Property
Description
Terse
The Secure Agent logs initialization information, error messages, and notification of rejected data.
Normal
The Secure Agent logs initialization and status information, errors encountered, and skipped rows due to transformation row errors. Summarizes mapping results, but not at the level of individual rows.
Verbose Initialization
In addition to normal tracing, the Secure Agent logs additional initialization details, names of index and data files used, and detailed transformation statistics.
Verbose Data
In addition to verbose initialization tracing, the Secure Agent logs each row that passes into the mapping. Also notes where the Secure Agent truncates string data to fit the precision of a column and provides detailed transformation statistics.
When you configure the tracing level to verbose data, the Secure Agent writes row data for all rows in a block when it processes a transformation.
