Integrations

Apache Kafka

PipelineDB supports ingesting data from Kafka topics into streams. All of this functionality is contained in the pipeline_kafka extension. Internally, pipeline_kafka uses PostgreSQL’s COPY infrastructure to transform Kafka messages into rows that PipelineDB understands.

Note

The pipeline_kafka extension is officially supported but does not ship with the PipelineDB packages and therefore must be installed separately. The repository for the extension is located here. Instructions for building and installing the extension can be found in the README.md file.

pipeline_kafka internally uses shared memory to sync state between background workers, so it must be loaded as a shared library. You can do so by adding the following line to your pipelinedb.conf file. If you’re already loading some shared libraries, then simply add pipeline_kafka as a comma-separated list.

shared_preload_libraries = pipeline_kafka
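
For example, if other libraries were already being preloaded, pipeline_kafka is simply appended to the list (pg_stat_statements here is just an illustrative placeholder):

shared_preload_libraries = 'pg_stat_statements,pipeline_kafka'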

You can now load the extension into a database:

postgres=# CREATE EXTENSION pipeline_kafka;
CREATE EXTENSION

Before you can start using pipeline_kafka, you must add a broker for your Kafka deployment.

pipeline_kafka.add_broker ( hostname text )

hostname is a string of the form <host>[:<port>]. Multiple brokers can be added by calling pipeline_kafka.add_broker for each host.
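
For example, assuming a broker is listening on localhost at Kafka's default port (both are illustrative), it can be registered like so:

SELECT pipeline_kafka.add_broker('localhost:9092');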

Consuming Messages

pipeline_kafka.consume_begin ( topic text, stream text, format := 'text', delimiter := E'\t', quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )

Launches parallelism background worker processes, each of which reads messages from the given Kafka topic into the given stream. The target stream must be created with CREATE STREAM beforehand. All partitions of the given topic will be spread evenly across the worker processes.

The optional format, delimiter, escape and quote arguments are analogous to the FORMAT, DELIMITER, ESCAPE and QUOTE options for the PostgreSQL COPY command, except that pipeline_kafka supports one additional format: json. The json format interprets each Kafka message as a JSON object.

batchsize controls the batch_size parameter passed to the Kafka client. We force a COPY and commit cycle after batchsize messages have been buffered.

maxbytes controls the fetch.message.max.bytes parameter passed to the Kafka client. We force a COPY and commit cycle after maxbytes bytes of data have been buffered.

start_offset specifies the offset from which to start reading the Kafka topic partitions.

pipeline_kafka continuously and durably saves the offset it has read up to in the database. If start_offset is NULL, we start from the saved offset, or from the end of the partition if there is no saved offset. A start_offset of -1 will start reading from the end of each partition, and -2 will start consuming from the beginning of each partition. Using any other start_offset would be an odd thing to do, since offsets are unrelated among partitions.
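
Putting this together, a minimal ingestion setup might look like the following sketch, where the stream name, column layout and topic name are all illustrative assumptions:

CREATE STREAM kafka_stream (x integer, y text);
SELECT pipeline_kafka.consume_begin('my_topic', 'kafka_stream', format := 'text', delimiter := E'\t', parallelism := 2);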

pipeline_kafka.consume_begin ( )

Same as above, but launches all previously created consumers rather than one for a specific stream-topic pair.

pipeline_kafka.consume_end ( topic text, stream text )

Terminates background consumer processes for the given stream-topic pair.

pipeline_kafka.consume_end ( )

Same as above, but terminates all consumer processes.
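
For example, to stop the illustrative consumer started above, or to stop all consumers at once:

SELECT pipeline_kafka.consume_end('my_topic', 'kafka_stream');
SELECT pipeline_kafka.consume_end();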

Producing Messages

New in version 0.9.1.

pipeline_kafka.produce_message ( topic text, message bytea, partition := NULL, key := NULL )

Produces a single message into the target topic. Both partition and key are optional. By default the partition remains unassigned, so the broker will decide which partition to produce the message to based on the topic's partitioner function. If you want to produce the message into a specific partition, specify it as an integer. key is a bytea argument which will be used as the key to the partition function.
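
For instance, a single message could be produced like this (the topic name and payload are illustrative; convert_to is used here only to build a bytea value):

SELECT pipeline_kafka.produce_message('my_topic', convert_to('hello, kafka', 'utf8'));
SELECT pipeline_kafka.produce_message('my_topic', convert_to('hello, kafka', 'utf8'), partition := 0);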

pipeline_kafka.emit_tuple ( topic, partition, key )

This is a trigger function that can be used to emit tuples into a Kafka topic in JSON format. It can only be used in an AFTER INSERT OR UPDATE, FOR EACH ROW trigger. In the case of an UPDATE, the updated tuple is emitted. A topic must be provided, whereas partition and key are both optional. Since this is a trigger function, all arguments must be passed as string literals and there is no way to specify keyword arguments. If you only want to specify a topic and key, use '-1' as the partition, which will keep the partition unassigned. key is the name of the column in the tuple being emitted whose value should be used as the partition key.
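
As a sketch, a trigger on a hypothetical table my_table that emits every inserted or updated row to my_topic, leaving the partition unassigned and using the id column as the partition key, might look like this:

CREATE TRIGGER emit_to_kafka
    AFTER INSERT OR UPDATE ON my_table
    FOR EACH ROW
    EXECUTE PROCEDURE pipeline_kafka.emit_tuple('my_topic', '-1', 'id');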

Metadata

pipeline_kafka uses several tables to durably keep track of its own state across system restarts:

pipeline_kafka.consumers

Stores the metadata for each stream-topic consumer that is created by pipeline_kafka.consume_begin.

pipeline_kafka.brokers

Stores all Kafka brokers that consumers can connect to.

pipeline_kafka.offsets

Stores Kafka topic offsets so that consumers can begin reading messages from where they left off before termination or system restarts.
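
These are ordinary tables, so their contents can be inspected with plain SQL, for example:

SELECT * FROM pipeline_kafka.consumers;
SELECT * FROM pipeline_kafka.offsets;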


Note

See SQL on Kafka for an in-depth tutorial on using Kafka with PipelineDB.

Amazon Kinesis

PipelineDB also supports ingesting data from Amazon Kinesis streams. This functionality is provided by the pipeline_kinesis extension. Internally, the extension manages background workers that consume data using the AWS SDK and copy it into PipelineDB streams.

The repository for the extension is located here. Instructions for building and installing the extension can be found in the README.md file.

To enable the extension, it must be explicitly loaded:

postgres=# CREATE EXTENSION pipeline_kinesis;
CREATE EXTENSION

To start ingestion, you must first tell pipeline_kinesis where and how to get Kinesis data by configuring an endpoint:

pipeline_kinesis.add_endpoint( name text, region text, credfile text := NULL, url text := NULL )

name is a unique identifier for the endpoint. region is a string identifying the AWS region, e.g. us-east-1 or us-west-2.

credfile is an optional parameter that allows overriding the default file location for AWS credentials.

url is an optional parameter that allows the use of a different (non-AWS) Kinesis server. This is mostly useful for testing with local Kinesis servers such as kinesalite.
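
For example, following the signature above, registering an endpoint in us-west-2 with the default credentials file might look like this (the endpoint name is an arbitrary label):

SELECT pipeline_kinesis.add_endpoint('ep', 'us-west-2');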

Consuming Messages

pipeline_kinesis.consume_begin ( endpoint text, stream text, relation text, format text := 'text', delimiter text := E'\t', quote text := NULL, escape text := NULL, batchsize int := 1000, parallelism int := 1, start_offset int := NULL )

Starts a logical consumer group that consumes messages from the Kinesis stream stream at endpoint and copies them into the PipelineDB stream relation.

parallelism is used to specify the number of background worker processes that should be used per consumer to balance load. Note - this does not need to be set to the number of shards, since the extension is internally threaded. The default value of 1 is sufficient unless the consumer starts to fall behind.

format, delimiter, escape and quote are optional parameters used to control the format of the copied rows, as in PostgreSQL COPY.

batchsize is passed on to the AWS SDK and controls the Limit parameter used in Kinesis GetRecords.

start_offset is used to control the stream position that the extension starts reading from. -1 is used to start reading from the end of the stream, and -2 to read from the start. Internally, these map to LATEST and TRIM_HORIZON, respectively. See Kinesis GetShardIterator for more details.
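
For example, the following sketch consumes a hypothetical Kinesis stream named my_kinesis_stream into a PipelineDB stream created beforehand (all names and columns are illustrative):

CREATE STREAM kinesis_stream (x integer, y text);
SELECT pipeline_kinesis.consume_begin('ep', 'my_kinesis_stream', 'kinesis_stream');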

pipeline_kinesis.consume_end (endpoint text, stream text, relation text)

Terminates all background worker processes for a particular consumer.
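
For example, to stop the illustrative consumer started above:

SELECT pipeline_kinesis.consume_end('ep', 'my_kinesis_stream', 'kinesis_stream');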

pipeline_kinesis.consume_begin()

Launches all previously created consumers.

pipeline_kinesis.consume_end()

Terminates all background worker processes for all previously started consumers.

Metadata

pipeline_kinesis uses several tables to durably keep track of its own state across system restarts:

pipeline_kinesis.endpoints

Stores the metadata for each endpoint that is created by pipeline_kinesis.add_endpoint.

pipeline_kinesis.consumers

Stores the metadata for each consumer that is created by pipeline_kinesis.consume_begin.

pipeline_kinesis.seqnums

Stores the per-shard metadata for each consumer, namely the sequence numbers marking how far each shard has been consumed.
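
As with pipeline_kafka, these are regular tables and can be inspected directly, for example:

SELECT * FROM pipeline_kinesis.consumers;
SELECT * FROM pipeline_kinesis.seqnums;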