Learn how to use the Kafka Connect Handler, which is an extension of the standard Kafka messaging functionality.
The Oracle GoldenGate Kafka Connect Handler is an extension of the standard Kafka messaging functionality. Kafka Connect is a functional layer on top of the standard Kafka Producer and Consumer interfaces. It provides standardization for messaging to make it easier to add new source and target systems into your topology.
Confluent is a primary adopter of Kafka Connect, and its Confluent Platform offering includes extensions over the standard Kafka Connect functionality, including Avro serialization and deserialization and an Avro schema registry. Much of the Kafka Connect functionality is also available in Apache Kafka. A number of open source Kafka Connect integrations are found at:
https://www.confluent.io/product/connectors/
The Kafka Connect Handler is a Kafka Connect source connector. You can capture database changes from any database supported by Oracle GoldenGate and stream that change data through the Kafka Connect layer to Kafka. You can also connect to Oracle Event Hub Cloud Services (EHCS) with this handler.
Kafka Connect uses proprietary objects to define the schemas (org.apache.kafka.connect.data.Schema) and the messages (org.apache.kafka.connect.data.Struct). The Kafka Connect Handler can be configured to manage what data is published and the structure of the published data.
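For illustration, the following is a minimal Java sketch of how a Kafka Connect Schema and a conforming Struct are built; the table name, field names, and values are illustrative assumptions, not output captured from the handler:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ConnectSchemaExample {
    public static void main(String[] args) {
        // Build a schema describing the structure of the message value.
        Schema valueSchema = SchemaBuilder.struct()
            .name("QASOURCE.TCUSTMER")
            .field("table", Schema.STRING_SCHEMA)
            .field("op_type", Schema.STRING_SCHEMA)
            .field("CUST_CODE", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

        // Populate a message (Struct) that conforms to the schema.
        Struct value = new Struct(valueSchema)
            .put("table", "QASOURCE.TCUSTMER")
            .put("op_type", "I")
            .put("CUST_CODE", "WILL");

        System.out.println(value);
    }
}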
The Kafka Connect Handler does not support any of the pluggable formatters that are supported by the Kafka Handler.
JSON Converter
The Kafka Connect framework provides converters to convert in-memory Kafka Connect messages to a serialized format suitable for transmission over a network. These converters are selected using configuration in the Kafka Producer properties file.
Kafka Connect and the JSON converter are available as part of the Apache Kafka download. The JSON converter converts the Kafka keys and values to JSON, which is then sent to a Kafka topic. You identify the JSON converters with the following configuration in the Kafka Producer properties file:
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
The format of the messages is the message schema information followed by the payload information. Because JSON is a self-describing format, it is best practice not to include the schema information in each message published to Kafka.
To omit the JSON schema information from the messages, set the following:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
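With schemas disabled, each published message contains only the payload. As an illustrative sketch (the table and column values are assumptions, and the exact fields depend on the handler properties described later in this chapter), a row-modeled insert message might look like the following:

{"table":"QASOURCE.TCUSTMER","op_type":"I","op_ts":"2017-05-17 11:45:34.254182","current_ts":"2017-05-17 11:45:39.113000","pos":"00000000000000002928","CUST_CODE":"WILL","NAME":"BG SOFTWARE CO."}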
Avro Converter
Confluent provides Kafka installations, support for Kafka, and extended functionality built on top of Kafka to help realize the full potential of Kafka. Confluent provides both open source versions of Kafka (Confluent Open Source) and an enterprise edition (Confluent Enterprise), which is available for purchase.
A common Kafka use case is to send Avro messages over Kafka. This can create a problem on the receiving end because there is a dependency on the Avro schema in order to deserialize an Avro message. Schema evolution can compound the problem because received messages must be matched up with the exact Avro schema used to generate the message on the producer side. Deserializing Avro messages with an incorrect Avro schema can cause runtime failure, incomplete data, or incorrect data. Confluent has solved this problem by using a schema registry and the Confluent schema converters.
The following shows the configuration of the Kafka Producer properties file.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
When messages are published to Kafka, the Avro schema is registered and stored in the schema registry. When messages are consumed from Kafka, the exact Avro schema used to create the message can be retrieved from the schema registry to deserialize the Avro message. This matches Avro messages to their corresponding Avro schemas on the receiving side, which solves this problem.
Following are the requirements to use the Avro Converters:
This functionality is available in both versions of Confluent Kafka (open source or enterprise).
The Confluent schema registry service must be running.
Source database tables must have an associated Avro schema. Messages associated with different Avro schemas must be sent to different Kafka topics.
The Confluent Avro converters and the schema registry client must be available in the classpath.
The schema registry keeps track of Avro schemas by topic. Messages must be sent to a topic that has the same schema or evolving versions of the same schema. Because source messages have Avro schemas based on the source database table schema, Avro schemas are unique for each source table. Publishing messages for multiple source tables to a single topic makes it appear to the schema registry that the schema is evolving every time a message arrives from a source table different from the previous message. One way to keep topics table-specific is shown in the sketch following this paragraph.
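A sketch of one way to satisfy this requirement, using the topic mapping template keyword documented later in this chapter, is to resolve the topic name from the fully qualified table name so that each source table publishes to its own topic:

gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}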
Instructions for configuring the Kafka Connect Handler components and running the handler are described in this section.
Classpath Configuration
Two things must be configured in the gg.classpath configuration variable so that the Kafka Connect Handler can connect to Kafka and run: the Kafka Producer properties file and the Kafka client JARs. The Kafka client JARs must match the version of Kafka that the Kafka Connect Handler is connecting to. For a listing of the required client JAR files by version, see Kafka Connect Handler Client Dependencies. The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.
The default location of the Kafka Connect client JARs is the Kafka_Home/libs/* directory.
The gg.classpath variable must be configured precisely. The path to the Kafka Producer properties file should contain the path with no wildcard appended; if the asterisk (*) wildcard is included in the path to the Kafka Producer properties file, the file is discarded. The path to the dependency JARs should include the * wildcard character to include all of the JAR files in that directory in the associated classpath. Do not use *.jar.
Following is an example of a correctly configured Apache Kafka classpath:
gg.classpath=dirprm:{kafka_install_dir}/libs/*
Following is an example of a correctly configured Confluent Kafka classpath:
gg.classpath={confluent_install_dir}/share/java/kafka-serde-tools/*:{confluent_install_dir}/share/java/kafka/*:{confluent_install_dir}/share/java/confluent-common/*
The following are the configurable values for the Kafka Connect Handler. These properties are located in the Java Adapter properties file (not in the Replicat properties file).
To enable the selection of the Kafka Connect Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkaconnect
and the other Kafka Connect properties as follows:
Table 11-1 Kafka Connect Handler Configuration Properties
Properties | Required/Optional | Legal Values | Default | Explanation |
---|---|---|---|---|
gg.handler.name.type | Required | kafkaconnect | None | The configuration to select the Kafka Connect Handler. |
gg.handler.name.kafkaProducerConfigFile | Required | string | None | A path to a properties file containing the Kafka and Kafka Connect configuration properties. |
gg.handler.name.topicMappingTemplate | Required | A template string value to resolve the Kafka topic name at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
gg.handler.name.keyMappingTemplate | Required | A template string value to resolve the Kafka message key at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
gg.handler.name.includeTableName | Optional | true / false | true | Set to true to create a field in the output messages called table for which the value is the fully qualified table name. Set to false to omit this field. |
gg.handler.name.includeOpType | Optional | true / false | true | Set to true to create a field in the output messages called op_type for which the value indicates the type of source database operation (for example, I for insert, U for update, D for delete). Set to false to omit this field. |
gg.handler.name.includeOpTimestamp | Optional | true / false | true | Set to true to create a field in the output messages called op_ts for which the value is the operation timestamp (commit timestamp) from the source trail file. Set to false to omit this field. |
gg.handler.name.includeCurrentTimestamp | Optional | true / false | true | Set to true to create a field in the output messages called current_ts for which the value is the current timestamp of when the handler processes the operation. Set to false to omit this field. |
gg.handler.name.includePosition | Optional | true / false | true | Set to true to create a field in the output messages called pos for which the value is the position (sequence number plus offset) of the operation from the source trail file. Set to false to omit this field. |
gg.handler.name.includePrimaryKeys | Optional | true / false | false | Set to true to include a field in the output messages called primary_keys for which the value is an array of the primary key column names. Set to false to omit this field. |
gg.handler.name.includeTokens | Optional | true / false | false | Set to true to include a map field in the output messages called tokens for which the values are the token keys and values from the source trail file. Set to false to omit this field. |
gg.handler.name.messageFormatting | Optional | row / op | row | Controls how output messages are modeled. Set to row and the output messages are modeled as row messages. Set to op and the output messages are modeled as operations messages. |
gg.handler.name.insertOpKey | Optional | any string | I | The value of the field op_type that indicates an insert operation. |
gg.handler.name.updateOpKey | Optional | any string | U | The value of the field op_type that indicates an update operation. |
gg.handler.name.deleteOpKey | Optional | any string | D | The value of the field op_type that indicates a delete operation. |
gg.handler.name.truncateOpKey | Optional | any string | T | The value of the field op_type that indicates a truncate operation. |
gg.handler.name.treatAllColumnsAsStrings | Optional | true / false | false | Set to true to treat all output fields as strings. Set to false and the handler maps the corresponding field type from the source trail file to the best corresponding Kafka Connect data type. |
gg.handler.name.mapLargeNumbersAsStrings | Optional | true / false | false | Large numbers are mapped to number fields as doubles, so it is possible to lose precision in certain scenarios. Set to true to map these fields as strings to preserve precision. |
gg.handler.name.iso8601Format | Optional | true / false | true | Set to true to output the current date in the ISO 8601 format. |
gg.handler.name.pkUpdateHandling | Optional | insert / abend / update / delete | abend | Specifies how the handler handles update operations that change a primary key. Only applicable if modeling row messages (gg.handler.name.messageFormatting=row); not applicable if modeling operations messages, because the before and after images are propagated in the message. |
See Using Templates to Resolve the Topic Name and Message Key for more information.
Review a Sample Configuration
gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=false
gg.handler.kafkaconnect.includeTokens=false
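The sample configuration above references a Kafka Producer properties file named kafkaconnect.properties in the dirprm directory. The following is a minimal sketch of that file, assuming a broker at localhost:9092 and the Apache Kafka JSON converter (the broker address and converter choice are illustrative assumptions):

bootstrap.servers=localhost:9092
acks=1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false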
The Kafka Connect Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are dynamically replaced with values from the context of the current processing. Templates are applicable to the following configuration parameters:
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
Template Modes
The Kafka Connect Handler can only send operation messages. The Kafka Connect Handler cannot group operation messages into a larger transaction message.
Template Keywords
Keyword | Explanation |
---|---|
${fullyQualifiedTableName} | Resolves to the fully qualified table name, including the period (.) delimiter between the catalog, schema, and table names. For example, test.dbo.table1. |
${catalogName} | Resolves to the catalog name. |
${schemaName} | Resolves to the schema name. |
${tableName} | Resolves to the short table name. |
${opType} | Resolves to the type of the operation (INSERT, UPDATE, DELETE, or TRUNCATE). |
${primaryKeys} | Resolves to the concatenated primary key values delimited by an underscore (_) character. |
${position} | The sequence number of the source trail file followed by the offset (RBA). |
${opTimestamp} | The operation timestamp from the source trail file. |
${emptyString} | Resolves to "". |
${groupName} | Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended. |
${staticMap[]} | Resolves to a static value where the key is the fully qualified table name. The keys and values are designated inside of the square brace in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]} |
${columnValue[]} | Resolves to a column value where the key is the fully qualified table name and the value is the column name to be resolved. For example: ${columnValue[dbo.table1=col1,dbo.table2=col2]} |
${currentDate} or ${currentDate[]} | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting as described in the java.text.SimpleDateFormat class. Examples: ${currentDate} ${currentDate[yyyy-MM-dd HH:mm:ss.SSS]} |
${null} | Resolves to a NULL string. |
${custom[]} | It is possible to write a custom value resolver. If required, contact Oracle Support. |
Example Templates
The following shows example template configuration values and the values to which they resolve (the resolved values assume a Replicat group named KAFKA001 and a source table dbo.table1).
Example Template | Resolved Value |
---|---|
${groupName}_${fullyQualifiedTableName} | KAFKA001_dbo.table1 |
prefix_${schemaName}_${tableName}_suffix | prefix_dbo_table1_suffix |
${currentDate[yyyy-MM-dd HH:mm:ss.SSS]} | 2017-05-17 11:45:34.254 |
Kafka version 0.9.0.0 introduced security through SSL/TLS and Kerberos. The Kafka Connect Handler can be secured using SSL/TLS or Kerberos. The Kafka producer client libraries provide an abstraction of security functionality from the integrations that use those libraries, so the Kafka Connect Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster and connecting machines, and then configuring the Kafka Producer properties file, which the Kafka Connect Handler uses for processing, with the required security properties.
You may encounter the inability to decrypt the Kerberos password from the keytab file. This causes the Kerberos authentication to fall back to interactive mode, which cannot work because it is invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.
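For example, the following is a minimal sketch of SSL/TLS client settings in the Kafka Producer properties file; the truststore path and password are placeholders, and your cluster's listener configuration may require different or additional properties:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=changeit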
There are multiple configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, which affect performance.
The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. The GROUPTRANSOPS parameter allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka Connect Handler calls flush on the Kafka Producer to push the messages to Kafka for write durability, followed by a checkpoint. The flush call is expensive, so setting the Replicat GROUPTRANSOPS parameter to a larger value allows Replicat to call flush less frequently, thereby improving performance.
The default setting for GROUPTRANSOPS is 1000, and performance improvements can be obtained by increasing the value to 2500, 5000, or even 10000.
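For example, a sketch of the relevant line in the Replicat parameter file (the chosen value is illustrative, not a recommendation):

GROUPTRANSOPS 2500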
Op mode (gg.handler.kafkaconnect.mode=op) can also provide better performance than tx mode (gg.handler.kafkaconnect.mode=tx).
A number of Kafka Producer properties can affect performance. The following are the parameters with significant impact:
linger.ms
batch.size
acks
buffer.memory
compression.type
Oracle recommends that you start with the default values for these parameters and perform performance testing to obtain a baseline. Review the Kafka documentation for each of these parameters to understand its role, then adjust the parameters and perform additional performance testing to ascertain the performance effect of each parameter.
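As a starting point for experimentation, the following is a sketch of these properties in the Kafka Producer properties file; the values shown are illustrative assumptions, not tuning recommendations:

linger.ms=100
batch.size=16384
acks=1
buffer.memory=33554432
compression.type=gzip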
Issues with the Java classpath are one of the most common problems. The indication of a classpath problem is a ClassNotFoundException in the Oracle GoldenGate Java log4j log file, or an error while resolving the classpath if there is a typographic error in the gg.classpath variable.
The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You are required to obtain the correct version of the Kafka client libraries and to properly configure the gg.classpath property in the Java Adapter properties file to correctly resolve the Kafka client libraries, as described in Setting Up and Running the Kafka Connect Handler.
Kafka Connect was introduced in Kafka version 0.9.0.0. The Kafka Connect Handler does not work with Kafka versions 0.8.2.2 and older. Attempting to use Kafka Connect with Kafka version 0.8.2.2 typically results in a ClassNotFoundException error at runtime.
Typically, the following exception message occurs:
ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties
Verify that the gg.handler.name.kafkaProducerConfigFile configuration property for the Kafka Producer configuration file name is set correctly.
Ensure that the gg.classpath variable includes the path to the Kafka Producer properties file and that the path to the properties file does not contain a * wildcard at the end.
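For example, assuming the properties file resides in the Oracle GoldenGate dirprm directory (an illustrative layout):

Correct: gg.classpath=dirprm:{kafka_install_dir}/libs/*
Incorrect: gg.classpath=dirprm/*:{kafka_install_dir}/libs/*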
Typically, the following exception message appears:
WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN (Selector.java:276) - Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused
When this occurs, the connection retry interval expires and the Kafka Connect Handler process abends. Ensure that the Kafka brokers are running and that the host and port provided in the Kafka Producer properties file are correct.
Network shell commands (such as netstat -l) can be used on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
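For example, assuming the broker listens on the default port 9092 (the port is an assumption; check your broker configuration), the -n option shows numeric ports:

netstat -ln | grep 9092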