Fusion Middleware Using Oracle GoldenGate for Big Data

10 Using the Kafka Handler

Learn how to use the Kafka Handler, which is designed to stream change capture data from an Oracle GoldenGate trail to a Kafka topic.


10.1 Overview

The Oracle GoldenGate for Big Data Kafka Handler streams change capture data from an Oracle GoldenGate trail to a Kafka topic. Additionally, the Kafka Handler provides functionality to publish messages to a separate schema topic. Schema publication for Avro and JSON is supported.

Apache Kafka is an open source, distributed, partitioned, and replicated messaging service, see http://kafka.apache.org/.

Kafka can be run as a single instance or as a cluster on multiple servers. Each Kafka server instance is called a broker. A Kafka topic is a category or feed name to which messages are published by the producers and retrieved by consumers.

The Kafka Handler implements a Kafka producer. The Kafka producer writes serialized change data capture data from multiple source tables either to a single configured topic or, by separating source operations, to different Kafka topics (for example, topics whose names correspond to the fully qualified source table names).

10.2 Detailed Functionality

Transaction Versus Operation Mode

The Kafka Handler sends instances of the Kafka ProducerRecord class to the Kafka producer API, which in turn publishes the ProducerRecord to a Kafka topic. The Kafka ProducerRecord effectively is the implementation of a Kafka message. The ProducerRecord has two components: a key and a value. Both the key and value are represented as byte arrays by the Kafka Handler. This section describes how the Kafka Handler publishes data.

Transaction Mode

The following configuration sets the Kafka Handler to transaction mode:

gg.handler.name.Mode=tx

In transaction mode, the serialized data for every operation in a transaction from the source Oracle GoldenGate trail files is concatenated. The concatenated operation data is the value of the Kafka ProducerRecord object, and the key of the Kafka ProducerRecord object is NULL. The result is that Kafka messages comprise data from 1 to N operations, where N is the number of operations in the transaction.

For grouped transactions, the data for all the operations is concatenated into a single Kafka message. Therefore, grouped transactions may result in very large Kafka messages that contain data for a large number of operations.

Operation Mode

The following configuration sets the Kafka Handler to operation mode:

gg.handler.name.Mode=op

In operation mode, the serialized data for each operation is placed into an individual ProducerRecord object as the value. The ProducerRecord key is the fully qualified table name of the source operation. The ProducerRecord is immediately sent using the Kafka Producer API. This means that there is a one-to-one relationship between incoming operations and the Kafka messages produced.
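To make the difference between the two modes concrete, the following is a minimal Java sketch under stated assumptions. SketchRecord is a hypothetical stand-in for Kafka's ProducerRecord (so the example compiles without the Kafka client JARs), and SketchOperation is a hypothetical holder for one operation's formatter output. Transaction mode yields one record with a null key and a concatenated value; operation mode yields one record per operation, keyed by the fully qualified table name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Kafka's ProducerRecord: a key and a value, both byte arrays.
final class SketchRecord {
    final byte[] key;   // null in transaction mode
    final byte[] value;
    SketchRecord(byte[] key, byte[] value) { this.key = key; this.value = value; }
}

// Hypothetical serialized operation: the source table plus the formatter's output bytes.
final class SketchOperation {
    final String table;
    final byte[] payload;
    SketchOperation(String table, String payload) {
        this.table = table;
        this.payload = payload.getBytes(StandardCharsets.UTF_8);
    }
}

final class ModeSketch {
    // Transaction mode: concatenate all operations of one transaction into a single value.
    static List<SketchRecord> transactionMode(List<SketchOperation> txn) {
        ByteArrayOutputStream value = new ByteArrayOutputStream();
        for (SketchOperation op : txn) value.write(op.payload, 0, op.payload.length);
        List<SketchRecord> records = new ArrayList<>();
        records.add(new SketchRecord(null, value.toByteArray()));
        return records;
    }

    // Operation mode: one record per operation, keyed by the fully qualified table name.
    static List<SketchRecord> operationMode(List<SketchOperation> txn) {
        List<SketchRecord> records = new ArrayList<>();
        for (SketchOperation op : txn) {
            records.add(new SketchRecord(op.table.getBytes(StandardCharsets.UTF_8), op.payload));
        }
        return records;
    }
}
```

For a two-operation transaction, transactionMode returns one record and operationMode returns two.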

Blocking Versus Non-Blocking Mode

The Kafka Handler can send messages to Kafka in either blocking mode (synchronous) or non-blocking mode (asynchronous).

Blocking Mode

The following configuration property sets the Kafka Handler to blocking mode:

gg.handler.name.BlockingSend=true

Messages are delivered to Kafka on a synchronous basis. The Kafka Handler does not send the next message until the current message has been written to the intended topic and an acknowledgement has been received. Blocking mode provides the best guarantee of message delivery but at the cost of reduced performance.

Never set the Kafka producer linger.ms property to a nonzero value in blocking mode. Doing so causes the Kafka producer to wait for the entire linger.ms period before sending each message to the Kafka broker: the Kafka Handler waits for acknowledgement that the message has been sent, while at the same time the Kafka producer buffers the message, waiting for more messages to send to the Kafka brokers.
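The following hypothetical pre-flight check, a sketch rather than part of the Kafka Handler, flags this combination before Replicat starts. It assumes the producer settings have already been loaded into a java.util.Properties object and that an unset linger.ms means 0.

```java
import java.util.Properties;

// Hypothetical pre-flight check for the blocking-mode / linger.ms conflict.
final class BlockingConfigCheck {
    // Returns a warning, or null when the combination is safe.
    static String check(boolean blockingSend, Properties producerProps) {
        // Assumption: an unset linger.ms is treated as 0, the default of the Kafka
        // producer clients contemporary with this handler (newer clients may differ).
        long lingerMs = Long.parseLong(producerProps.getProperty("linger.ms", "0").trim());
        if (blockingSend && lingerMs > 0) {
            return "BlockingSend=true with linger.ms=" + lingerMs
                + ": each blocking send may wait about " + lingerMs + " ms before it is acknowledged";
        }
        return null;
    }
}
```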

Non-Blocking Mode

The following configuration property sets the Kafka Handler to non-blocking mode:

gg.handler.name.BlockingSend=false

Messages are delivered to Kafka asynchronously. Kafka messages are published one after the other without waiting for acknowledgements. The Kafka Producer client may buffer incoming messages in order to increase throughput.

On each transaction commit, the Kafka producer flush call is invoked to ensure that all outstanding messages are transferred to the Kafka cluster. Invocation of the flush call is not affected by the linger.ms duration. This allows the Kafka Handler to safely checkpoint, ensuring zero data loss.
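The following Java sketch models this send-then-flush pattern without a Kafka cluster. AsyncCommitSketch is hypothetical: a small thread pool stands in for the producer's network layer, and each completed CompletableFuture plays the role of an acknowledgement. send returns immediately, and flushAndCheckpoint blocks until every outstanding send has completed, which is the point at which recording a checkpoint would be safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of non-blocking sends plus a flush on commit.
final class AsyncCommitSketch {
    private final ExecutorService network = Executors.newFixedThreadPool(2);
    private final List<CompletableFuture<Void>> outstanding = new ArrayList<>();
    final ConcurrentLinkedQueue<String> delivered = new ConcurrentLinkedQueue<>();

    // Non-blocking send: returns immediately without waiting for the acknowledgement.
    void send(String message) {
        outstanding.add(CompletableFuture.runAsync(() -> delivered.add(message), network));
    }

    // Flush on commit: block until every outstanding send has completed.
    // Only after this returns is it safe to record a checkpoint for the transaction.
    int flushAndCheckpoint() {
        CompletableFuture.allOf(outstanding.toArray(new CompletableFuture<?>[0])).join();
        outstanding.clear();
        return delivered.size(); // messages confirmed so far
    }

    void close() {
        network.shutdown();
    }
}
```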

You can control when the Kafka producer flushes data to the Kafka broker with several properties in the Kafka producer configuration file. To enable batch sending of messages by the Kafka producer, both the batch.size and linger.ms Kafka producer properties must be set. The batch.size property controls the maximum number of bytes to buffer before a send to Kafka, while the linger.ms property controls the maximum number of milliseconds to wait before sending data. Data is sent to Kafka once batch.size is reached or when the linger.ms period expires, whichever comes first. If only batch.size is set (and linger.ms is left at 0), messages are sent to Kafka immediately.
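The size-or-time rule can be illustrated with a simplified, deterministic model. BatchSketch is hypothetical and takes the current time as a parameter; the real Kafka producer batches per partition and is considerably more involved. A batch is sent as soon as it reaches batchSize bytes or as soon as lingerMs has elapsed since its first message, and with lingerMs set to 0 every message is sent immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of batch.size / linger.ms: whichever trigger fires first sends the batch.
final class BatchSketch {
    private final int batchSize;   // bytes, analogous to batch.size
    private final long lingerMs;   // analogous to linger.ms
    private int currentBytes = 0;
    private int currentCount = 0;
    private long firstAppendMs = -1;
    final List<Integer> sentBatchBytes = new ArrayList<>(); // size of each batch "sent"

    BatchSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    void append(byte[] message, long nowMs) {
        if (currentCount == 0) firstAppendMs = nowMs;
        currentCount++;
        currentBytes += message.length;
        if (currentBytes >= batchSize) {
            send();        // size trigger: batch.size reached
            return;
        }
        tick(nowMs);       // with lingerMs == 0 this sends immediately
    }

    // Called as time passes; sends the open batch once linger.ms has expired.
    void tick(long nowMs) {
        if (currentCount > 0 && nowMs - firstAppendMs >= lingerMs) send(); // time trigger
    }

    private void send() {
        sentBatchBytes.add(currentBytes);
        currentBytes = 0;
        currentCount = 0;
        firstAppendMs = -1;
    }
}
```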

Topic Name Selection

The topic is resolved at runtime using this configuration parameter:

gg.handler.name.topicMappingTemplate

You can configure a static string, keywords, or a combination of static strings and keywords to dynamically resolve the topic name at runtime based on the context of the current operation, see Using Templates to Resolve the Topic Name and Message Key.

Kafka Broker Settings

To configure topics to be created automatically, set the auto.create.topics.enable property to true. This is the default setting.

If you set the auto.create.topics.enable property to false, then you must manually create topics before you start the Replicat process.

Schema Propagation

The schema data for all tables is delivered to the schema topic that is configured with the schemaTopicName property. For more information, see Schema Propagation.

10.3 Setting Up and Running the Kafka Handler

Instructions for configuring the Kafka Handler components and running the handler are described in this section.

You must install and correctly configure Kafka either as a single node or a clustered instance, see http://kafka.apache.org/documentation.html.

If you are using a Kafka distribution other than Apache Kafka, then consult the documentation for your Kafka distribution for installation and configuration instructions.

ZooKeeper, a prerequisite component for Kafka, and the Kafka broker (or brokers) must be up and running.

As a best practice, Oracle recommends that the data topic and the schema topic (if applicable) be preconfigured on the running Kafka brokers. You can create Kafka topics dynamically; however, this relies on the Kafka brokers being configured to allow dynamic topic creation.

If the Kafka broker is not collocated with the Kafka Handler process, then the remote host port must be reachable from the machine running the Kafka Handler.


10.3.1 Classpath Configuration

For the Kafka Handler to connect to Kafka and run, the Kafka Producer properties file and the Kafka client JARs must be configured in the gg.classpath configuration variable. The Kafka client JARs must match the version of Kafka that the Kafka Handler is connecting to. For a list of the required client JAR files by version, see Kafka Handler Client Dependencies.

The recommended storage location for the Kafka Producer properties file is the Oracle GoldenGate dirprm directory.

The default location of the Kafka client JARs is Kafka_Home/libs/*.

The gg.classpath must be configured precisely. The path to the Kafka Producer Properties file must have no wildcard appended; if the * wildcard is included in the path to the Kafka Producer Properties file, the file is not picked up. Conversely, the path to the dependency JARs must include the * wildcard character in order to include all the JAR files in that directory in the associated classpath. Do not use *.jar. The following is an example of a correctly configured classpath:

gg.classpath={kafka install dir}/libs/*
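The classpath rules above can be encoded in a small lint, shown here as a hypothetical helper rather than anything shipped with Oracle GoldenGate. It assumes entries are separated by the platform path separator (: on UNIX) and that you pass in the directory holding the Kafka producer properties file (for example, dirprm). It reports *.jar entries and a wildcard appended to the properties directory.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical lint for gg.classpath entries.
final class ClasspathLint {
    static List<String> problems(String ggClasspath, String propertiesDir) {
        List<String> issues = new ArrayList<>();
        for (String entry : ggClasspath.split(Pattern.quote(File.pathSeparator))) {
            String e = entry.trim();
            // JAR directories must use "dir/*", never "dir/*.jar".
            if (e.endsWith("*.jar")) {
                issues.add("Use '*' instead of '*.jar': " + e);
            }
            // The properties-file directory must appear without a wildcard.
            if (e.equals(propertiesDir + "/*") || e.equals(propertiesDir + File.separator + "*")) {
                issues.add("Properties directory must not end with a wildcard: " + e);
            }
        }
        return issues;
    }
}
```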

10.3.2 Kafka Handler Configuration

The following are the configurable values for the Kafka Handler. These properties are located in the Java Adapter properties file (not in the Replicat properties file).

To select the Kafka Handler, you must first configure the handler type by specifying gg.handler.name.type=kafka, and then set the other Kafka properties as follows:

Table 10-1 Configuration Properties for Kafka Handler

Property Name | Required / Optional | Property Value | Default | Description

gg.handlerlist

Required

name (choice of any name)

None

List of handlers to be used.

gg.handler.name.type

Required

kafka

None

Type of handler to use.

gg.handler.name.KafkaProducerConfigFile

Optional

Any custom file name

kafka-producer-default.properties

Filename in classpath that holds Apache Kafka properties to configure the Apache Kafka producer.

gg.handler.name.Format

Optional

Formatter class or short code.

delimitedtext

Formatter used to format the payload. Can be one of xml, delimitedtext, json, json_row, avro_row, or avro_op.

gg.handler.name.SchemaTopicName

Required when schema delivery is required.

Name of the schema topic.

None

Topic name where schema data will be delivered. If this property is not set, schema will not be propagated. Schemas will be propagated only for Avro formatters.

gg.handler.name.SchemaPrClassName

Optional

Fully qualified class name of a custom class that implements Oracle GoldenGate for Big Data Kafka Handler's CreateProducerRecord Java Interface.

Provided this implementation class: oracle.goldengate.handler.kafka

ProducerRecord

Schema is also propagated as a ProducerRecord. The default key is the fully qualified table name. To change the key for schema records, create a custom implementation of the CreateProducerRecord interface and set this property to the fully qualified name of the new class.

gg.handler.name.BlockingSend

Optional

true | false

false

If this property is set to true, delivery to Kafka is completely synchronous: the next payload is sent only after the current payload has been written to the intended topic and an acknowledgement has been received. In transaction mode, this provides exactly once semantics. If this property is set to false, delivery to Kafka is asynchronous: payloads are sent one after the other without waiting for acknowledgements, and Kafka internal queues may buffer contents to increase throughput. Checkpoints are made only when acknowledgements are received from Kafka brokers using Java callbacks.

gg.handler.name.mode

Optional

tx/op

tx

In operation mode, each change data capture record payload (insert, update, delete, and so on) is represented as a Kafka ProducerRecord and is flushed one at a time. In transaction mode, all operations within a source transaction are represented as a single Kafka ProducerRecord, and this combined byte payload is flushed on a transaction commit event.

gg.handler.name.topicMappingTemplate

Required

A template string value to resolve the Kafka topic name at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate

Required

A template string value to resolve the Kafka message key at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.logSuccessfullySentMessages

Optional

true | false

true

When set to true, the Kafka Handler logs, at the INFO level, messages that have been successfully sent to Kafka. Enabling this property has a negative impact on performance.

10.3.3 Java Adapter Properties File

The following is a sample configuration for the Kafka Handler from the Adapter properties file:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.Type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtopic
gg.handler.kafkahandler.keyMappingTemplate=${currentTimestamp}
gg.handler.kafkahandler.Format = avro_op
gg.handler.kafkahandler.SchemaTopicName = oggSchemaTopic
gg.handler.kafkahandler.SchemaPrClassName = com.company.kafkaProdRec.SchemaRecord
gg.handler.kafkahandler.Mode = tx
gg.handler.kafkahandler.BlockingSend = true

You can find a sample Replicat configuration and a Java Adapter Properties file for a Kafka integration in the following directory:

GoldenGate_install_directory/AdapterExamples/big-data/kafka

10.3.4 Kafka Producer Configuration File

The Kafka Handler must access a Kafka producer configuration file in order to publish messages to Kafka. The file name of the Kafka producer configuration file is controlled by the following configuration in the Kafka Handler properties.

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

The Kafka Handler attempts to locate and load the Kafka producer configuration file by using the Java classpath. Therefore, the Java classpath must include the directory containing the Kafka Producer Configuration File.
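The following sketch shows classpath-based loading in plain Java, assuming the handler resolves the configured file name as a classpath resource (ProducerConfigLoader is a hypothetical name). If the directory holding the file is not on the classpath, getResourceAsStream returns null, which corresponds to the situation described in Kafka Producer Properties File Not Found.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical loader illustrating classpath-based lookup of the producer config file.
final class ProducerConfigLoader {
    // Looks the file name up as a classpath resource, the way a file in a
    // gg.classpath directory (such as dirprm) would be found.
    static Properties loadFromClasspath(String fileName) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try (InputStream in = loader.getResourceAsStream(fileName)) {
            if (in == null) {
                throw new IllegalArgumentException("Not found on the classpath: " + fileName);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The same parsing applied to any Reader, handy for checking a file's contents.
    static Properties parse(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

Note that java.util.Properties accepts whitespace around the = separator, so entries such as acks = 1 in the sample file below parse to the value 1.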

The Kafka producer configuration file contains Kafka proprietary properties. The Kafka documentation provides configuration information for the 0.8.2.0 Kafka producer interface properties. The Kafka Handler uses these properties to resolve the host and port of the Kafka brokers, and properties in the Kafka producer configuration file control the behavior of the interaction between the Kafka producer client and the Kafka brokers.

A sample Kafka producer configuration file is as follows:

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
 
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 0
max.request.size = 1048576 
send.buffer.bytes = 131072

10.3.5 Using Templates to Resolve the Topic Name and Message Key

The Kafka Handler provides functionality to resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. At runtime, each keyword is replaced with a value taken from the context of the current processing, such as the current operation or transaction. The templates use the following configuration properties:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

Source database transactions are made up of one or more individual operations: inserts, updates, and deletes. The Kafka Handler can be configured to send one message per operation (insert, update, delete) or to group operations into messages at the transaction level. Many template keywords resolve data based on the context of an individual source database operation, so many of them do not work when sending messages at the transaction level. For example, ${fullyQualifiedTableName} resolves to the fully qualified source table name of an operation, but a transaction can contain operations for many source tables. Resolving a single fully qualified table name for a transaction-level message is therefore non-deterministic, and the Kafka Handler abends at runtime.
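A template intended for transaction mode can be checked against the Template Keywords table that follows. The sketch below is a hypothetical helper, not part of the Kafka Handler: it extracts each ${keyword} or ${keyword[...]} occurrence with a regular expression and accepts the template only if every keyword is marked Yes for transaction message support. ${custom[]} is treated as unsupported because its support is implementation dependent.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical pre-flight check for templates used with gg.handler.name.mode=tx.
final class TxTemplateCheck {
    // Keywords marked "Yes" for transaction message support in the keyword table.
    private static final Set<String> TX_SAFE = new HashSet<>(Arrays.asList(
        "position", "opTimestamp", "emptyString", "groupName", "currentTimestamp", "null"));

    // Matches ${name} or ${name[...]}; group(1) is the keyword name.
    private static final Pattern KEYWORD = Pattern.compile("\\$\\{([A-Za-z]+)(\\[[^\\]]*\\])?\\}");

    static boolean usableInTransactionMode(String template) {
        Matcher m = KEYWORD.matcher(template);
        while (m.find()) {
            // Per-operation keywords such as ${tableName} cannot be resolved for a whole transaction.
            if (!TX_SAFE.contains(m.group(1))) return false;
        }
        return true; // static text and transaction-safe keywords only
    }
}
```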

Template Keywords

The following table includes a column that indicates whether each keyword is supported for transaction-level messages.

Keyword | Explanation | Transaction Message Support

${fullyQualifiedTableName}

Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names.

For example, test.dbo.table1.

No

${catalogName}

Resolves to the catalog name.

No

${schemaName}

Resolves to the schema name.

No

${tableName}

Resolves to the short table name.

No

${opType}

Resolves to the type of the operation: (INSERT, UPDATE, DELETE, or TRUNCATE)

No

${primaryKeys}

Resolves to the concatenated primary key values delimited by an underscore (_) character.

No

${position}

The sequence number of the source trail file followed by the offset (RBA).

Yes

${opTimestamp}

The operation timestamp from the source trail file.

Yes

${emptyString}

Resolves to “”.

Yes

${groupName}

Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.

Yes

${staticMap[]}

Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside the square brackets in the following format:

${staticMap[dbo.table1=value1,dbo.table2=value2]}

No

${columnValue[]}

Resolves to a column value where the key is the fully-qualified table name and the value is the column name to be resolved. For example:

${columnValue[dbo.table1=col1,dbo.table2=col2]}

No

${currentTimestamp}

Or

${currentTimestamp[]}

Resolves to the current timestamp. You can control the format of the current timestamp using the Java-based formatting described in the SimpleDateFormat class, see https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html.

Examples:

${currentTimestamp}
${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

Yes

${null}

Resolves to a NULL string.

Yes

${custom[]}

It is possible to write a custom value resolver. If required, contact Oracle Support.

Implementation dependent
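Because the bracketed timestamp format is interpreted by SimpleDateFormat, pattern letters are case-sensitive: MM is month, mm is minute, HH is hour of day (0-23), and hh is hour in AM/PM (1-12). The following Java sketch (TimestampPatternDemo is a hypothetical name) formats a fixed instant in UTC with two patterns to show the effect; it calls SimpleDateFormat directly rather than going through the Kafka Handler.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Demonstrates SimpleDateFormat pattern letters on a fixed instant.
final class TimestampPatternDemo {
    static String format(String pattern, Date when) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone so the output is reproducible
        return fmt.format(when);
    }

    // 2017-05-17 11:45:34.254 UTC, the instant used in the example templates.
    static Date sampleInstant() {
        Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        c.clear();
        c.set(2017, Calendar.MAY, 17, 11, 45, 34);
        c.set(Calendar.MILLISECOND, 254);
        return c.getTime();
    }
}
```

With this instant, the pattern yyyy-MM-dd HH:mm:ss.SSS produces 2017-05-17 11:45:34.254, while yyyy-mm-dd hh:MM:ss.SSS produces 2017-45-17 11:05:34.254 because minutes and month are swapped.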

Example Templates

The following describes example template configuration values and the resolved values.

Example Template | Resolved Value

${groupName}_${fullyQualifiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

2017-05-17 11:45:34.254
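The following minimal Java sketch resolves templates like those above for a single operation. TemplateSketch is hypothetical and models only ${groupName}, ${fullyQualifiedTableName}, ${schemaName}, ${tableName}, ${emptyString}, and ${staticMap[]}; the Kafka Handler's own resolver supports more keywords and may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal, hypothetical template resolver for one operation's context.
final class TemplateSketch {
    // Matches ${name} or ${name[...]}; group(2) holds the bracket contents, if any.
    private static final Pattern KEYWORD = Pattern.compile("\\$\\{([A-Za-z]+)(?:\\[([^\\]]*)\\])?\\}");

    static String resolve(String template, String groupName, String catalog, String schema, String table) {
        String fqtn = (catalog == null ? "" : catalog + ".") + schema + "." + table;
        Matcher m = KEYWORD.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value;
            switch (m.group(1)) {
                case "groupName": value = groupName; break;
                case "fullyQualifiedTableName": value = fqtn; break;
                case "schemaName": value = schema; break;
                case "tableName": value = table; break;
                case "emptyString": value = ""; break;
                case "staticMap": value = parseMap(m.group(2)).get(fqtn); break;
                default: throw new IllegalArgumentException("Keyword not modeled in this sketch: " + m.group(1));
            }
            if (value == null) throw new IllegalArgumentException("No value for " + m.group());
            m.appendReplacement(out, Matcher.quoteReplacement(value)); // static text is copied as-is
        }
        m.appendTail(out);
        return out.toString();
    }

    // Parses "dbo.table1=value1,dbo.table2=value2" into a map keyed by table name.
    static Map<String, String> parseMap(String body) {
        Map<String, String> map = new HashMap<>();
        if (body == null || body.isEmpty()) return map;
        for (String pair : body.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) map.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return map;
    }
}
```

For example, resolve("${groupName}_${fullyQualifiedTableName}", "KAFKA001", null, "dbo", "table1") returns KAFKA001_dbo.table1, matching the first row of the table.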

10.3.6 Configuring Kafka with Kerberos on a Hadoop Platform

Use these steps to configure a Kafka Handler Replicat with Kerberos to enable a Cloudera instance to process an Oracle GoldenGate for Big Data trail to a Kafka topic:

  1. In GGSCI, add a Kafka Replicat:

    GGSCI> add replicat kafka, exttrail dirdat/gg
    
  2. Configure a prm file with these properties:

    replicat kafka
    discardfile ./dirrpt/kafkax.dsc, purge
    SETENV (TZ=PST8PDT)
    GETTRUNCATES
    GETUPDATEBEFORES
    ReportCount Every 1000 Records, Rate
    MAP qasource.*, target qatarget.*;
    
  3. Configure a Replicat properties file as follows:

    ###KAFKA Properties file ###
    gg.log=log4j
    gg.log.level=info
    gg.report.time=30sec
    
    ###Kafka Classpath settings ###
    gg.classpath=/opt/cloudera/parcels/KAFKA-2.1.0-1.2.1.0.p0.115/lib/kafka/libs/*
    jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/scratch/ydama/ogg/v123211/dirprm/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf
    
    javawriter.stats.full=TRUE
    javawriter.stats.display=TRUE
    
    ### native library config ###
    goldengate.userexit.nochkpt=TRUE
    goldengate.userexit.timestamp=utc
    
    ### Kafka handler properties ###
    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=kafka-producer.properties
    gg.handler.kafkahandler.format=delimitedtext
    gg.handler.kafkahandler.format.PkUpdateHandling=update
    gg.handler.kafkahandler.mode=tx
    gg.handler.kafkahandler.format.includeCurrentTimestamp=false
    #gg.handler.kafkahandler.maxGroupSize=100
    #gg.handler.kafkahandler.minGroupSize=50
    gg.handler.kafkahandler.format.fieldDelimiter=|
    gg.handler.kafkahandler.format.lineDelimiter=CDATA[\n]
    gg.handler.kafkahandler.topicMappingTemplate=myoggtopic
    gg.handler.kafkahandler.keyMappingTemplate=${position}
    
  4. Configure a Kafka Producer file with these properties:

    bootstrap.servers=10.245.172.52:9092
    acks=1
    #compression.type=snappy
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=1024
    linger.ms=2000
    
    security.protocol=SASL_PLAINTEXT
    
    sasl.kerberos.service.name=kafka
    sasl.mechanism=GSSAPI
    
  5. Configure a jaas.conf file with these properties:

    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/scratch/ydama/ogg/v123211/dirtmp/keytabs/slc06unm/kafka.keytab"
    principal="kafka/slc06unm.us.oracle.com@HADOOPTEST.ORACLE.COM";
    };
    
  6. Ensure that you have the latest keytab files from the Cloudera instance to connect to secured Kafka topics.

  7. Start the Replicat from GGSCI and use INFO ALL to make sure that it is running.

  8. Review the Replicat report to see the total number of records processed. The report is similar to:

    Oracle GoldenGate for Big Data, 12.3.2.1.1.005
    
    Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved
    
    Built with Java 1.8.0_161 (class version: 52.0)
    
    2018-08-05 22:15:28 INFO OGG-01815 Virtual Memory Facilities for: COM
    anon alloc: mmap(MAP_ANON) anon free: munmap
    file alloc: mmap(MAP_SHARED) file free: munmap
    target directories:
    /scratch/ydama/ogg/v123211/dirtmp.
    
    Database Version:
    
    Database Language and Character Set:
    
    ***********************************************************************
    ** Run Time Messages **
    ***********************************************************************
    
    
    2018-08-05 22:15:28 INFO OGG-02243 Opened trail file /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000 at 2018-08-05 22:15:28.258810.
    
    2018-08-05 22:15:28 INFO OGG-03506 The source database character set, as determined from the trail file, is UTF-8.
    
    2018-08-05 22:15:28 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTMER1", target qatarget."BDCUSTMER1".
    
    2018-08-05 22:15:28 INFO OGG-02756 The definition for table QASOURCE.BDCUSTMER1 is obtained from the trail file.
    
    2018-08-05 22:15:28 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, NAME, CITY, STATE.
    
    2018-08-05 22:15:28 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTMER1: CUST_CODE.
    
    2018-08-05 22:15:29 INFO OGG-06506 Wildcard MAP resolved (entry qasource.*): MAP "QASOURCE"."BDCUSTORD1", target qatarget."BDCUSTORD1".
    
    2018-08-05 22:15:29 INFO OGG-02756 The definition for table QASOURCE.BDCUSTORD1 is obtained from the trail file.
    
    2018-08-05 22:15:29 INFO OGG-06511 Using following columns in default map by name: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID, PRODUCT_PRICE, PRODUCT_AMOUNT, TRANSACTION_ID.
    
    2018-08-05 22:15:29 INFO OGG-06510 Using the following key columns for target table qatarget.BDCUSTORD1: CUST_CODE, ORDER_DATE, PRODUCT_CODE, ORDER_ID.
    
    2018-08-05 22:15:33 INFO OGG-01021 Command received from GGSCI: STATS.
    
    2018-08-05 22:16:03 INFO OGG-01971 The previous message, 'INFO OGG-01021', repeated 1 times.
    
    2018-08-05 22:43:27 INFO OGG-01021 Command received from GGSCI: STOP.
    
    ***********************************************************************
    * ** Run Time Statistics ** *
    ***********************************************************************
    
    Last record for the last committed transaction is the following:
    ___________________________________________________________________
    Trail name : /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000
    Hdr-Ind : E (x45) Partition : . (x0c)
    UndoFlag : . (x00) BeforeAfter: A (x41)
    RecLength : 0 (x0000) IO Time : 2015-08-14 12:02:20.022027
    IOType : 100 (x64) OrigNode : 255 (xff)
    TransInd : . (x03) FormatType : R (x52)
    SyskeyLen : 0 (x00) Incomplete : . (x00)
    AuditRBA : 78233 AuditPos : 23968384
    Continued : N (x00) RecCount : 1 (x01)
    
    2015-08-14 12:02:20.022027 GGSPurgedata Len 0 RBA 6473
    TDR Index: 2
    ___________________________________________________________________
    
    Reading /scratch/ydama/ogg/v123211/dirdat/kfkCustR/gg000000, current RBA 6556, 20 records, m_file_seqno = 0, m_file_rba = 6556
    
    Report at 2018-08-05 22:43:27 (activity since 2018-08-05 22:15:28)
    
    From Table QASOURCE.BDCUSTMER1 to qatarget.BDCUSTMER1:
    # inserts: 5
    # updates: 1
    # deletes: 0
    # discards: 0
    From Table QASOURCE.BDCUSTORD1 to qatarget.BDCUSTORD1:
    # inserts: 5
    # updates: 3
    # deletes: 5
    # truncates: 1
    # discards: 0
    
    
  9. Ensure that the secure Kafka topic is created:

    /kafka/bin/kafka-topics.sh --zookeeper slc06unm:2181 --list  
    myoggtopic
    
  10. Review the contents of the secure Kafka topic:

    1. Create a consumer.properties file containing:

      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      
    2. Set this environment variable:

      export KAFKA_OPTS="-Djava.security.auth.login.config=/scratch/ogg/v123211/dirprm/jaas.conf"
      
    3. Run the consumer utility to check the records:

      /kafka/bin/kafka-console-consumer.sh --bootstrap-server sys06:9092 --topic myoggtopic --new-consumer --consumer.config consumer.properties
      

10.4 Schema Propagation

The Kafka Handler provides the ability to publish schemas to a schema topic. Currently, the Avro Row and Operation formatters are the only formatters that are enabled for schema publishing. If the Kafka Handler schemaTopicName property is set, then the schema is published for the following events:

  • The Avro schema for a specific table is published the first time an operation for that table is encountered.

  • If the Kafka Handler receives a metadata change event, the schema is flushed. The regenerated Avro schema for a specific table is published the next time an operation for that table is encountered.

  • If the Avro wrapping functionality is enabled, then the generic wrapper Avro schema is published the first time that any operation is encountered. The generic wrapper functionality is enabled in the Avro formatter configuration, see Avro Row Formatter and The Avro Operation Formatter.

The Kafka ProducerRecord value is the schema, and the key is the fully qualified table name.
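The publication rules above amount to a per-table cache that a metadata change invalidates. The following hypothetical Java model (not the handler's implementation) records the key of each schema record it would send to the schema topic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of when schemas are published to the schema topic.
final class SchemaPublicationSketch {
    private final Set<String> published = new HashSet<>();
    final List<String> schemaTopicKeys = new ArrayList<>(); // keys of schema records "sent"

    void onOperation(String fullyQualifiedTableName) {
        // Set.add returns true only the first time a table is seen (or after a flush).
        if (published.add(fullyQualifiedTableName)) {
            schemaTopicKeys.add(fullyQualifiedTableName); // key = table name, value = schema
        }
    }

    void onMetadataChange(String fullyQualifiedTableName) {
        published.remove(fullyQualifiedTableName); // republish on the next operation for this table
    }
}
```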

Because Avro messages depend directly on an Avro schema, users of Avro over Kafka may encounter issues. Avro messages are binary and therefore not human readable. To deserialize an Avro message, the receiver must first have the correct Avro schema, but because each table from the source database results in a separate Avro schema, this can be difficult: when the source Oracle GoldenGate trail file includes operations from multiple tables, the receiver of a Kafka message cannot determine which Avro schema to use to deserialize an individual message. To solve this problem, you can wrap the specialized Avro messages in a generic Avro message wrapper. This generic Avro wrapper provides the fully-qualified table name, the hashcode of the schema string, and the wrapped Avro message. The receiver can use the fully-qualified table name and the hashcode of the schema string to resolve the associated schema of the wrapped message, and then use that schema to deserialize the wrapped message.
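On the receiving side, this lookup can be sketched as a map keyed by table name and schema hash. WrappedSchemaRegistry is hypothetical, and it assumes the wrapper's hash equals Java's String.hashCode() of the schema text; verify the actual hash and field names against the Avro formatter documentation before relying on this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side registry for schemas read from the schema topic.
final class WrappedSchemaRegistry {
    private final Map<String, String> schemas = new HashMap<>();

    private static String key(String table, int schemaHash) {
        return table + "#" + schemaHash;
    }

    // Called for each record read from the schema topic (key = table, value = schema).
    void register(String table, String schemaJson) {
        schemas.put(key(table, schemaJson.hashCode()), schemaJson); // assumption: String.hashCode()
    }

    // Called for each wrapped data message: returns the schema needed to decode the payload.
    String lookup(String table, int schemaHash) {
        String schema = schemas.get(key(table, schemaHash));
        if (schema == null) throw new IllegalStateException("Schema not yet received for " + table);
        return schema;
    }
}
```

Keying on the hash as well as the table name lets old and new versions of a table's schema coexist, so messages written before and after a metadata change can both be decoded.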

10.5 Performance Considerations

Oracle recommends that you do not use the linger.ms setting in the Kafka producer config file when gg.handler.name.BlockingSend is set to true. This causes each send to block for at least the value of linger.ms, leading to major performance issues, because the Kafka Handler configuration and the Kafka producer configuration are in conflict with each other. This configuration results in a temporary deadlock, where the Kafka Handler waits to receive a send acknowledgement while the Kafka producer waits for more messages before sending. The deadlock resolves when the linger.ms period expires. This behavior repeats for every message sent.

For the best performance, Oracle recommends that you set the Kafka Handler to operate in operation mode using non-blocking (asynchronous) calls to the Kafka producer. Use the following configuration in your Java Adapter properties file:

gg.handler.name.mode = op
gg.handler.name.BlockingSend = false

Additionally, Oracle recommends that you set the batch.size and linger.ms values in the Kafka producer properties file. These values are highly dependent upon the use case scenario. Typically, higher values result in better throughput but increased latency, while smaller values reduce latency at the cost of overall throughput. If you have a high volume of input data from the source trail files, then set batch.size and linger.ms as high as possible.

Use of the Replicat parameter GROUPTRANSOPS also improves performance. The recommended setting is 10000.

If the serialized operations from the source trail file must be delivered in individual Kafka messages, then the Kafka Handler must be set to operation mode.

gg.handler.name.mode = op

However, this results in many more Kafka messages and adversely affects performance.

10.6 About Security

Kafka version 0.9.0.0 introduced security through SSL/TLS and SASL (Kerberos). You can secure the Kafka Handler using one or both of the SSL/TLS and SASL security offerings. The Kafka producer client libraries provide an abstraction of security functionality from the integrations that use those libraries. The Kafka Handler is effectively abstracted from security functionality. Enabling security requires setting up security for the Kafka cluster, connecting machines, and then configuring the Kafka producer properties file with the required security properties. For detailed instructions about securing the Kafka cluster, see the Kafka documentation at

You may find that the Kerberos password cannot be decrypted from the keytab file. This causes Kerberos authentication to fall back to interactive mode, which cannot work because authentication is invoked programmatically. The cause of this problem is that the Java Cryptography Extension (JCE) is not installed in the Java Runtime Environment (JRE). Ensure that the JCE is loaded in the JRE, see http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html.
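One way to check the installed policy is javax.crypto.Cipher.getMaxAllowedKeyLength. The following sketch (JcePolicyCheck is a hypothetical name) reports the maximum AES key length: a value of 128 indicates the limited policy, which typically cannot decrypt keytabs that use AES-256 encryption types, while the unlimited-strength policy reports Integer.MAX_VALUE. Java 8 Update 161 and later enable unlimited strength by default.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

// Quick diagnostic for the JCE policy in the JRE that runs the Kafka Handler.
final class JcePolicyCheck {
    static int maxAesKeyLength() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("AES not available in this JRE", e);
        }
    }

    // True when keys longer than 128 bits (for example, AES-256) are permitted.
    static boolean unlimitedStrength() {
        return maxAesKeyLength() > 128;
    }
}
```

Run it with the same java executable that Replicat uses, because the policy belongs to the JRE, not to the application.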

10.7 Metadata Change Events

Metadata change events are now handled in the Kafka Handler. This is relevant only if you have configured a schema topic and the formatter used supports schema propagation (currently Avro row and Avro Operation formatters). The next time an operation is encountered for a table for which the schema has changed, the updated schema is published to the schema topic.

To support metadata change events, the Oracle GoldenGate process capturing changes in the source database must support the Oracle GoldenGate metadata in trail feature, which was introduced in Oracle GoldenGate 12c (12.2).

10.8 Snappy Considerations

The Kafka Producer Configuration file supports the use of compression. One of the configurable options is Snappy, an open source compression and decompression (codec) library that provides better performance than other codec libraries. The Snappy JAR does not run on all platforms. Snappy may work on Linux systems though may or may not work on other UNIX and Windows implementations. If you want to use Snappy compression, test Snappy on all required systems before implementing compression using Snappy. If Snappy does not port to all required systems, then Oracle recommends using an alternate codec library.

10.9 Troubleshooting


10.9.1 Verify the Kafka Setup

You can use the command line Kafka producer to write dummy data to a Kafka topic, and you can use a Kafka consumer to read this data from the Kafka topic. Use this method to verify the setup and read/write permissions to Kafka topics on disk, see http://kafka.apache.org/documentation.html#quickstart.

10.9.2 Classpath Issues

Java classpath problems are common. Such problems may include a ClassNotFoundException problem in the log4j log file or may be an error resolving the classpath because of a typographic error in the gg.classpath variable. The Kafka client libraries do not ship with the Oracle GoldenGate for Big Data product. You must obtain the correct version of the Kafka client libraries and properly configure the gg.classpath property in the Java Adapter Properties file to correctly resolve the Kafka client libraries as described in Classpath Configuration.

10.9.3 Invalid Kafka Version

The Kafka Handler does not support Kafka versions 0.8.2.2 or older. If you run an unsupported version of Kafka, a runtime Java exception, java.lang.NoSuchMethodError, occurs. This indicates that the org.apache.kafka.clients.producer.KafkaProducer.flush() method cannot be found. If you encounter this error, migrate to Kafka version 0.9.0.0 or later.
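Before starting Replicat, you can check whether the Kafka client JARs on a classpath expose flush(). The following reflective check is a hypothetical helper; it loads the class without initializing it and returns false if either the class or the public no-argument method is missing.

```java
// Hypothetical diagnostic: does a class on the current classpath expose a public no-arg method?
final class ClientApiCheck {
    static boolean hasNoArgMethod(String className, String methodName) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, ClientApiCheck.class.getClassLoader());
            cls.getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        } catch (LinkageError e) {
            return false; // e.g. a dependency of the class is missing from the classpath
        }
    }
}
```

Run it with the Kafka client JARs on the classpath and call hasNoArgMethod("org.apache.kafka.clients.producer.KafkaProducer", "flush"); false suggests the client is too old or absent.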

10.9.4 Kafka Producer Properties File Not Found

This problem typically results in the following exception:

ERROR 2015-11-11 11:49:08,482 [main] Error loading the kafka producer properties

Check the gg.handler.kafkahandler.KafkaProducerConfigFile configuration variable to ensure that the Kafka Producer Configuration file name is set correctly. Check the gg.classpath variable to verify that the classpath includes the path to the Kafka Producer properties file, and that the path to the properties file does not contain a * wildcard at the end.

10.9.5 Kafka Connection Problem

This problem occurs when the Kafka Handler is unable to connect to Kafka. You receive the following warnings:

WARN 2015-11-11 11:25:50,784 [kafka-producer-network-thread | producer-1] WARN  (Selector.java:276) - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused

The connection retry interval expires, and the Kafka Handler process abends. Ensure that the Kafka Broker is running and that the host and port provided in the Kafka Producer Properties file are correct. You can use network shell commands (such as netstat -l) on the machine hosting the Kafka broker to verify that Kafka is listening on the expected port.
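If netstat is not available, or you want to test from the machine running the Kafka Handler rather than from the broker host, a plain TCP connect is a quick check. BrokerPortProbe is a hypothetical helper; a successful connect only shows that something is listening on that port, not that it is a correctly configured Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal reachability probe for a broker host:port.
final class BrokerPortProbe {
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unresolvable host
        }
    }
}
```

For example, canConnect("localhost", 9092, 2000) tests the bootstrap.servers value from the sample producer configuration.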