11 Using the Kafka Connect Handler

Learn how to use the Kafka Connect Handler, which is an extension of the standard Kafka messaging functionality.

Topics:

  • Overview

  • Detailed Functionality

  • Setting Up and Running the Kafka Connect Handler

  • Kafka Connect Handler Performance Considerations

  • Troubleshooting the Kafka Connect Handler

11.1 Overview

The Oracle GoldenGate Kafka Connect Handler is an extension of the standard Kafka messaging functionality. Kafka Connect is a functional layer on top of the standard Kafka Producer and Consumer interfaces. It provides standardization for messaging to make it easier to add new source and target systems into your topology.

Confluent is a primary adopter of Kafka Connect, and its Confluent Platform offering includes extensions over the standard Kafka Connect functionality, including Avro serialization and deserialization and an Avro schema registry. Much of the Kafka Connect functionality is available in Apache Kafka. A number of open source Kafka Connect integrations can be found at:

https://www.confluent.io/product/connectors/

The Kafka Connect Handler is a Kafka Connect source connector. You can capture database changes from any database supported by Oracle GoldenGate and stream that change data through the Kafka Connect layer to Kafka. You can also connect to Oracle Event Hub Cloud Services (EHCS) with this handler.

Kafka Connect uses proprietary objects to define the schemas (org.apache.kafka.connect.data.Schema) and the messages (org.apache.kafka.connect.data.Struct). The Kafka Connect Handler can be configured to manage what data is published and the structure of the published data.
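
The following minimal Java sketch (not part of the handler itself) illustrates how these two objects fit together. The field names mirror the metadata columns described later in this chapter (table, op_type, op_ts); the schema name, source table, and CUST_CODE column are hypothetical values chosen for illustration only:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ConnectDataSketch {
    public static void main(String[] args) {
        // Build a value schema resembling a row-modeled message for a
        // hypothetical one-column source table.
        Schema valueSchema = SchemaBuilder.struct()
                .name("row.example")                       // illustrative schema name
                .field("table", Schema.STRING_SCHEMA)      // fully qualified table name
                .field("op_type", Schema.STRING_SCHEMA)    // I, U, D, or T
                .field("op_ts", Schema.STRING_SCHEMA)      // operation (commit) timestamp
                .field("CUST_CODE", Schema.OPTIONAL_STRING_SCHEMA) // hypothetical column
                .build();

        // Populate a message that conforms to the schema.
        Struct value = new Struct(valueSchema)
                .put("table", "QASOURCE.TCUSTMER")
                .put("op_type", "I")
                .put("op_ts", "2017-05-17 11:45:34.254")
                .put("CUST_CODE", "WILL");

        System.out.println(value);
    }
}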

The Kafka Connect Handler does not support any of the pluggable formatters that are supported by the Kafka Handler.

11.2 Detailed Functionality

JSON Converter

The Kafka Connect framework provides converters to convert in-memory Kafka Connect messages to a serialized format suitable for transmission over a network. These converters are selected using configuration in the Kafka Producer properties file.

Kafka Connect and the JSON converter are available as part of the Apache Kafka download. The JSON Converter converts the Kafka keys and values to JSON, which is then sent to a Kafka topic. You identify the JSON Converters with the following configuration in the Kafka Producer properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=true 
value.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter.schemas.enable=true

The format of the messages is the message schema information followed by the payload information. JSON is a self-describing format, so you should not include the schema information in each message published to Kafka; doing so only increases the size of every message.

To omit the JSON schema information from the messages, set the following:

key.converter.schemas.enable=false
value.converter.schemas.enable=false
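
For illustration, with schemas enabled each published message is an envelope containing the schema followed by the payload; with schemas disabled, only the payload object is published. A sketch of the envelope for a hypothetical message carrying only the table field:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "table", "type": "string", "optional": false }
    ],
    "optional": false
  },
  "payload": {
    "table": "QASOURCE.TCUSTMER"
  }
}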

Avro Converter

Confluent provides Kafka installations, support for Kafka, and extended functionality built on top of Kafka to help realize the full potential of Kafka. Confluent provides both open source versions of Kafka (Confluent Open Source) and an enterprise edition (Confluent Enterprise), which is available for purchase.

A common Kafka use case is to send Avro messages over Kafka. This can create a problem on the receiving end because a consumer must have the Avro schema in order to deserialize an Avro message. Schema evolution compounds the problem because received messages must be matched up with the exact Avro schema that was used to generate them on the producer side. Deserializing Avro messages with an incorrect Avro schema can cause runtime failure, incomplete data, or incorrect data. Confluent has solved this problem by using a schema registry and the Confluent schema converters.

The following shows the Avro Converter configuration in the Kafka Producer properties file:

key.converter=io.confluent.connect.avro.AvroConverter 
value.converter=io.confluent.connect.avro.AvroConverter 
key.converter.schema.registry.url=http://localhost:8081 
value.converter.schema.registry.url=http://localhost:8081 

When messages are published to Kafka, the Avro schema is registered and stored in the schema registry. When messages are consumed from Kafka, the exact Avro schema used to create the message can be retrieved from the schema registry to deserialize the Avro message. Matching each Avro message with the exact schema that produced it on the receiving side solves this problem.
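
As a quick sanity check, the Schema Registry exposes a REST API; for example, assuming the localhost:8081 registry address used in the configuration above, the following lists the registered subjects:

curl http://localhost:8081/subjects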

Following are the requirements to use the Avro Converters:

  • This functionality is available in both versions of Confluent Kafka (open source and enterprise).

  • The Confluent schema registry service must be running.

  • Source database tables must have an associated Avro schema. Messages associated with different Avro schemas must be sent to different Kafka topics.

  • The Confluent Avro converters and the schema registry client must be available in the classpath.

The schema registry keeps track of Avro schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Source messages have Avro schemas based on the source database table schema, so Avro schemas are unique for each source table. Publishing messages from multiple source tables to a single topic appears to the schema registry as schema evolution every time a message arrives from a different source table than the previous message.
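
Because of this, when using the Avro Converters, the topic name is typically resolved from the table name so that each source table publishes to its own topic; for example, using a template keyword described later in this chapter:

gg.handler.name.topicMappingTemplate=${fullyQualifiedTableName}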

11.3 Setting Up and Running the Kafka Connect Handler

Instructions for configuring the Kafka Connect Handler components and running the handler are described in this section.

Classpath Configuration

Two things must be configured in the gg.classpath configuration variable so that the Kafka Connect Handler can connect to Kafka and run. The required items are the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Connect Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Connect Handler Client Dependencies. The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka Connect client JARs is the Kafka_Home/libs/* directory.

The gg.classpath variable must be configured precisely. The path to the Kafka Producer properties file must be specified with no wildcard appended; including the asterisk (*) wildcard in the path to the Kafka Producer properties file causes it to be discarded. The path to the dependency JARs should include the * wildcard character to include all of the JAR files in that directory in the associated classpath. Do not use *.jar.

Following is an example of a correctly configured Apache Kafka classpath:

gg.classpath=dirprm:{kafka_install_dir}/libs/*

Following is an example of a correctly configured Confluent Kafka classpath:

gg.classpath=dirprm:{confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*

Topics:

  • Kafka Connect Handler Configuration

  • Using Templates to Resolve the Topic Name and Message Key

  • Configuring Security in the Kafka Connect Handler

11.3.1 Kafka Connect Handler Configuration

The following are the configurable values for the Kafka Connect Handler. These properties are located in the Java Adapter properties file (not in the Replicat properties file).

To enable the selection of the Kafka Connect Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkaconnect and the other Kafka Connect properties as follows:

Table 11-1 Kafka Connect Handler Configuration Properties

gg.handler.name.type
  Required/Optional: Required
  Legal values: kafkaconnect
  Default: None
  Explanation: The configuration to select the Kafka Connect Handler.

gg.handler.name.kafkaProducerConfigFile
  Required/Optional: Required
  Legal values: string
  Default: None
  Explanation: A path to a properties file containing the Kafka and Kafka Connect configuration properties.

gg.handler.name.topicMappingTemplate
  Required/Optional: Required
  Legal values: A template string value to resolve the Kafka topic name at runtime.
  Default: None
  Explanation: See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate
  Required/Optional: Required
  Legal values: A template string value to resolve the Kafka message key at runtime.
  Default: None
  Explanation: See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.includeTableName
  Required/Optional: Optional
  Legal values: true | false
  Default: true
  Explanation: Set to true to create a field in the output messages called table whose value is the fully qualified table name. Set to false to omit this field in the output.

gg.handler.name.includeOpType
  Required/Optional: Optional
  Legal values: true | false
  Default: true
  Explanation: Set to true to create a field in the output messages called op_type whose value indicates the type of source database operation (for example, I for insert, U for update, and D for delete). Set to false to omit this field in the output.

gg.handler.name.includeOpTimestamp
  Required/Optional: Optional
  Legal values: true | false
  Default: true
  Explanation: Set to true to create a field in the output messages called op_ts whose value is the operation timestamp (commit timestamp) from the source trail file. Set to false to omit this field in the output.

gg.handler.name.includeCurrentTimestamp
  Required/Optional: Optional
  Legal values: true | false
  Default: true
  Explanation: Set to true to create a field in the output messages called current_ts whose value is the current timestamp of when the handler processes the operation. Set to false to omit this field in the output.

gg.handler.name.includePosition
  Required/Optional: Optional
  Legal values: true | false
  Default: true
  Explanation: Set to true to create a field in the output messages called pos whose value is the position (sequence number + offset) of the operation from the source trail file. Set to false to omit this field in the output.

gg.handler.name.includePrimaryKeys
  Required/Optional: Optional
  Legal values: true | false
  Default: false
  Explanation: Set to true to include a field in the output messages called primary_keys whose value is an array of the names of the primary key columns. Set to false to suppress this field.

gg.handler.name.includeTokens
  Required/Optional: Optional
  Legal values: true | false
  Default: false
  Explanation: Set to true to include a map field in the output messages. The key is tokens and the value is a map whose keys and values are the token keys and values from the Oracle GoldenGate source trail file. Set to false to suppress this field.

gg.handler.name.messageFormatting
  Required/Optional: Optional
  Legal values: row | op
  Default: row
  Explanation: Controls how output messages are modeled. Set to row to model the output messages as rows. Set to op to model the output messages as operations.

gg.handler.name.insertOpKey
  Required/Optional: Optional
  Legal values: any string
  Default: I
  Explanation: The value of the op_type field that indicates an insert operation.

gg.handler.name.updateOpKey
  Required/Optional: Optional
  Legal values: any string
  Default: U
  Explanation: The value of the op_type field that indicates an update operation.

gg.handler.name.deleteOpKey
  Required/Optional: Optional
  Legal values: any string
  Default: D
  Explanation: The value of the op_type field that indicates a delete operation.

gg.handler.name.truncateOpKey
  Required/Optional: Optional
  Legal values: any string
  Default: T
  Explanation: The value of the op_type field that indicates a truncate operation.

gg.handler.name.treatAllColumnsAsStrings
  Required/Optional: Optional
  Legal values: true | false
  Default: false
  Explanation: Set to true to treat all output fields as strings. Set to false to map the field types from the source trail file to the best corresponding Kafka Connect data types.

gg.handler.name.mapLargeNumbersAsStrings
  Required/Optional: Optional
  Legal values: true | false
  Default: false
  Explanation: Large numbers are mapped to number fields as doubles, which can lose precision in certain scenarios. Set to true to map these fields as strings to preserve precision.

gg.handler.name.iso8601Format
  Required/Optional: Optional
  Legal values: true | false
  Default: false
  Explanation: Set to true to output the current date in the ISO 8601 format.

gg.handler.name.pkUpdateHandling
  Required/Optional: Optional
  Legal values: insert | abend | update | delete
  Default: abend
  Explanation: Only applicable when modeling row messages (gg.handler.name.messageFormatting=row). Not applicable when modeling operations messages, because the before and after images are propagated to the message in the case of an update.

See Using Templates to Resolve the Topic Name and Message Key for more information.

Review a Sample Configuration

gg.handlerlist=kafkaconnect

#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=false
gg.handler.kafkaconnect.includeTokens=false
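
For reference, a minimal sketch of the kafkaconnect.properties file referenced above might look like the following. The broker address is an assumption for a local test, and the converter settings are those described in Detailed Functionality:

bootstrap.servers=localhost:9092
acks=1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false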

11.3.2 Using Templates to Resolve the Topic Name and Message Key

The Kafka Connect Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are dynamically replaced at runtime with values from the context of the current processing. Templates are applicable to the following configuration parameters:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

The Kafka Connect Handler can only send operation messages. The Kafka Connect Handler cannot group operation messages into a larger transaction message.

Template Keywords

${fullyQualifiedTableName}
  Resolves to the fully qualified table name, including the period (.) delimiter between the catalog, schema, and table names. For example, test.dbo.table1.

${catalogName}
  Resolves to the catalog name.

${schemaName}
  Resolves to the schema name.

${tableName}
  Resolves to the short table name.

${opType}
  Resolves to the type of the operation (INSERT, UPDATE, DELETE, or TRUNCATE).

${primaryKeys}
  Resolves to the concatenated primary key values delimited by an underscore (_) character.

${position}
  Resolves to the sequence number of the source trail file followed by the offset (RBA).

${opTimestamp}
  Resolves to the operation timestamp from the source trail file.

${emptyString}
  Resolves to an empty string ("").

${groupName}
  Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.

${staticMap[]}
  Resolves to a static value where the key is the fully qualified table name. The keys and values are designated inside of the square braces in the following format:
  ${staticMap[dbo.table1=value1,dbo.table2=value2]}

${columnValue[]}
  Resolves to a column value where the key is the fully qualified table name and the value is the column name to be resolved. For example:
  ${columnValue[dbo.table1=col1,dbo.table2=col2]}

${currentTimestamp} or ${currentTimestamp[]}
  Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting as described in the SimpleDateFormat class, see https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html.
  Examples:
  ${currentTimestamp}
  ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

${null}
  Resolves to a NULL string.

${custom[]}
  It is possible to write a custom value resolver. If required, contact Oracle Support.

Example Templates

The following describes example template configuration values and the resolved values.

${groupName}_${fullyQualifiedTableName}
  Resolved value: KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix
  Resolved value: prefix_dbo_table1_suffix

${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
  Resolved value: 2017-05-17 11:45:34.254

11.3.3 Configuring Security in the Kafka Connect Handler

Kafka version 0.9.0.0 introduced security through SSL/TLS and Kerberos. The Kafka Connect Handler can be secured using SSL/TLS or Kerberos. The Kafka producer client libraries provide an abstraction of security functionality from the integrations that use those libraries, so the Kafka Connect Handler is effectively abstracted from the security implementation. Enabling security requires setting up security for the Kafka cluster and connecting machines, and then configuring the Kafka Producer properties file that the Kafka Connect Handler uses for processing with the required security properties.
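
For example, an SSL/TLS setup typically adds entries similar to the following to the Kafka Producer properties file. This is a sketch only; the paths and passwords are placeholders, and the full set of properties is described in the Kafka security documentation:

security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=<truststore_password>
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=<keystore_password>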

You may encounter an inability to decrypt the Kerberos password from the keytab file. This causes the Kerberos authentication to fall back to interactive mode, which cannot work because it is invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE; see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.

11.4 Kafka Connect Handler Performance Considerations

Multiple configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer configuration, affect performance.

The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. The GROUPTRANSOPS parameter allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka Connect Handler calls flush on the Kafka Producer to push the messages to Kafka for write durability, followed by a checkpoint. The flush call is expensive, so setting the Replicat GROUPTRANSOPS parameter to a larger value allows the Replicat to call flush less frequently, thereby improving performance.

The default setting for GROUPTRANSOPS is 1000 and performance improvements can be obtained by increasing the value to 2500, 5000, or even 10000.
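
For example, a Replicat parameter file fragment might look like the following sketch. The Replicat name and properties file path are placeholders, and the GROUPTRANSOPS value should be validated against your own performance baseline:

REPLICAT rkc
-- Placeholder paths and names; adjust for your installation.
TARGETDB LIBFILE libggjava.so SET property=dirprm/kc.props
GROUPTRANSOPS 2500
MAP *.*, TARGET *.*;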

Op mode (gg.handler.kafkaconnect.mode=op) can also provide better performance than tx mode (gg.handler.kafkaconnect.mode=tx).

A number of Kafka Producer properties can affect performance. The following are the parameters with significant impact:

  • linger.ms

  • batch.size

  • acks

  • buffer.memory

  • compression.type

Oracle recommends that you start with the default values for these parameters and perform performance testing to obtain a baseline for performance. Review the Kafka documentation for each of these parameters to understand its role, then adjust the parameters and perform additional performance testing to ascertain the performance effect of each parameter.
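
The following sketch shows where these settings live in the Kafka Producer properties file. The values shown are the Kafka defaults for these versions and are illustrative starting points, not tuning recommendations:

linger.ms=0
batch.size=16384
acks=1
buffer.memory=33554432
compression.type=none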

11.5 Troubleshooting the Kafka Connect Handler

Topics:

  • Java Classpath for Kafka Connect Handler

  • Invalid Kafka Version

  • Kafka Producer Properties File Not Found

  • Kafka Connection Problem

11.5.1 Java Classpath for Kafka Connect Handler

Issues with the Java classpath are one of the most common problems. An indication of a classpath problem is a ClassNotFoundException in the Oracle GoldenGate Java log4j log file, or an error while resolving the classpath if there is a typographic error in the gg.classpath variable.

The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You are required to obtain the correct version of the Kafka client libraries and to properly configure the gg.classpath property in the Java Adapter properties file so that the Kafka client libraries are correctly resolved, as described in Setting Up and Running the Kafka Connect Handler.

11.5.2 Invalid Kafka Version

Kafka Connect was introduced in Kafka version 0.9.0.0. The Kafka Connect Handler does not work with Kafka version 0.8.2.2 or older. Attempting to use Kafka Connect with Kafka version 0.8.2.2 typically results in a ClassNotFoundException error at runtime.

11.5.3 Kafka Producer Properties File Not Found

Typically, the following exception message occurs:

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Verify that the gg.handler.name.kafkaProducerConfigFile configuration property is correctly set to the name of the Kafka Producer configuration file.

Ensure that the gg.classpath variable includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.

11.5.4 Kafka Connection Problem

Typically, the following exception message appears:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] 

WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1  java.net.ConnectException: Connection refused

When this occurs, the connection retry interval expires and the Kafka Connect Handler process abends. Ensure that the Kafka Brokers are running and that the host and port provided in the Kafka Producer properties file are correct.

Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
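
For example, assuming the broker listens on the default Kafka port of 9092 (the -n flag prints numeric ports rather than service names):

netstat -ln | grep 9092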