13 Using the Kinesis Streams Handler

Learn how to use the Kinesis Streams Handler, which streams data to applications hosted on the Amazon Cloud or in your environment.

13.1 Overview

Amazon Kinesis is a messaging system that is hosted in the Amazon Cloud. Kinesis streams can be used to stream data to other Amazon Cloud applications such as Amazon S3 and Amazon Redshift. Using the Kinesis Streams Handler, you can also stream data to applications hosted on the Amazon Cloud or at your site. Amazon Kinesis streams provides functionality similar to Apache Kafka.

The logical concepts map as follows:

  • Kafka Topics = Kinesis Streams

  • Kafka Partitions = Kinesis Shards

A Kinesis stream must have at least one shard.

13.2 Detailed Functionality

13.2.1 Amazon Kinesis Java SDK

The Oracle GoldenGate Kinesis Streams Handler uses the AWS Kinesis Java SDK to push data to Amazon Kinesis. See the Amazon Kinesis Streams Developer Guide at:

http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html

The Kinesis Streams Handler was designed and tested with the latest AWS Kinesis Java SDK version 1.11.107. These are the dependencies:

  • Group ID: com.amazonaws

  • Artifact ID: aws-java-sdk-kinesis

  • Version: 1.11.107

Oracle GoldenGate for Big Data does not ship with the AWS Kinesis Java SDK. Oracle recommends that you use the AWS Kinesis Java SDK identified in the Certification Matrix; see Verifying Certification and System Requirements.

Note:

If you move to the latest AWS Kinesis Java SDK, it is assumed that the SDK interface has not changed in a way that breaks compatibility with the Kinesis Streams Handler.

You can download the AWS Java SDK, including Kinesis, from:

https://aws.amazon.com/sdk-for-java/

13.2.2 Kinesis Streams Input Limits

The upper input limit for a Kinesis stream with a single shard is 1000 messages per second, up to a total data size of 1MB per second. Adding streams or shards can increase the potential throughput, for example:

  • 1 stream with 2 shards = 2000 messages per second up to a total data size of 2MB per second

  • 3 streams of 1 shard each = 3000 messages per second up to a total data size of 3MB per second

The scaling that you can achieve with the Kinesis Streams Handler depends on how you configure the handler. Kinesis stream names are resolved at runtime based on the configuration of the Kinesis Streams Handler.

Shards are selected by the hash of the partition key. The partition key for a Kinesis message cannot be null or an empty string (""). A null or empty string partition key causes a Kinesis error, which in turn causes the Replicat process to abend.

Maximizing throughput requires that the Kinesis Streams Handler configuration evenly distributes messages across streams and shards.

13.3 Setting Up and Running the Kinesis Streams Handler

Instructions for configuring the Kinesis Streams Handler components and running the handler are described in the following sections.

Use the following steps to set up the Kinesis Streams Handler:

  1. Create an Amazon AWS account at https://aws.amazon.com/.

  2. Log into Amazon AWS.

  3. From the main page, select Kinesis (under the Analytics subsection).

  4. Select Amazon Kinesis Streams, and then select Go to Streams to create Amazon Kinesis streams and shards within streams.

  5. Create a client ID and secret to access Kinesis.

    The Kinesis Streams Handler requires these credentials at runtime to successfully connect to Kinesis.

  6. Create the client ID and secret:

    1. Select your name in AWS (upper right), and then in the list select My Security Credentials.

    2. Select Access Keys to create and manage access keys.

      Note your client ID and secret upon creation.

      The client ID and secret can only be accessed upon creation. If lost, you have to delete the access key, and then recreate it.

13.3.1 Set the Classpath in Kinesis Streams Handler

You must configure the gg.classpath property in the Java Adapter properties file to specify the JARs for the AWS Kinesis Java SDK as follows:

gg.classpath={download_dir}/aws-java-sdk-1.11.107/lib/*:{download_dir}/aws-java-sdk-1.11.107/third-party/lib/*

13.3.2 Kinesis Streams Handler Configuration

You configure the Kinesis Streams Handler operation using the properties file. These properties are located in the Java Adapter properties file (not in the Replicat properties file).

To enable the selection of the Kinesis Streams Handler, you must first configure the handler type by specifying gg.handler.name.type=kinesis_streams and the other Kinesis Streams properties as follows:

Table 13-1 Kinesis Streams Handler Configuration Properties

Property: gg.handler.name.type
Required/Optional: Required
Legal Values: kinesis_streams
Default: None
Explanation: Selects the Kinesis Streams Handler for streaming change data capture into Kinesis.

Property: gg.handler.name.region
Required/Optional: Required
Legal Values: The Amazon region name that hosts your Kinesis instance.
Default: None
Explanation: Setting of the Amazon AWS region name is required.

Property: gg.handler.name.proxyServer
Required/Optional: Optional
Legal Values: The host name of the proxy server.
Default: None
Explanation: Set the host name of the proxy server if connectivity to AWS is required to go through a proxy server.

Property: gg.handler.name.proxyPort
Required/Optional: Optional
Legal Values: The port number of the proxy server.
Default: None
Explanation: Set the port number of the proxy server if connectivity to AWS is required to go through a proxy server.

Property: gg.handler.name.proxyUsername
Required/Optional: Optional
Legal Values: The username of the proxy server (if credentials are required).
Default: None
Explanation: Set the username of the proxy server if connectivity to AWS is required to go through a proxy server and the proxy server requires credentials.

Property: gg.handler.name.proxyPassword
Required/Optional: Optional
Legal Values: The password of the proxy server (if credentials are required).
Default: None
Explanation: Set the password of the proxy server if connectivity to AWS is required to go through a proxy server and the proxy server requires credentials.

Property: gg.handler.name.deferFlushAtTxCommit
Required/Optional: Optional
Legal Values: true | false
Default: false
Explanation: When set to false, the Kinesis Streams Handler flushes data to Kinesis at transaction commit for write durability. However, it may be preferable to defer the flush beyond the transaction commit for performance purposes; see Kinesis Handler Performance Considerations.

Property: gg.handler.name.deferFlushOpCount
Required/Optional: Optional
Legal Values: Integer
Default: None
Explanation: Only applicable if gg.handler.name.deferFlushAtTxCommit is set to true. This parameter sets the minimum number of operations that must be received before triggering a flush to Kinesis. Once this number of operations is received, a flush occurs on the next transaction commit and all outstanding operations are moved from the Kinesis Streams Handler to AWS Kinesis.

Property: gg.handler.name.formatPerOp
Required/Optional: Optional
Legal Values: true | false
Default: true
Explanation: When set to true, the handler sends one message to Kinesis per operation (insert, delete, update). When set to false, the operation messages are concatenated and a single message is sent at the transaction level. Kinesis limits the maximum message size to 1MB. If 1MB is exceeded, the transaction-level message is broken up into multiple messages.

Property: gg.handler.name.customMessageGrouper
Required/Optional: Optional
Legal Values: oracle.goldengate.handler.kinesis.KinesisJsonTxMessageGrouper
Default: None
Explanation: This configuration parameter provides the ability to group Kinesis messages using custom logic. Only one implementation is included in the distribution at this time. The oracle.goldengate.handler.kinesis.KinesisJsonTxMessageGrouper is a custom message grouper that groups JSON operation messages into a wrapper JSON message that encompasses the transaction. Setting this value overrides the gg.handler.name.formatPerOp setting. Using this feature assumes that you are using the JSON formatter (that is, gg.handler.name.format=json).

Property: gg.handler.name.streamMappingTemplate
Required/Optional: Required
Legal Values: A template string value to resolve the Kinesis stream name at runtime.
Default: None
Explanation: See Using Templates to Resolve the Stream Name and Partition Name for more information.

Property: gg.handler.name.partitionMappingTemplate
Required/Optional: Required
Legal Values: A template string value to resolve the Kinesis message partition key (message key) at runtime.
Default: None
Explanation: See Using Templates to Resolve the Stream Name and Partition Name for more information.

Property: gg.handler.name.format
Required/Optional: Required
Legal Values: Any supported pluggable formatter: delimitedtext | json | json_row | xml | avro_row | avro_opt
Explanation: Selects the operations message formatter. JSON is likely the best fit for Kinesis.

Property: gg.handler.name.enableStreamCreation
Required/Optional: Optional
Legal Values: true | false
Default: true
Explanation: By default, the Kinesis Handler automatically creates Kinesis streams if they do not already exist. Set to false to disable the automatic creation of Kinesis streams.

Property: gg.handler.name.shardCount
Required/Optional: Optional
Legal Values: Positive integer.
Default: 1
Explanation: A Kinesis stream contains one or more shards. Controls the number of shards on Kinesis streams that the Kinesis Handler creates. Multiple shards can help improve the ingest performance to a Kinesis stream. Use only when gg.handler.name.enableStreamCreation is set to true.
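
As a minimal sketch of the stream creation properties, the following fragment assumes a handler named kinesis (the name used in the sample configuration later in this chapter) and a shard count of 2. Both are illustrative assumptions, not recommendations.

```properties
# Assumption: the handler is named "kinesis"
# Let the handler create streams that do not already exist
gg.handler.kinesis.enableStreamCreation=true
# Illustrative value: each stream the handler creates gets 2 shards
gg.handler.kinesis.shardCount=2
```

The shard count applies only to streams that the handler creates, so it has no effect when stream creation is disabled.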

13.3.3 Using Templates to Resolve the Stream Name and Partition Name

The Kinesis Streams Handler can resolve the stream name and the partition key at runtime using a template configuration value. Templates allow you to configure static values and keywords. Keywords are replaced dynamically at runtime with values from the current processing context. Templates are applicable to the following configuration parameters:

gg.handler.name.streamMappingTemplate
gg.handler.name.partitionMappingTemplate

Template Modes

Source database transactions are made up of one or more individual operations, which are the individual inserts, updates, and deletes. The Kinesis Handler can be configured to send one message per operation (insert, update, delete). Alternatively, it can be configured to group operations into messages at the transaction level. Many of the template keywords resolve data based on the context of an individual source database operation. Therefore, many of the keywords do not work when sending messages at the transaction level. For example, ${fullyQualifiedTableName} does not work when sending messages at the transaction level. The ${fullyQualifiedTableName} keyword resolves to the qualified source table name for an operation. Transactions can contain multiple operations for many source tables. Resolving the fully qualified table name for messages at the transaction level is non-deterministic, so the handler abends at runtime.

Template Keywords

The following table lists the currently supported template keywords and indicates whether each keyword is supported for transaction-level messages:

Keyword: ${fullyQualifiedTableName}
Explanation: Resolves to the fully qualified table name, including the period (.) delimiter between the catalog, schema, and table names. For example, test.dbo.table1.
Transaction Message Support: No

Keyword: ${catalogName}
Explanation: Resolves to the catalog name.
Transaction Message Support: No

Keyword: ${schemaName}
Explanation: Resolves to the schema name.
Transaction Message Support: No

Keyword: ${tableName}
Explanation: Resolves to the short table name.
Transaction Message Support: No

Keyword: ${opType}
Explanation: Resolves to the type of the operation (INSERT, UPDATE, DELETE, or TRUNCATE).
Transaction Message Support: No

Keyword: ${primaryKeys}
Explanation: Resolves to the concatenated primary key values delimited by an underscore (_) character.
Transaction Message Support: No

Keyword: ${position}
Explanation: The sequence number of the source trail file followed by the offset (RBA).
Transaction Message Support: Yes

Keyword: ${opTimestamp}
Explanation: The operation timestamp from the source trail file.
Transaction Message Support: Yes

Keyword: ${emptyString}
Explanation: Resolves to "".
Transaction Message Support: Yes

Keyword: ${groupName}
Explanation: Resolves to the name of the Replicat process. If using coordinated delivery, it resolves to the name of the Replicat process with the Replicat thread number appended.
Transaction Message Support: Yes

Keyword: ${staticMap[]}
Explanation: Resolves to a static value where the key is the fully qualified table name. The keys and values are designated inside the square brackets in the following format:
${staticMap[dbo.table1=value1,dbo.table2=value2]}
Transaction Message Support: No

Keyword: ${columnValue[]}
Explanation: Resolves to a column value where the key is the fully qualified table name and the value is the column name to be resolved. For example:
${columnValue[dbo.table1=col1,dbo.table2=col2]}
Transaction Message Support: No

Keyword: ${currentTimestamp} or ${currentTimestamp[]}
Explanation: Resolves to the current timestamp. You can control the format of the current timestamp using the Java-based formatting described in the SimpleDateFormat class; see https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html.
Examples:
${currentTimestamp}
${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
Transaction Message Support: Yes

Keyword: ${null}
Explanation: Resolves to a null string.
Transaction Message Support: Yes

Keyword: ${custom[]}
Explanation: It is possible to write a custom value resolver.
Transaction Message Support: Depends on the implementation.

Example Templates

The following describes example template configuration values and the resolved values.

Example Template: ${groupName}_${fullyQualifiedTableName}
Resolved Value: KINESIS001_dbo.table1

Example Template: prefix_${schemaName}_${tableName}_suffix
Resolved Value: prefix_dbo_table1_suffix

Example Template: ${currentTimestamp[yyyy-MM-dd HH:mm:ss.SSS]}
Resolved Value: 2017-05-17 11:45:34.254
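
As a sketch of how the template mode affects keyword choice, the following fragment configures transaction-level messages and therefore uses only keywords that the keyword table marks as supported for transaction messages. The handler name kinesis and the particular keyword choices are illustrative assumptions.

```properties
# Assumption: the handler is named "kinesis"
# Group operations into messages at the transaction level
gg.handler.kinesis.formatPerOp=false
# ${groupName} and ${position} do not depend on a single operation,
# so they can be resolved for a transaction-level message
gg.handler.kinesis.streamMappingTemplate=${groupName}
gg.handler.kinesis.partitionMappingTemplate=${position}
```

Swapping in an operation-level keyword such as ${fullyQualifiedTableName} in this mode would be expected to cause an abend at runtime, as described under Template Modes.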

13.3.4 Configuring the Client ID and Secret in Kinesis Handler

A client ID and secret are required credentials for the Kinesis Streams Handler to interact with Amazon Kinesis. A client ID and secret are generated through the Amazon AWS website. The retrieval of these credentials and presentation to the Kinesis server are performed on the client side by the AWS Kinesis Java SDK. The AWS Kinesis Java SDK provides multiple ways that the client ID and secret can be resolved at runtime.

The client ID and secret can be set:

  • as Java properties, configured in the Java Adapter properties file as follows:

    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Daws.accessKeyId=your_access_key -Daws.secretKey=your_secret_key

  • as environment variables, using the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variables.

  • in the EC2 environment on the local machine.

13.3.5 Configuring the Proxy Server for Kinesis Streams Handler

Oracle GoldenGate can be used with a proxy server using the following parameters to enable the proxy server:

  • gg.handler.name.proxyServer=
  • gg.handler.name.proxyPort=80

Access to the proxy servers can be secured using credentials and the following configuration parameters:

  • gg.handler.name.proxyUsername=username
  • gg.handler.name.proxyPassword=password

Sample configurations:

gg.handlerlist=kinesis 
gg.handler.kinesis.type=kinesis_streams 
gg.handler.kinesis.mode=op 
gg.handler.kinesis.format=json 
gg.handler.kinesis.region=us-west-2 
gg.handler.kinesis.partitionMappingTemplate=TestPartitionName
gg.handler.kinesis.streamMappingTemplate=TestStream
gg.handler.kinesis.deferFlushAtTxCommit=true 
gg.handler.kinesis.deferFlushOpCount=1000 
gg.handler.kinesis.formatPerOp=true 
#gg.handler.kinesis.customMessageGrouper=oracle.goldengate.handler.kinesis.KinesisJsonTxMessageGrouper 
gg.handler.kinesis.proxyServer=www-proxy.myhost.com 
gg.handler.kinesis.proxyPort=80

13.3.6 Configuring Security in Kinesis Streams Handler

The AWS Kinesis Java SDK uses HTTPS to communicate with Kinesis. The Kinesis Streams Handler is authenticated by presenting the client ID and secret credentials at runtime using a trusted certificate.

The Kinesis Streams Handler can also be configured to authenticate the server, which provides mutual authentication. You can do this by generating a certificate from the Amazon AWS website and configuring server authentication. A trust store must be generated on the machine hosting Oracle GoldenGate for Big Data. The trust store and trust store password must be configured in the Kinesis Streams Handler Java Adapter properties file.

The following is an example configuration:

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djavax.net.ssl.trustStore=path_to_trust_store_file -Djavax.net.ssl.trustStorePassword=trust_store_password

13.4 Kinesis Handler Performance Considerations

13.4.1 Kinesis Streams Input Limitations

The maximum write rate to a Kinesis stream with a single shard is 1000 messages per second, up to a maximum of 1MB of data per second. You can scale input to Kinesis by adding Kinesis streams or by adding shards to streams. Both adding streams and adding shards can linearly increase the Kinesis input capacity and thereby improve performance of the Oracle GoldenGate Kinesis Streams Handler.

Adding streams or shards can linearly increase the potential throughput, for example:

  • 1 stream with 2 shards = 2000 messages per second up to a total data size of 2MB per second.

  • 3 streams of 1 shard each = 3000 messages per second up to a total data size of 3MB per second.

To fully take advantage of streams and shards, you must configure the Oracle GoldenGate Kinesis Streams Handler to distribute messages as evenly as possible across streams and shards.

Adding Kinesis streams or shards does nothing to scale Kinesis input if all data is sent to a single Kinesis stream using a static partition key. Kinesis streams are resolved at runtime using the selected mapping methodology. For example, mapping the source table name as the Kinesis stream name may provide good distribution of messages across Kinesis streams if operations from the source trail file are evenly distributed across tables. Shards are selected by a hash of the partition key. Partition keys are resolved at runtime using the selected mapping methodology. Therefore, it is best to choose a mapping methodology that produces a rapidly changing partition key, to ensure a good distribution of messages across shards.
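
As a sketch of a mapping that can spread load, the following fragment maps each source table to its own stream and uses the primary key values as the partition key. It assumes a handler named kinesis and operation-level messages. How well it distributes messages depends on how evenly operations are spread across tables and key values in your source trail file.

```properties
# Assumption: the handler is named "kinesis"
# One Kinesis message per operation, so operation-level keywords can be resolved
gg.handler.kinesis.formatPerOp=true
# One stream per source table
gg.handler.kinesis.streamMappingTemplate=${fullyQualifiedTableName}
# Partition key changes with every row, so hashing spreads messages across shards
gg.handler.kinesis.partitionMappingTemplate=${primaryKeys}
```

By contrast, static template values for both the stream name and the partition key route every message to a single shard of a single stream, regardless of how many shards exist.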

13.4.2 Transaction Batching

The Oracle GoldenGate Kinesis Streams Handler receives messages and then batches together messages by Kinesis stream before sending them via synchronous HTTPS calls to Kinesis. At transaction commit all outstanding messages are flushed to Kinesis. The flush call to Kinesis impacts performance. Therefore, deferring the flush call can dramatically improve performance.

The recommended way to defer the flush call is to use the GROUPTRANSOPS configuration in the Replicat configuration. The GROUPTRANSOPS parameter groups multiple small transactions into a single larger transaction, deferring the transaction commit call until the larger transaction is completed. It works by counting the database operations (inserts, updates, and deletes) and only commits the transaction group when the number of operations equals or exceeds the GROUPTRANSOPS configuration setting. The default GROUPTRANSOPS setting for Replicat is 1000.

Interim flushes to Kinesis may be required when GROUPTRANSOPS is set to a large value. An individual call to send batch messages for a Kinesis stream cannot exceed 500 individual messages or 5MB. If the pending messages for a stream exceed 500 messages or 5MB, the Kinesis Handler must perform an interim flush.

13.4.3 Deferring Flush at Transaction Commit

By default, messages are flushed to Kinesis at transaction commit to ensure write durability. However, it is possible to defer the flush beyond transaction commit. This is only advisable when messages are being grouped and sent to Kinesis at the transaction level (that is, one transaction = one Kinesis message, or chunked into a small number of Kinesis messages), when you are trying to capture the transaction as a single messaging unit.

This may require setting the GROUPTRANSOPS Replicat parameter to 1 so as not to group multiple smaller transactions from the source trail file into a larger output transaction. This can impact performance because only one or a few messages are sent per transaction before the transaction commit call is invoked, which in turn triggers the flush call to Kinesis.

To maintain good performance, the Oracle GoldenGate Kinesis Streams Handler allows you to defer the Kinesis flush call beyond the transaction commit call. The Oracle GoldenGate Replicat process maintains the checkpoint in the .cpr file in the {GoldenGate Home}/dirchk directory. The Java Adapter also maintains a checkpoint file in this directory named .cpj. When the flush is deferred, the Replicat checkpoint moves beyond the point up to which the Kinesis Streams Handler can guarantee that no messages are lost. However, in this mode of operation the Kinesis Streams Handler maintains the correct checkpoint in the .cpj file. Running in this mode does not result in message loss, even after a crash, because on restart the checkpoint in the .cpj file is used if it is before the checkpoint in the .cpr file.
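
A minimal sketch of a deferred-flush configuration follows. It assumes a handler named kinesis, transaction-level messages, and an illustrative operation count of 1000; tune the count for your own workload.

```properties
# Assumption: the handler is named "kinesis"
# Send messages at the transaction level
gg.handler.kinesis.formatPerOp=false
# Do not flush to Kinesis at every transaction commit
gg.handler.kinesis.deferFlushAtTxCommit=true
# Illustrative value: flush at the first commit after at least 1000 operations
gg.handler.kinesis.deferFlushOpCount=1000
```

With these settings, a flush is triggered on the next transaction commit once the operation count is reached, rather than on every commit.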

13.5 Troubleshooting

13.5.1 Java Classpath

The most common initial error is a classpath that does not include all the required AWS Kinesis Java SDK client libraries, which results in a ClassNotFound exception in the log file.

You can troubleshoot by setting the Java Adapter logging to DEBUG, and then rerun the process. At the debug level, the logging includes information about which JARs were added to the classpath from the gg.classpath configuration variable.

The gg.classpath variable supports the wildcard asterisk (*) character to select all JARs in a configured directory, for example, /usr/kinesis/sdk/*. See Setting Up and Running the Kinesis Streams Handler.
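
The following fragment sketches a Java Adapter properties file set up for classpath troubleshooting. It assumes the commonly used Java Adapter logging properties gg.log and gg.log.level, which are not described in this chapter; confirm the property names against the logging documentation for your release. The directory is the example path from the text above.

```properties
# Assumption: gg.log and gg.log.level are the logging properties for your release
gg.log=log4j
gg.log.level=DEBUG
# Wildcard selects every JAR in the directory
gg.classpath=/usr/kinesis/sdk/*
```

Return the log level to its previous setting after troubleshooting, because debug logging is verbose.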

13.5.2 Kinesis Handler Connectivity Issues

If the Kinesis Streams Handler is unable to connect to Kinesis when running on premises, the problem may be that connectivity to the public Internet is protected by a proxy server. Proxy servers act as a gateway between the private network of a company and the public Internet. Contact your network administrator to get the URLs of your proxy server, and then follow the directions in Configuring the Proxy Server for Kinesis Streams Handler.

13.5.3 Logging

The Kinesis Streams Handler logs the state of its configuration to the Java log file.

This is helpful because you can review the configuration values for the handler. Following is a sample of the logging of the state of the configuration:

**** Begin Kinesis Streams Handler - Configuration Summary ****   
Mode of operation is set to op.   
   The AWS region name is set to [us-west-2].   
   A proxy server has been set to [www-proxy.us.oracle.com] using port [80].   
   The Kinesis Streams Handler will flush to Kinesis at transaction commit.  
    Messages from the GoldenGate source trail file will be sent at the operation level. 
   One operation = One Kinesis Message   
The stream mapping template of [${fullyQualifiedTableName}] resolves to [fully qualified table name].  
 The partition mapping template of [${primaryKeys}] resolves to [primary keys].   
**** End Kinesis Streams Handler - Configuration Summary ****