Clients

Since PipelineDB runs as an extension of PostgreSQL 10.1+ and 11.0+, it doesn’t have its own client libraries. Instead, any client that works with PostgreSQL (or any SQL database for that matter) will work with PipelineDB.

Here you’ll find examples of a simple PipelineDB application written in a few different languages and clients. The application simply creates the continuous view:

CREATE VIEW continuous_view WITH (action=materialize) AS
      SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

The application then emits 100,000 events resulting in 10 unique groupings for the continuous view, and prints out the results.

Python

For this example in Python, you’ll need to have psycopg2 installed.

import psycopg2

# Connect to the "test" database. psycopg2 implicitly opens a transaction on
# the first statement, so DDL only takes effect once conn.commit() is called.
conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()

# The stream is declared as a foreign table on the pipelinedb server
create_stream = """
CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb
"""
pipeline.execute(create_stream)

create_cv = """
CREATE VIEW continuous_view WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()

# 100,000 events spread across 10 unique groupings
rows = [{'x': n % 10} for n in range(100000)]

# Now write the rows to the stream
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)

# Now read the results
pipeline.execute('SELECT * FROM continuous_view')

for x, count in pipeline.fetchall():
    print(x, count)

# Clean up. The DROP has to be committed explicitly; otherwise it is rolled
# back when the connection is closed and the view would be left behind.
pipeline.execute('DROP VIEW continuous_view')
conn.commit()
pipeline.close()
conn.close()

Ruby

This example in Ruby uses the pg gem.

require 'pg'
# PG.connect replaces the PGconn constant, which was deprecated and later
# removed from the pg gem.
pipeline = PG.connect("dbname='test' user='user' host='localhost' port=5432")

# This continuous view will perform 3 aggregations on page view traffic, grouped by url:
#
# total_count - count the number of total page views for each url
# uniques     - count the number of unique users for each url
# p99_latency - determine the 99th-percentile latency for each url

s = "
CREATE FOREIGN TABLE page_views (
        url text,
        cookie text,
        latency integer
) SERVER pipelinedb"
pipeline.exec(s)

q = "
CREATE VIEW v WITH (action=materialize) AS
SELECT
  url,
  count(*) AS total_count,
  count(DISTINCT cookie) AS uniques,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) AS p99_latency
FROM page_views GROUP BY url"

pipeline.exec(q)

(1..10000).each do |n|
  # 10 unique urls
  url = '/some/url/%d' % (n % 10)

  # 1000 unique cookies
  cookie = '%032d' % (n % 1000)

  # latency uniformly distributed between 1 and 100 (inclusive);
  # rand(101) would also produce 0
  latency = rand(1..100)

  # NOTE: it would be much faster to batch these into a single INSERT
  # statement, but for simplicity's sake let's do one at a time.
  # Bind parameters keep the values out of the SQL text entirely.
  pipeline.exec_params(
    'INSERT INTO page_views (url, cookie, latency) VALUES ($1, $2, $3)',
    [url, cookie, latency])
end

# The output of a continuous view can be queried like any other table or view
rows = pipeline.exec('SELECT * FROM v ORDER BY url')

rows.each do |row|
  puts row
end

# Clean up
pipeline.exec('DROP VIEW v')
pipeline.close

Java

For this example you’ll need to have the PostgreSQL JDBC driver installed and on your CLASSPATH.

import java.util.Properties;
import java.sql.*;

/**
 * Minimal PipelineDB client: creates a stream and a continuous view that
 * counts events per value of x, writes 100,000 events to the stream, prints
 * the contents of the view, and finally drops the view.
 */
public class Example {

  static final String HOST = "localhost";
  static final String DATABASE = "test";
  static final String USER = "user";

  /**
   * Runs the example against the "test" database on localhost:5432.
   *
   * @param args unused
   * @throws SQLException if connecting or any statement fails
   */
  public static void main(String[] args) throws SQLException {

    // Connect to "test" database on port 5432
    String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
    Properties props = new Properties();
    props.setProperty("user", USER);

    // try-with-resources closes the statement and connection even when a
    // statement throws part-way through
    try (Connection conn = DriverManager.getConnection(url, props);
         Statement stmt = conn.createStatement()) {

      stmt.executeUpdate(
        "CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb");
      stmt.executeUpdate(
        "CREATE VIEW v WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");

      // A single parameterized INSERT, batched, instead of 100,000
      // string-concatenated statements
      try (PreparedStatement insert =
             conn.prepareStatement("INSERT INTO stream (x) VALUES (?)")) {
        for (int i = 0; i < 100000; i++) {
          // 10 unique groupings
          insert.setInt(1, i % 10);
          insert.addBatch();
        }
        insert.executeBatch();
      }

      try (ResultSet rs = stmt.executeQuery("SELECT * FROM v")) {
        while (rs.next()) {
          int id = rs.getInt("x");
          // COUNT(*) is a bigint in PostgreSQL, so read it as a long
          long count = rs.getLong("count");

          System.out.println(id + " = " + count);
        }
      }

      // Clean up
      stmt.executeUpdate("DROP VIEW v");
    }
  }
}