12 Using the Kafka REST Proxy Handler

Learn how to use the Kafka REST Proxy Handler to stream messages to the Kafka REST Proxy distributed by Confluent.

12.1 Overview

The Kafka REST Proxy Handler lets you stream Kafka messages over HTTPS. The typical use case is streaming Kafka messages from an on-premises Oracle GoldenGate installation to the cloud, or from cloud to cloud.

The Kafka REST Proxy provides a RESTful interface to a Kafka cluster. Without using the native Kafka protocol or clients, it makes it easy for you to:

  • produce and consume messages,

  • view the state of the cluster, and

  • perform administrative actions.

The Kafka REST Proxy is part of the Confluent Open Source and Confluent Enterprise distributions. It is not available in the Apache Kafka distribution. To access Kafka through the REST Proxy, you must install the Confluent Kafka distribution; see https://docs.confluent.io/current/kafka-rest/docs/index.html.
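
To make the RESTful interface concrete, the following minimal Python sketch builds (but does not send) a JSON produce request of the kind the Confluent REST Proxy v2 API accepts. The listener address, topic name, and record contents are illustrative assumptions, and the sketch is not a description of the exact request that the handler issues.

```python
import json
import urllib.request


def build_produce_request(base_url, topic, records):
    """Build (without sending) a v2 JSON produce request for the REST Proxy."""
    body = json.dumps({"records": records}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/vnd.kafka.json.v2+json",
            "Accept": "application/vnd.kafka.v2+json",
        },
    )


request = build_produce_request(
    "https://localhost:8082",                   # assumed listener address
    "dbo.table1",                               # hypothetical topic name
    [{"key": "1001", "value": {"col1": "a"}}],  # hypothetical record
)
print(request.get_method(), request.full_url)
# To send the request: urllib.request.urlopen(request)
```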

12.2 Setting Up and Starting the Kafka REST Proxy Handler Services

You must download and install the Confluent Open Source or Confluent Enterprise distribution. Several installation formats are available, including ZIP or TAR archives, Docker, and packages.

12.2.1 Using the Kafka REST Proxy Handler

You must download and install the Confluent Open Source or Confluent Enterprise distribution because the Kafka REST Proxy is not included in the Apache, Cloudera, or Hortonworks distributions. Several installation formats are available, including ZIP or TAR archives, Docker, and packages.

The Kafka REST Proxy depends on ZooKeeper, Kafka, and the Schema Registry.

12.2.2 Kafka REST Proxy Handler Configuration

The following are the configurable values for the Kafka REST Proxy Handler. Oracle recommends that you store the Kafka REST Proxy properties file in the Oracle GoldenGate dirprm directory.

To enable the selection of the Kafka REST Proxy Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkarestproxy and the other Kafka REST Proxy Handler properties as follows:

Table 12-1 Kafka REST Proxy Handler Configuration Properties

Properties Required/ Optional Legal Values Default Explanation

gg.handler.name.type

Required

kafkarestproxy

None

The configuration to select the Kafka REST Proxy Handler.

gg.handler.name.topicMappingTemplate

Required

A template string value to resolve the Kafka topic name at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.keyMappingTemplate

Required

A template string value to resolve the Kafka message key at runtime.

None

See Using Templates to Resolve the Topic Name and Message Key.

gg.handler.name.postDataUrl

Required

The listener address of the REST Proxy.

None

Set to the URL of the Kafka REST proxy.

gg.handler.name.format

Required

avro | json

None

Set to the REST Proxy payload data format.

gg.handler.name.payloadsize

Optional

A value representing the payload size in megabytes.

5MB

Set to the maximum size of the payload of the HTTP messages.

gg.handler.name.circularRedirectsAllowed

Optional

true | false

false

Set to allow or disallow circular redirects.

gg.handler.name.connectionRequestTimeout

Optional

A value representing milliseconds.

-1

Set the maximum time to wait for the connection manager to return a connection. The connection manager may not be able to return a connection if the maximum number of connections in the pool are in use.

gg.handler.name.connectTimeout

Optional

A value representing milliseconds.

-1

Set the timeout in milliseconds until a connection is established.

gg.handler.name.contentCompressionEnabled

Optional

true | false

true

Sets content compression to on or off.

gg.handler.name.maxRedirects

Optional

Integer value representing the redirect count.

50

Sets the maximum number of redirects.

gg.handler.name.proxy

Optional

host:port

None

Sets the host and port of the proxy server.

gg.handler.name.userName

Optional

Any string.

None

Sets the username for the proxy authentication.

gg.handler.name.password

Optional

Any string.

None

Sets the password for the proxy authentication.

gg.handler.name.redirectsEnabled

Optional

true | false

true

Set to enable or disable redirects.

gg.handler.name.relativeRedirectsAllowed

Optional

true | false

true

Set to allow or disallow redirects that use relative naming.

gg.handler.name.socketTimeout

Optional

A value representing milliseconds.

-1

Set the maximum time allowable between data packets on a read.

gg.handler.name.httpClientResetInterval

Optional

A value representing milliseconds.

0

Sets the wait interval between when the HTTP client is destroyed and when it is recreated.

gg.handler.name.apiVersion

Optional

v1 | v2

v2

Sets the API version to use.

gg.handler.name.mode

Optional

op | tx

op

Sets how operations are processed. In op mode, operations are processed as they are received. In tx mode, operations are cached and processed at the transaction commit.

See Using Templates to Resolve the Topic Name and Message Key for more information.

12.2.3 Security

REST Proxy supports SSL for securing communication between clients and the Kafka REST Proxy Handler. To configure SSL:

  1. Generate a keystore for the server using keytool. This keystore also serves as the client-side truststore. Follow the instructions in Generating a Keystore.

  2. Update the Kafka REST Proxy server configuration file, kafka-rest.properties, with these properties:

    listeners=https://server.domain.com:8082
    ssl.keystore.location=/path/to/serverkeystore.jks
    ssl.keystore.password=kafkarest
    ssl.key.password=kafkarest
    ssl.client.auth=false
    
  3. Restart your server.

  4. In the handler properties file, append the following to the client-side boot options:

    -Djavax.net.ssl.trustStore=/path/to/serverkeystore.jks
    -Djavax.net.ssl.trustStorePassword=pwd
    

    For example, in the krp.properties file:

    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm -Djavax.net.ssl.trustStore=/scratch/sabbabu/view_storage/serverkeystore.jks -Djavax.net.ssl.trustStorePassword=kafkarest
    

12.2.4 Generating a Keystore

Generate the serverkeystore.jks keystore file by running this command from the command line:

keytool -genkey -keyalg RSA -alias fun -keystore serverkeystore.jks -validity 365 -keysize 2048

When you are prompted for your first and last name, enter the host name of the server machine.

For example:

$ keytool -genkey -keyalg RSA -alias fun -keystore ~/serverkeystore.jks -validity 365 -keysize 2048
Enter keystore password:  
Re-enter new password: 
What is your first and last name?
  [Unknown]:  kkm00cfb.in.oracle.com
What is the name of your organizational unit?
  [Unknown]:  goldengate
What is the name of your organization?
  [Unknown]:  oracle
What is the name of your City or Locality?
  [Unknown]:  bangalore
What is the name of your State or Province?
  [Unknown]:  karnataka
What is the two-letter country code for this unit?
  [Unknown]:  in
Is CN=kkm00cfb.in.oracle.com, OU=goldengate, O=oracle, L=bangalore, ST=karnataka, C=in correct?
  [no]:  yes

Enter key password for <fun>
(RETURN if same as keystore password):  

This creates a file called serverkeystore.jks. You need to provide two passwords: the keystore password and the key password. These passwords can be the same or different.

Next, update the Kafka REST Proxy server configuration file, kafka-rest.properties, and then restart your REST server.
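
If you prefer to skip the interactive identity prompts, keytool also accepts the distinguished name through its -dname option. The following Python sketch only assembles that command line from the values used in the example above; the helper name and the host name server.domain.com are illustrative assumptions. The passwords are deliberately left out, so keytool still prompts for them.

```python
import shlex


def build_keytool_command(alias, keystore, hostname, org_unit, org, city, state, country):
    """Assemble a keytool command that supplies the identity through -dname."""
    # The CN (first and last name) is the host name of the server machine.
    dname = f"CN={hostname}, OU={org_unit}, O={org}, L={city}, ST={state}, C={country}"
    return [
        "keytool", "-genkey", "-keyalg", "RSA",
        "-alias", alias,
        "-keystore", keystore,
        "-validity", "365",
        "-keysize", "2048",
        "-dname", dname,
    ]


command = build_keytool_command(
    "fun", "serverkeystore.jks", "server.domain.com",  # hypothetical host name
    "goldengate", "oracle", "bangalore", "karnataka", "in",
)
# Print a shell-quoted version that can be pasted into a terminal.
print(shlex.join(command))
```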

12.2.5 Using Templates to Resolve the Topic Name and Message Key

The Kafka REST Proxy Handler can resolve the topic name and the message key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are replaced dynamically at runtime with values from the current processing context. The templates use the following configuration properties:

gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate

Template Modes

The Kafka REST Proxy Handler can be configured to send one message per operation (insert, update, delete). Alternatively, it can be configured to group operations into messages at the transaction level.

Template Keywords

The following table includes a column that indicates whether each keyword is supported for transaction-level messages.

Keyword Explanation Transaction Message Support

${fullyQualifiedTableName}

Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names.

For example, test.dbo.table1.

No

${catalogName}

Resolves to the catalog name.

No

${schemaName}

Resolves to the schema name.

No

${tableName}

Resolves to the short table name.

No

${opType}

Resolves to the type of the operation (INSERT, UPDATE, DELETE, or TRUNCATE).

No

${primaryKeys}

Resolves to the concatenated primary key values delimited by an underscore (_) character.

No

${position}

The sequence number of the source trail file followed by the offset (RBA).

Yes

${opTimestamp}

The operation timestamp from the source trail file.

Yes

${emptyString}

Resolves to “”.

Yes

${groupName}

Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.

Yes

${staticMap[]}

Resolves to a static value where the key is the fully-qualified table name. The keys and values are designated inside the square brackets in the following format:

${staticMap[dbo.table1=value1,dbo.table2=value2]}

No

${columnValue[]}

Resolves to a column value where the key is the fully-qualified table name and the value is the column name to be resolved. For example:

${columnValue[dbo.table1=col1,dbo.table2=col2]}

No

${currentTimestamp}

Or

${currentTimestamp[]}

Resolves to the current timestamp. You can control the format of the current timestamp using the Java-based formatting described in the SimpleDateFormat class; see https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html.

Examples:

${currentTimestamp}
${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

Yes

${null}

Resolves to a NULL string.

Yes

${custom[]}

It is possible to write a custom value resolver. If required, contact Oracle Support.

Implementation dependent

Example Templates

The following table shows example template configuration values and their resolved values.

Example Template Resolved Value

${groupName}_${fullyQualifiedTableName}

KAFKA001_dbo.table1

prefix_${schemaName}_${tableName}_suffix

prefix_dbo_table1_suffix

${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}

2017-05-17 11:45:34.254
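
The keyword substitution described above can be sketched in a few lines of Python. This is a simplified illustration, not the handler's implementation: it covers only the plain keywords (no bracketed forms such as ${staticMap[]} or ${currentTimestamp[]}), and the function names and sample operation values are assumptions made for the example.

```python
import re

# Matches plain keywords such as ${tableName}; bracketed forms are out of scope.
KEYWORD = re.compile(r"\$\{(\w+)\}")


def operation_context(group, catalog, schema, table, op_type, primary_keys):
    """Collect the values that the plain keywords resolve to for one operation."""
    name_parts = [part for part in (catalog, schema, table) if part]
    return {
        "groupName": group,
        "catalogName": catalog or "",
        "schemaName": schema,
        "tableName": table,
        "fullyQualifiedTableName": ".".join(name_parts),
        "opType": op_type,
        "primaryKeys": "_".join(primary_keys),
        "emptyString": "",
    }


def resolve_template(template, context):
    """Replace every ${keyword} in the template with its value from the context."""
    def substitute(match):
        name = match.group(1)
        if name not in context:
            raise KeyError(f"unsupported keyword: {name}")
        return context[name]

    return KEYWORD.sub(substitute, template)


# Hypothetical operation: an insert into dbo.table1 with a two-column primary key.
context = operation_context("KAFKA001", None, "dbo", "table1", "INSERT", ["1001", "A"])
print(resolve_template("${groupName}_${fullyQualifiedTableName}", context))
print(resolve_template("prefix_${schemaName}_${tableName}_suffix", context))
print(resolve_template("${primaryKeys}", context))
```

The first two calls reproduce the resolved values listed in the example table; the third shows the underscore-delimited primary key values that a key mapping template of ${primaryKeys} would yield under these assumptions.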

12.2.6 Kafka REST Proxy Handler Formatter Properties

The following are the configurable values for the Kafka REST Proxy Handler Formatter.

Table 12-2 Kafka REST Proxy Handler Formatter Properties

Properties Required/ Optional Legal Values Default Explanation

gg.handler.name.format.includeOpType

Optional

true | false

true

Set to true to create a field in the output messages called op_type. The value is an indicator of the type of source database operation (for example, I for insert, U for update, D for delete).

Set to false to omit this field in the output.

gg.handler.name.format.includeOpTimestamp

Optional

true | false

true

Set to true to create a field in the output messages called op_ts. The value is the operation timestamp (commit timestamp) from the source trail file.

Set to false to omit this field in the output.

gg.handler.name.format.includeCurrentTimestamp

Optional

true | false

true

Set to true to create a field in the output messages called current_ts. The value is the current timestamp of when the handler processes the operation.

Set to false to omit this field in the output.

gg.handler.name.format.includePosition

Optional

true | false

true

Set to true to create a field in the output messages called pos. The value is the position (sequence number + offset) of the operation from the source trail file.

Set to false to omit this field in the output.

gg.handler.name.format.includePrimaryKeys

Optional

true | false

true

Set to true to create a field in the output messages called primary_keys. The value is an array of the column names of the primary key columns.

Set to false to omit this field in the output.

gg.handler.name.format.includeTokens

Optional

true | false

true

Set to true to include a map field in output messages. The key is tokens and the value is a map where the keys and values are the token keys and values from the Oracle GoldenGate source trail file.

Set to false to suppress this field.

gg.handler.name.format.insertOpKey

Optional

Any string.

I

The value of the field op_type that indicates an insert operation.

gg.handler.name.format.updateOpKey

Optional

Any string.

U

The value of the field op_type that indicates an update operation.

gg.handler.name.format.deleteOpKey

Optional

Any string.

D

The value of the field op_type that indicates a delete operation.

gg.handler.name.format.truncateOpKey

Optional

Any string.

T

The value of the field op_type that indicates a truncate operation.

gg.handler.name.format.treatAllColumnsAsStrings

Optional

true | false

false

Set to true to treat all output fields as strings.

Set to false and the handler maps the corresponding field type from the source trail file to the best corresponding Kafka data type.

gg.handler.name.format.mapLargeNumbersAsStrings

Optional

true | false

false

Large numbers are mapped to number fields by default, which can lose precision in some scenarios. Set to true to map these fields as strings and preserve precision. This property is specific to the Avro Formatter; it cannot be used with other formatters.

gg.handler.name.format.iso8601Format

Optional

true | false

false

Set to true to output the current date in the ISO8601 format.

gg.handler.name.format.pkUpdateHandling

Optional

abend | update | delete-insert

abend

Specifies how the formatter handles update operations that change a primary key. It applies only if you are modeling row messages (gg.handler.name.format.messageFormatting=row). It does not apply if you are modeling operation messages, because the before and after images are propagated to the message with an update.
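
To illustrate how the include flags and operation keys in this table shape the metadata of an output message, here is a small Python sketch. The field names (op_type, op_ts, current_ts, pos, primary_keys, tokens) and the default keys (I, U, D, T) come from the table; the function, the dictionary layout, and every sample value are assumptions for illustration and do not represent the formatter's actual output.

```python
DEFAULT_OP_KEYS = {"INSERT": "I", "UPDATE": "U", "DELETE": "D", "TRUNCATE": "T"}

DEFAULT_FLAGS = {
    "includeOpType": True,
    "includeOpTimestamp": True,
    "includeCurrentTimestamp": True,
    "includePosition": True,
    "includePrimaryKeys": True,
    "includeTokens": True,
}


def build_metadata(operation, flags=None, op_keys=None):
    """Return only the metadata fields whose include flag is enabled."""
    enabled = dict(DEFAULT_FLAGS, **(flags or {}))
    keys = dict(DEFAULT_OP_KEYS, **(op_keys or {}))
    candidates = [
        ("includeOpType", "op_type", keys[operation["type"]]),
        ("includeOpTimestamp", "op_ts", operation["op_ts"]),
        ("includeCurrentTimestamp", "current_ts", operation["current_ts"]),
        ("includePosition", "pos", operation["pos"]),
        ("includePrimaryKeys", "primary_keys", operation["primary_keys"]),
        ("includeTokens", "tokens", operation["tokens"]),
    ]
    return {field: value for flag, field, value in candidates if enabled[flag]}


# Hypothetical operation values, chosen only to make the example readable.
operation = {
    "type": "INSERT",
    "op_ts": "2017-05-17 11:45:34.254000",
    "current_ts": "2017-05-17 11:45:35.100000",
    "pos": "00000000010000001234",
    "primary_keys": ["id"],
    "tokens": {"ROWID": "AAAAAAAAA"},
}

# Equivalent to includeTokens=false, includeCurrentTimestamp=false, insertOpKey=INS.
metadata = build_metadata(
    operation,
    flags={"includeTokens": False, "includeCurrentTimestamp": False},
    op_keys={"INSERT": "INS"},
)
print(metadata)
```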

12.2.7 Setting Metacolumn Output

The following are the configurable values for the Kafka REST Proxy Handler metacolumns template property that controls metacolumn output.

Table 12-3 Metacolumns Template Property

Properties Required/ Optional Legal Values Default Explanation

gg.handler.name.format.metaColumnsTemplate

Optional

${alltokens} | ${token} | ${env} | ${sys} | ${javaprop} | ${optype} | ${position} | ${timestamp} | ${catalog} | ${schema} | ${table} | ${objectname} | ${csn} | ${xid} | ${currenttimestamp} | ${opseqno} | ${timestampmicro} | ${currenttimestampmicro}

None

Configures the metacolumn output in a single property, which removes the need to set the following properties individually:

insertOpKey | updateOpKey | deleteOpKey | truncateOpKey | includeTableName | includeOpTimestamp | includeOpType | includePosition | includeCurrentTimestamp | useIso8601Format

It is a comma-delimited string consisting of one or more templated values that represent the template.

This is an example that would produce a list of metacolumns:

${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}

Explanation of the Metacolumn Keywords

${alltokens}

All of the Oracle GoldenGate tokens.

${token}

The value of a specific Oracle GoldenGate token. The token key should follow token using the period (.) operator. For example:

${token.MYTOKEN}

${sys}

A system environment variable. The variable name should follow sys using the period (.) operator. For example:

${sys.MYVAR}

${env}

An Oracle GoldenGate environment variable. The variable name should follow env using the period (.) operator. For example:

${env.someVariable}

${javaprop}

A Java JVM variable. The variable name should follow javaprop using the period (.) operator. For example:

${javaprop.MYVAR}

${optype}

Operation Type

${position}

Record Position

${timestamp}

Record Timestamp

${catalog}

Catalog Name

${schema}

Schema Name

${table}

Table Name

${objectname}

The fully qualified table name.

${csn}

Source Commit Sequence Number

${xid}

Source Transaction ID

${currenttimestamp}

Current Timestamp

${opseqno}

Record sequence number within the transaction.

${timestampmicro}

Record timestamp (in microseconds after epoch).

${currenttimestampmicro}

Current timestamp (in microseconds after epoch).
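
As a rough illustration of how a metacolumns template breaks down, the following Python sketch splits the comma-delimited string into keywords and, where present, the name that follows the period (.) operator. The keyword list is taken from the Legal Values above; the parser itself is an assumption for illustration and is not the formatter's own code.

```python
import re

# Keywords listed as legal values for metaColumnsTemplate.
KNOWN_KEYWORDS = {
    "alltokens", "token", "env", "sys", "javaprop", "optype", "position",
    "timestamp", "catalog", "schema", "table", "objectname", "csn", "xid",
    "currenttimestamp", "opseqno", "timestampmicro", "currenttimestampmicro",
}

# ${keyword} or ${keyword.NAME}
ENTRY = re.compile(r"^\$\{(\w+)(?:\.([^}]+))?\}$")


def parse_metacolumns_template(template):
    """Split a comma-delimited template into (keyword, name) pairs."""
    entries = []
    for raw in template.split(","):
        match = ENTRY.match(raw.strip())
        if match is None or match.group(1) not in KNOWN_KEYWORDS:
            raise ValueError(f"not a metacolumn keyword: {raw.strip()!r}")
        entries.append((match.group(1), match.group(2)))
    return entries


parsed = parse_metacolumns_template(
    "${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}"
)
for keyword, name in parsed:
    print(keyword, name)
```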

Sample Configuration:

gg.handlerlist=kafkarestproxy

#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
gg.handler.kafkarestproxy.mode=tx
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=https://kkm00cfb.in.oracle.com:8082
gg.handler.kafkarestproxy.format=avro
gg.handler.kafkarestproxy.payloadsize=1

gg.handler.kafkarestproxy.socketTimeout=1000
gg.handler.kafkarestproxy.connectTimeout=1000
gg.handler.kafkarestproxy.proxy=host:port
gg.handler.kafkarestproxy.userName=username
gg.handler.kafkarestproxy.password=pwd

#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
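
Before starting Replicat, it can be useful to confirm that a properties file sets every property marked Required in Table 12-1. The Python sketch below does this for an excerpt of the sample configuration. The parser is deliberately simplified (it ignores line continuations and escape sequences), and the function names are assumptions for the example rather than part of any Oracle GoldenGate tooling.

```python
# Properties marked Required in Table 12-1, without the gg.handler.name. prefix.
REQUIRED = ("type", "topicMappingTemplate", "keyMappingTemplate", "postDataUrl", "format")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        properties[key.strip()] = value.strip()
    return properties


def missing_required(properties, handler):
    """Return the required property names that are absent or empty."""
    prefix = f"gg.handler.{handler}."
    return [name for name in REQUIRED if not properties.get(prefix + name)]


sample = """\
gg.handlerlist=kafkarestproxy
#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
gg.handler.kafkarestproxy.mode=tx
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=https://kkm00cfb.in.oracle.com:8082
gg.handler.kafkarestproxy.format=avro
"""

properties = parse_properties(sample)
print(missing_required(properties, "kafkarestproxy"))
```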

12.3 Consuming the Records

A simple way to consume data from Kafka topics through the Kafka REST Proxy is to use curl.

Consume JSON Data

  1. Create a consumer for JSON data.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
     https://localhost:8082/consumers/my_json_consumer
    
  2. Subscribe to a topic.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"topics":["topicname"]}' \
     https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
    
  3. Consume records.

    curl -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
     https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

Consume Avro Data

  1. Create a consumer for Avro data.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
     https://localhost:8082/consumers/my_avro_consumer
    
  2. Subscribe to a topic.

    curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"topics":["topicname"]}' \
     https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
    
  3. Consume records.

    curl -k -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
     https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
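
The same three-step flow (create a consumer instance, subscribe, fetch records) can be scripted. The following Python sketch mirrors the curl steps above using only the standard library; it builds the requests without sending them, so it can be inspected offline. The function name is an assumption, and the base URL, consumer group, instance, and topic are the placeholder values from the curl examples.

```python
import json
import urllib.request

V2_JSON = "application/vnd.kafka.v2+json"


def consumer_requests(base_url, group, instance, topic, data_format):
    """Build the create, subscribe, and fetch requests for one consumer instance."""
    instance_url = f"{base_url}/consumers/{group}/instances/{instance}"
    create = urllib.request.Request(
        f"{base_url}/consumers/{group}",
        data=json.dumps(
            {"name": instance, "format": data_format, "auto.offset.reset": "earliest"}
        ).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )
    subscribe = urllib.request.Request(
        f"{instance_url}/subscription",
        data=json.dumps({"topics": [topic]}).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )
    fetch = urllib.request.Request(
        f"{instance_url}/records",
        # The Accept type embeds the data format: json or avro.
        headers={"Accept": f"application/vnd.kafka.{data_format}.v2+json"},
        method="GET",
    )
    return [create, subscribe, fetch]


steps = consumer_requests(
    "https://localhost:8082", "my_avro_consumer", "my_consumer_instance", "topicname", "avro"
)
for step in steps:
    print(step.get_method(), step.full_url)
# To execute a step: urllib.request.urlopen(step). For a self-signed server
# certificate, pass an ssl.SSLContext that trusts it through the context argument.
```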
    

12.4 Performance Considerations

Several configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, affect performance.

The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. It allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka REST Proxy Handler POSTs the data to the Kafka producer.

Setting the Replicat GROUPTRANSOPS parameter to a larger number allows Replicat to call the POST less frequently, which improves performance. The default value for GROUPTRANSOPS is 1000, and performance can be improved by increasing the value to 2500, 5000, or even 10000.
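
A back-of-the-envelope calculation shows why a larger value helps. The Python sketch below assumes, as a simplification, one POST per grouped target transaction and that each group holds exactly GROUPTRANSOPS operations; in practice Replicat groups whole source transactions, so real counts will differ. The workload of one million operations is a hypothetical figure.

```python
import math


def estimated_posts(total_operations, group_trans_ops):
    """Estimate POST calls, assuming one POST per full group of operations."""
    return math.ceil(total_operations / group_trans_ops)


total_operations = 1_000_000  # hypothetical workload
for value in (1000, 2500, 5000, 10000):
    print(f"GROUPTRANSOPS {value}: about {estimated_posts(total_operations, value)} POSTs")
```

Under these assumptions, raising GROUPTRANSOPS from 1000 to 10000 cuts the estimated number of POST calls by a factor of ten.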

12.5 Troubleshooting the Kafka REST Proxy Handler

Problems Starting Kafka REST Proxy server

If Flume is set in the classpath, it may prevent your REST Proxy server from starting. Reset the CLASSPATH to an empty string ("") to work around the problem.

Problems with Consuming Records

An HTTP proxy in your environment could block the calls while you consume records from Kafka. Unsetting the http_proxy variable resolves the issue.
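
When the REST Proxy server or a consumer command is launched from a script, both workarounds can be applied to the child process environment without changing your login shell. The Python sketch below is one way to do that; the function name and the sample environment values are assumptions for illustration.

```python
import os


def rest_proxy_environment(environ):
    """Return a copy of the environment with both workarounds applied."""
    env = dict(environ)
    env["CLASSPATH"] = ""        # keeps Flume JARs off the server classpath
    env.pop("http_proxy", None)  # stops the HTTP proxy from intercepting calls
    return env


# Hypothetical starting environment, used here instead of the real os.environ.
example = {
    "PATH": "/usr/bin",
    "CLASSPATH": "/opt/flume/lib/*",
    "http_proxy": "http://proxy.example.com:80",
}
print(rest_proxy_environment(example))
# With the real environment: env = rest_proxy_environment(os.environ), then
# pass env to subprocess.run(..., env=env) when launching the server or curl.
```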