Learn how to use the Kafka REST Proxy Handler to stream messages to the Kafka REST Proxy distributed by Confluent.
The Kafka REST Proxy Handler allows Kafka messages to be streamed over HTTPS. The use case for this functionality is to stream Kafka messages from an on-premises Oracle GoldenGate installation to the cloud, or alternatively from cloud to cloud.
The Kafka REST proxy provides a RESTful interface to a Kafka cluster. It makes it easy for you to:
produce and consume messages,
view the state of the cluster,
and perform administrative actions without using the native Kafka protocol or clients.
Kafka REST Proxy is part of the Confluent Open Source and Confluent Enterprise distributions; it is not available in the Apache Kafka distribution. To access Kafka through the REST Proxy, you must install the Confluent distribution of Kafka. See https://docs.confluent.io/current/kafka-rest/docs/index.html.
Parent topic: Using the Kafka REST Proxy Handler
You must download and install the Confluent Open Source or Confluent Enterprise Distribution because the Kafka REST Proxy is not included in the Apache, Cloudera, or Hortonworks distributions. You have several installation formats to choose from, including ZIP or TAR archives, Docker, and packages.
The Kafka REST Proxy depends on ZooKeeper, Kafka, and the Schema Registry.
The following are the configurable values for the Kafka REST Proxy Handler. Oracle recommends that you store the Kafka REST Proxy properties file in the Oracle GoldenGate dirprm directory.
To enable the selection of the Kafka REST Proxy Handler, you must first configure the handler type by specifying gg.handler.name.type=kafkarestproxy and the other Kafka REST Proxy Handler properties as follows:
Table 12-1 Kafka REST Proxy Handler Configuration Properties
| Properties | Required/Optional | Legal Values | Default | Explanation |
|---|---|---|---|---|
| gg.handler.name.type | Required | kafkarestproxy | None | The configuration to select the Kafka REST Proxy Handler. |
| gg.handler.name.topicMappingTemplate | Required | A template string value to resolve the Kafka topic name at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
| gg.handler.name.keyMappingTemplate | Required | A template string value to resolve the Kafka message key at runtime. | None | See Using Templates to Resolve the Topic Name and Message Key. |
| gg.handler.name.postDataUrl | Required | The listener address of the REST Proxy. | None | Set to the URL of the Kafka REST Proxy. |
| gg.handler.name.format | Required | | None | Set to the REST Proxy payload data format. |
| gg.handler.name.payloadsize | Optional | A value representing the payload size in megabytes. | | Set to the maximum size of the payload of the HTTP messages. |
| | Optional | | | Set to allow or disallow circular redirects. |
| | Optional | A value representing milliseconds. | | Sets the maximum time to wait for the connection manager to return a connection. The connection manager may not be able to return a connection if the maximum number of connections in the pool are in use. |
| gg.handler.name.connectTimeout | Optional | | | Sets the timeout in milliseconds until a connection is established. |
| | Optional | | | Sets content compression on or off. |
| | Optional | An integer value representing the redirect count. | | Sets the maximum number of redirects. |
| gg.handler.name.proxy | Optional | | None | Sets the proxy. |
| | Optional | Any string. | None | Sets the user name for proxy authentication. |
| | Optional | Any string. | None | Sets the password for proxy authentication. |
| | Optional | | | Sets whether redirects using relative naming are enabled. |
| | Optional | | | Sets whether redirects are enabled. |
| gg.handler.name.socketTimeout | Optional | A value representing milliseconds. | | Sets the maximum time allowed between data packets on a read. |
| | Optional | A value representing milliseconds. | | Sets the wait interval between when the HTTP client is destroyed and when it is recreated. |
| | Optional | | | Sets the API version to use. |
| gg.handler.name.mode | Optional | | | Sets how operations are processed. In … |
See Using Templates to Resolve the Topic Name and Message Key for more information.
The REST Proxy supports SSL for securing communication between clients, such as the Kafka REST Proxy Handler, and the REST Proxy server. To configure SSL:
Generate a keystore for the server using keytool. This keystore also serves as the client-side truststore. Follow the instructions in the section Procedure to Generate KeyStore.
Update the Kafka REST Proxy server configuration file, kafka-rest.properties, with these properties:
listeners=https://server.domain.com:8082
ssl.keystore.location=path_of_serverkeystore.jks
ssl.keystore.password=kafkarest
ssl.key.password=kafkarest
ssl.client.auth=false
Restart your server.
In the handler properties file, append the following to the client-side boot options (javawriter.bootoptions):
-Djavax.net.ssl.trustStore=path_of_serverkeystore.jks -Djavax.net.ssl.trustStorePassword=pwd
For example, in the krp.properties file:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm -Djavax.net.ssl.trustStore=/scratch/sabbabu/view_storage/serverkeystore.jks -Djavax.net.ssl.trustStorePassword=kafkarest
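The append step can also be scripted. The sketch below is a minimal illustration, not part of Oracle GoldenGate: the helper name append_truststore_options is hypothetical, and it assumes the existing javawriter.bootoptions value is whitespace-separated with no spaces inside paths.

```python
import shlex

# Flags this helper manages; any existing copies are replaced.
TRUSTSTORE_PREFIXES = (
    "-Djavax.net.ssl.trustStore=",
    "-Djavax.net.ssl.trustStorePassword=",
)


def append_truststore_options(bootoptions: str, truststore: str, password: str) -> str:
    """Return bootoptions with the client-side truststore flags appended.

    Truststore flags that are already present are dropped first, so running
    the helper twice does not duplicate them.
    """
    # Split the existing value into individual JVM options, keeping their order.
    options = [opt for opt in shlex.split(bootoptions)
               if not opt.startswith(TRUSTSTORE_PREFIXES)]
    options.append(f"-Djavax.net.ssl.trustStore={truststore}")
    options.append(f"-Djavax.net.ssl.trustStorePassword={password}")
    # Rejoin with single spaces; assumes no option contains whitespace.
    return " ".join(options)


if __name__ == "__main__":
    current = "-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm"
    print("javawriter.bootoptions=" + append_truststore_options(
        current, "/path/to/serverkeystore.jks", "kafkarest"))
```

Replacing rather than appending blindly keeps the line valid if the truststore path or password changes later.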
To generate the serverkeystore.jks keystore file, run the following command:
keytool -genkey -keyalg RSA -alias fun -keystore serverkeystore.jks -validity 365 -keysize 2048
When keytool prompts for your first and last name, enter the host name of the server machine.
For example:
$ keytool -genkey -keyalg RSA -alias fun -keystore ~/serverkeystore.jks -validity 365 -keysize 2048
Enter keystore password:
Re-enter new password:
What is your first and last name? [Unknown]: kkm00cfb.in.oracle.com
What is the name of your organizational unit? [Unknown]: goldengate
What is the name of your organization? [Unknown]: oracle
What is the name of your City or Locality? [Unknown]: bangalore
What is the name of your State or Province? [Unknown]: karnataka
What is the two-letter country code for this unit? [Unknown]: in
Is CN=kkm00cfb.in.oracle.com, OU=goldengate, O=oracle, L=bangalore, ST=karnataka, C=in correct? [no]: yes
Enter key password for fun (RETURN if same as keystore password):
This creates a file called serverkeystore.jks. You need to provide two passwords: the keystore password and the key password. These passwords can be the same or different.
Then update the Kafka REST Proxy server configuration file, kafka-rest.properties, and restart the REST Proxy server.
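If you prefer not to answer the name prompts interactively, keytool's -dname option accepts the distinguished name on the command line, so keytool then asks only for the passwords. The following Python sketch only builds the argument list. keytool_genkey_args is a hypothetical helper, and using socket.getfqdn() to obtain the server machine name is an assumption that holds only if the host resolves to the name clients use to connect.

```python
import shlex
import socket


def keytool_genkey_args(keystore, alias, cn, ou, org, locality, state, country,
                        validity_days=365, keysize=2048):
    """Build the keytool -genkey command from the procedure above.

    -dname supplies the distinguished name up front, so keytool prompts only
    for the keystore and key passwords.
    """
    dname = f"CN={cn}, OU={ou}, O={org}, L={locality}, ST={state}, C={country}"
    return [
        "keytool", "-genkey", "-keyalg", "RSA",
        "-alias", alias,
        "-keystore", keystore,
        "-validity", str(validity_days),
        "-keysize", str(keysize),
        "-dname", dname,
    ]


if __name__ == "__main__":
    # The CN should be the server machine name; getfqdn() is one way to look it up.
    args = keytool_genkey_args("serverkeystore.jks", "fun", socket.getfqdn(),
                               "goldengate", "oracle", "bangalore", "karnataka", "in")
    # Print the command; pass args to subprocess.run(args, check=True) to execute it.
    print(shlex.join(args))
```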
The Kafka REST Proxy Handler can resolve the topic name and the message key at runtime using a template configuration value. Templates let you configure static values and keywords; at runtime, each keyword is replaced with a value from the context of the current processing. The templates use the following configuration properties:
gg.handler.name.topicMappingTemplate
gg.handler.name.keyMappingTemplate
Template Modes
The Kafka REST Proxy Handler can be configured to send one message per operation (insert, update, delete). Alternatively, it can be configured to group operations into messages at the transaction level.
Template Keywords
The following table includes a column that indicates whether each keyword is supported for transaction-level messages.
| Keyword | Explanation | Transaction Message Support |
|---|---|---|
| ${fullyQualifiedTableName} | Resolves to the fully qualified table name including the period (.) delimiter between the catalog, schema, and table names. For example, … | No |
| | Resolves to the catalog name. | No |
| | Resolves to the schema name. | No |
| | Resolves to the short table name. | No |
| | Resolves to the type of the operation: (… | No |
| ${primaryKeys} | Resolves to the concatenated primary key values delimited by an underscore (_) character. | No |
| | The sequence number of the source trail file followed by the offset (RBA). | Yes |
| | The operation timestamp from the source trail file. | Yes |
| | Resolves to an empty string (""). | Yes |
| | Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended. | Yes |
| ${staticMap[]} | Resolves to a static value where the key is the fully qualified table name. The keys and values are designated inside the square brackets in the following format: ${staticMap[dbo.table1=value1,dbo.table2=value2]} | No |
| | Resolves to a column value where the key is the fully qualified table name and the value is the column name to be resolved. For example: ${staticMap[dbo.table1=col1,dbo.table2=col2]} | No |
| | Resolves to the current timestamp. You can control the format of the current timestamp using Java-based formatting, as shown in the following examples: ${currentDate} ${currentDate[yyyy-MM-dd HH:mm:ss.SSS]} | Yes |
| | Resolves to a NULL string. | Yes |
| | It is possible to write a custom value resolver. If required, contact Oracle Support. | Implementation dependent |
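To make keyword substitution concrete, here is a minimal Python sketch of a template resolver. It is an illustration under stated assumptions, not the handler's implementation: the Operation class and resolve function are hypothetical, only ${fullyQualifiedTableName}, ${primaryKeys}, and ${staticMap[...]} (the keywords named in this section and in the sample configuration) are handled, unsupported keywords are left unchanged, and a table missing from a static map is assumed to resolve to an empty string.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

# Matches ${keyword} or ${keyword[argument]}.
KEYWORD = re.compile(r"\$\{(\w+)(?:\[([^\]]*)\])?\}")


@dataclass
class Operation:
    """Hypothetical stand-in for the operation currently being processed."""
    catalog: Optional[str]
    schema: str
    table: str
    primary_key_values: List[str]

    @property
    def fully_qualified_table_name(self) -> str:
        # Catalog, schema, and table joined with periods; empty parts are skipped.
        return ".".join(p for p in (self.catalog, self.schema, self.table) if p)


def resolve(template: str, op: Operation) -> str:
    """Replace the supported keywords in template; leave any others untouched."""
    def replace(match: re.Match) -> str:
        keyword, argument = match.group(1), match.group(2)
        if keyword == "fullyQualifiedTableName":
            return op.fully_qualified_table_name
        if keyword == "primaryKeys":
            # Primary key values concatenated with an underscore delimiter.
            return "_".join(op.primary_key_values)
        if keyword == "staticMap" and argument:
            # Entries have the form dbo.table1=value1,dbo.table2=value2.
            pairs = (entry.split("=", 1) for entry in argument.split(","))
            mapping = {k.strip(): v.strip() for k, v in pairs}
            # Assumption: a table with no entry resolves to an empty string.
            return mapping.get(op.fully_qualified_table_name, "")
        return match.group(0)

    return KEYWORD.sub(replace, template)
```

For an operation on dbo.table1 with primary key values 42 and 7, ${primaryKeys} resolves to 42_7 and ${staticMap[dbo.table1=value1,dbo.table2=value2]} resolves to value1. Static text around keywords is kept, so a template such as orders_${primaryKeys} would resolve to orders_42_7.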
Example Templates
The following describes example template configuration values and the resolved values.
Example Template | Resolved Value |
---|---|
The following are the configurable values for the Kafka REST Proxy Handler Formatter.
Table 12-2 Kafka REST Proxy Handler Formatter Properties
| Properties | Required/Optional | Legal Values | Default | Explanation |
|---|---|---|---|---|
| gg.handler.name.format.includeOpType | Optional | | | Set to … Set to … |
| gg.handler.name.format.includeOpTimestamp | Optional | | | Set to … Set to … |
| gg.handler.name.format.includeCurrentTimestamp | Optional | | | Set to … Set to … |
| gg.handler.name.format.includePosition | Optional | | | Set to … Set to … |
| gg.handler.name.format.includePrimaryKeys | Optional | | | Set to … Set to … |
| gg.handler.name.format.includeTokens | Optional | | | Set to … Set to … |
| gg.handler.name.format.insertOpKey | Optional | Any string. | | The value of the field … |
| gg.handler.name.format.updateOpKey | Optional | Any string. | | The value of the field … |
| gg.handler.name.format.deleteOpKey | Optional | Any string. | | The value of the field … |
| gg.handler.name.format.truncateOpKey | Optional | Any string. | | The value of the field … |
| gg.handler.name.format.treatAllColumnsAsStrings | Optional | | | Set to … Set to … |
| gg.handler.name.format.mapLargeNumbersAsStrings | Optional | | | Set to … |
| gg.handler.name.format.iso8601Format | Optional | | | Set to … |
| gg.handler.name.format.pkUpdateHandling | Optional | | | It is only applicable if you are modeling row messages with the … |
The following are the configurable values for the Kafka REST Proxy Handler metacolumns template property that controls metacolumn output.
Table 12-3 Metacolumns Template Property
| Properties | Required/Optional | Legal Values | Default | Explanation |
|---|---|---|---|---|
| gg.handler.name.format.metaColumnsTemplate | Optional | | None | Configures the current meta column information in a simple manner and removes the explicit need to use insertOpKey, updateOpKey, deleteOpKey, truncateOpKey, includeTableName, includeOpTimestamp, includeOpType, includePosition, includeCurrentTimestamp, and useIso8601Format. It is a comma-delimited string consisting of one or more templated values that represent the template. |
This is an example that would produce a list of metacolumns:
${optype}, ${token.ROWID}, ${sys.username}, ${currenttimestamp}
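A metaColumnsTemplate value is a comma-delimited list of placeholders, some of which carry a qualifier after a period. The sketch below shows one way to split the example above into keyword and qualifier pairs; parse_meta_columns_template is a hypothetical function for illustration, not how the formatter parses the property.

```python
import re
from typing import List, Optional, Tuple

# One placeholder such as ${optype} or ${token.ROWID}.
PLACEHOLDER = re.compile(r"\$\{(\w+)(?:\.([^}]+))?\}")


def parse_meta_columns_template(template: str) -> List[Tuple[str, Optional[str]]]:
    """Split a metaColumnsTemplate value into (keyword, qualifier) pairs.

    The qualifier is the part after the period, for example ROWID in
    ${token.ROWID}; it is None for keywords such as ${optype}.
    """
    columns = []
    for entry in template.split(","):
        entry = entry.strip()
        match = PLACEHOLDER.fullmatch(entry)
        if match is None:
            raise ValueError(f"not a metacolumn placeholder: {entry!r}")
        columns.append((match.group(1), match.group(2)))
    return columns
```

For the example above, the function returns [("optype", None), ("token", "ROWID"), ("sys", "username"), ("currenttimestamp", None)], one pair per output metacolumn in template order.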
Explanation of the Metacolumn Keywords
${alltokens}
All of the Oracle GoldenGate tokens.
${token}
The value of a specific Oracle GoldenGate token. The token key should follow token using the period (.) operator. For example:
${token.MYTOKEN}
${sys}
A system environment variable. The variable name should follow sys using the period (.) operator. For example:
${sys.MYVAR}
${env}
An Oracle GoldenGate environment variable. The variable name should follow env using the period (.) operator. For example:
${env.someVariable}
${javaprop}
A Java JVM variable. The variable name should follow javaprop using the period (.) operator. For example:
${javaprop.MYVAR}
${optype}
Operation Type
${position}
Record Position
${timestamp}
Record Timestamp
${catalog}
Catalog Name
${schema}
Schema Name
${table}
Table Name
${objectname}
The fully qualified table name.
${csn}
Source Commit Sequence Number
${xid}
Source Transaction ID
${currenttimestamp}
Current Timestamp
${opseqno}
Record sequence number within the transaction.
${timestampmicro}
Record timestamp (in microseconds after epoch).
${currenttimestampmicro}
Current timestamp (in microseconds after epoch).
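${timestampmicro} and ${currenttimestampmicro} express a timestamp as microseconds after the epoch. The arithmetic is sketched below in Python, assuming the Unix epoch (1970-01-01T00:00:00 UTC) and timezone-aware timestamps; to_epoch_micros and current_timestamp_micro are hypothetical names, not formatter APIs.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_micros(ts: datetime) -> int:
    """Microseconds between the Unix epoch and a timezone-aware timestamp."""
    if ts.tzinfo is None:
        raise ValueError("timestamp must be timezone-aware")
    delta = ts - EPOCH
    # Integer arithmetic on the timedelta fields avoids float rounding.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def current_timestamp_micro() -> int:
    """Analogue of ${currenttimestampmicro}: the current time in microseconds."""
    return to_epoch_micros(datetime.now(timezone.utc))
```

For example, to_epoch_micros(datetime(2020, 1, 1, tzinfo=timezone.utc)) returns 1577836800000000. Timestamps before the epoch yield negative values.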
Sample Configuration:
gg.handlerlist=kafkarestproxy
#The handler properties
gg.handler.kafkarestproxy.type=kafkarestproxy
gg.handler.kafkarestproxy.mode=tx
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkarestproxy.topicMappingTemplate=${fullyQualifiedTableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkarestproxy.keyMappingTemplate=${primaryKeys}
gg.handler.kafkarestproxy.postDataUrl=https://kkm00cfb.in.oracle.com:8082
gg.handler.kafkarestproxy.format=avro
gg.handler.kafkarestproxy.payloadsize=1
gg.handler.kafkarestproxy.socketTimeout=1000
gg.handler.kafkarestproxy.connectTimeout=1000
gg.handler.kafkarestproxy.proxy=host:port
gg.handler.kafkarestproxy.username=username
gg.handler.kafkarestproxy.password=pwd
#The MetaColumnTemplate formatter properties
gg.handler.kafkarestproxy.format.metaColumnsTemplate=${optype},${timestampmicro},${currenttimestampmicro}
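Before starting Replicat, you can sanity-check a handler properties file like the sample above. This Python sketch is an illustration with explicit assumptions: parse_properties and missing_required are hypothetical helpers, parse_properties handles only simple key=value lines (not the full Java properties syntax), and the five required handler properties are assumed to be type, topicMappingTemplate, keyMappingTemplate, postDataUrl, and format, as they appear in the sample.

```python
from typing import Dict, List

# Assumed set of required handler properties (the Required rows of Table 12-1).
REQUIRED = ("type", "topicMappingTemplate", "keyMappingTemplate", "postDataUrl", "format")


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value parser: skips blank lines and # or ! comments.

    It does not handle line continuations or ':' separators.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values such as staticMap entries survive.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_required(props: Dict[str, str]) -> List[str]:
    """List required properties that are absent or empty for each listed handler."""
    missing = []
    for name in props.get("gg.handlerlist", "").split(","):
        name = name.strip()
        if not name:
            continue
        for suffix in REQUIRED:
            key = f"gg.handler.{name}.{suffix}"
            if not props.get(key):
                missing.append(key)
    return missing
```

Run against the sample configuration, missing_required returns an empty list; deleting the postDataUrl line would make it report gg.handler.kafkarestproxy.postDataUrl.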
A simple way to consume data from Kafka topics through the Kafka REST Proxy is to use curl.
Consume JSON Data
Create a consumer for JSON data.
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
https://localhost:8082/consumers/my_json_consumer
Subscribe to a topic.
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
Consume records.
curl -k -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
https://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
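The same JSON consumer calls can be issued with Python's standard library. In this sketch the helper names (create_consumer, subscribe, fetch_records, send) are hypothetical, BASE_URL assumes the REST Proxy listener used in these examples, and send(insecure=True) mirrors curl -k by disabling certificate verification, which is suitable only for testing with a self-signed certificate.

```python
import json
import ssl
import urllib.request
from typing import List

BASE_URL = "https://localhost:8082"          # REST Proxy listener used in the examples
V2_JSON = "application/vnd.kafka.v2+json"    # content type for consumer management calls


def create_consumer(group: str, instance: str, fmt: str = "json") -> urllib.request.Request:
    body = {"name": instance, "format": fmt, "auto.offset.reset": "earliest"}
    return urllib.request.Request(
        f"{BASE_URL}/consumers/{group}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )


def subscribe(group: str, instance: str, topics: List[str]) -> urllib.request.Request:
    return urllib.request.Request(
        f"{BASE_URL}/consumers/{group}/instances/{instance}/subscription",
        data=json.dumps({"topics": topics}).encode("utf-8"),
        headers={"Content-Type": V2_JSON},
        method="POST",
    )


def fetch_records(group: str, instance: str, fmt: str = "json") -> urllib.request.Request:
    # The Accept header names the embedded format: json or avro.
    return urllib.request.Request(
        f"{BASE_URL}/consumers/{group}/instances/{instance}/records",
        headers={"Accept": f"application/vnd.kafka.{fmt}.v2+json"},
        method="GET",
    )


def send(request: urllib.request.Request, insecure: bool = False):
    """Send a request and decode the JSON reply (None for an empty body)."""
    context = ssl.create_default_context()
    if insecure:
        # Same effect as curl -k: skip certificate checks. Use only for testing.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(request, context=context) as response:
        payload = response.read()
    return json.loads(payload) if payload else None
```

Send the three requests in order: create_consumer("my_json_consumer", "my_consumer_instance"), then subscribe with ["topicname"], then fetch_records, whose reply is a JSON array of records. Passing fmt="avro" to create_consumer and fetch_records (with the group my_avro_consumer) gives the Avro variant below.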
Consume Avro Data
Create a consumer for Avro data.
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
https://localhost:8082/consumers/my_avro_consumer
Subscribe to a topic.
curl -k -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["topicname"]}' \
https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/subscription
Consume records.
curl -k -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
https://localhost:8082/consumers/my_avro_consumer/instances/my_consumer_instance/records
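A single records request returns only what the consumer has fetched so far, and it can return an empty array, for example before the consumer has finished joining its group. The hypothetical drain helper below polls repeatedly and stops after several consecutive empty batches. fetch is any zero-argument callable that returns the decoded JSON array from the records request, and the stopping rule is an assumption of this sketch, not REST Proxy behavior.

```python
import time
from typing import Callable, Dict, Iterator, List

Record = Dict[str, object]


def drain(fetch: Callable[[], List[Record]], max_empty_polls: int = 3,
          pause_seconds: float = 0.0) -> Iterator[Record]:
    """Yield records from repeated fetch() calls.

    Stops after max_empty_polls consecutive empty batches. A single empty
    batch is not treated as the end of the data.
    """
    empty_polls = 0
    while empty_polls < max_empty_polls:
        batch = fetch()
        if batch:
            empty_polls = 0
            yield from batch
        else:
            empty_polls += 1
            # Optional back-off before asking again.
            time.sleep(pause_seconds)
```

Each yielded record is a dictionary with fields such as topic, key, value, partition, and offset, as returned by the records endpoint.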
There are several configuration settings, both in the Oracle GoldenGate for Big Data configuration and in the Kafka producer, that affect performance.
The Oracle GoldenGate parameter that has the greatest effect on performance is the Replicat GROUPTRANSOPS parameter. It allows Replicat to group multiple source transactions into a single target transaction. At transaction commit, the Kafka REST Proxy Handler POSTs the data to the Kafka REST Proxy.
Setting GROUPTRANSOPS to a larger value lets Replicat issue POST requests less frequently, which improves performance. The default value of GROUPTRANSOPS is 1000, and performance can be improved by increasing it to 2500, 5000, or even 10000.
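The effect of GROUPTRANSOPS on the number of POST calls can be estimated with a simplified model. The sketch below assumes Replicat adds whole source transactions to a group until it holds at least GROUPTRANSOPS operations, commits once per group (one POST per commit, as described above), and commits any remaining partial group at the end. It ignores every other commit trigger, and count_posts is a hypothetical name.

```python
from typing import Iterable


def count_posts(transaction_sizes: Iterable[int], group_trans_ops: int) -> int:
    """Estimate how many grouped commits, and so POST calls, Replicat makes.

    transaction_sizes holds the number of operations in each source transaction.
    """
    commits = 0
    pending = 0
    for size in transaction_sizes:
        # Whole transactions are added to the current group.
        pending += size
        if pending >= group_trans_ops:
            commits += 1
            pending = 0
    if pending:
        # A final partial group is still committed once.
        commits += 1
    return commits
```

Under this model, 10,000 single-operation transactions need 10, 4, 2, and 1 POST calls for GROUPTRANSOPS values of 1000, 2500, 5000, and 10000.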
Problems Starting the Kafka REST Proxy Server
If Flume is set in the CLASSPATH, the REST Proxy server may fail to start. To resolve the problem, reset CLASSPATH to an empty string ("").
Problems with Consuming Records
An HTTP proxy can block calls made while consuming records from Kafka. Unsetting the http_proxy environment variable resolves the issue.
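Both workarounds amount to adjusting environment variables for a single process. In this hypothetical sketch, server_env returns a copy of the environment with CLASSPATH emptied and client_env returns a copy with proxy variables removed. Clearing https_proxy and the uppercase forms in addition to http_proxy is an assumption, since the examples above use HTTPS URLs.

```python
import os
from typing import Dict, Mapping, Optional

PROXY_VARIABLES = ("http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY")


def server_env(base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy of the environment for starting the REST Proxy with an empty CLASSPATH."""
    env = dict(os.environ if base is None else base)
    env["CLASSPATH"] = ""
    return env


def client_env(base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy of the environment for consumer calls with proxy variables removed."""
    env = dict(os.environ if base is None else base)
    for name in PROXY_VARIABLES:
        env.pop(name, None)
    return env
```

For example, subprocess.run(["bin/kafka-rest-start", "etc/kafka-rest/kafka-rest.properties"], env=server_env(), check=True) starts the server without the inherited CLASSPATH, assuming a Confluent archive installation where those paths exist. Because both functions return copies, the calling shell's environment is left unchanged.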