Clients¶
Since PipelineDB runs as an extension of PostgreSQL 10.1+ and 11.0+, it doesn’t have its own client libraries. Instead, any client that works with PostgreSQL (or any SQL database for that matter) will work with PipelineDB.
Here you’ll find examples of a simple PipelineDB application written in a few different languages and clients. The application simply creates the continuous view:
CREATE VIEW continuous view WITH (action=materialize) AS
SELECT x::integer, COUNT(*) FROM stream GROUP BY x;
The application then emits 100,000
events resulting in 10
unique groupings for the continuous view, and prints out the results.
Python¶
For this example in Python, you’ll need to have psycopg2 installed.
import psycopg2
conn = psycopg2.connect('dbname=test user=user host=localhost port=5432')
pipeline = conn.cursor()
create_stream = """
CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb
"""
pipeline.execute(create_stream)
create_cv = """
CREATE VIEW continuous_view WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x
"""
pipeline.execute(create_cv)
conn.commit()
rows = []
for n in range(100000):
# 10 unique groupings
x = n % 10
rows.append({'x': x})
# Now write the rows to the stream
pipeline.executemany('INSERT INTO stream (x) VALUES (%(x)s)', rows)
# Now read the results
pipeline.execute('SELECT * FROM continuous_view')
rows = pipeline.fetchall()
for row in rows:
x, count = row
print x, count
pipeline.execute('DROP VIEW continuous_view')
pipeline.close()
Ruby¶
This example in Ruby uses the pg gem.
require 'pg'
pipeline = PGconn.connect("dbname='test' user='user' host='localhost' port=5432")
# This continuous view will perform 3 aggregations on page view traffic, grouped by url:
#
# total_count - count the number of total page views for each url
# uniques - count the number of unique users for each url
# p99_latency - determine the 99th-percentile latency for each url
s = "
CREATE FOREIGN TABLE page_views (
url text,
cookie text,
latency integer
) SERVER pipelinedb"
pipeline.exec(s)
q = "
CREATE VIEW v WITH (action=materialize) AS
SELECT
url,
count(*) AS total_count,
count(DISTINCT cookie) AS uniques,
percentile_cont(0.99) WITHIN GROUP (ORDER BY latency) AS p99_latency
FROM page_views GROUP BY url"
pipeline.exec(q)
for n in 1..10000 do
# 10 unique urls
url = '/some/url/%d' % (n % 10)
# 1000 unique cookies
cookie = '%032d' % (n % 1000)
# latency uniformly distributed between 1 and 100
latency = rand(101)
# NOTE: it would be much faster to batch these into a single INSERT
# statement, but for simplicity's sake let's do one at a time
pipeline.exec(
"INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d)"
% [url, cookie, latency])
end
# The output of a continuous view can be queried like any other table or view
rows = pipeline.exec('SELECT * FROM v ORDER BY url')
rows.each do |row|
puts row
end
# Clean up
pipeline.exec('DROP VIEW v')
Java¶
For this example you’ll need to have JDBC installed and on your CLASSPATH
.
import java.util.Properties;
import java.sql.*;
public class Example {
static final String HOST = "localhost";
static final String DATABASE = "test";
static final String USER = "user";
public static void main(String[] args) throws SQLException {
// Connect to "test" database on port 5432
String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE;
ResultSet rs;
Properties props = new Properties();
props.setProperty("user", USER);
Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
stmt.executeUpdate(
"CREATE FOREIGN TABLE stream (x integer) SERVER pipelinedb");
stmt.executeUpdate(
"CREATE VIEW v WITH (action=materialize) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x");
for (int i=0; i<100000; i++)
{
// 10 unique groupings
int x = i % 10;
// INSERT INTO stream (x) VALUES (x)
stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")");
}
stmt.executeBatch();
rs = stmt.executeQuery("SELECT * FROM v");
while (rs.next())
{
int id = rs.getInt("x");
int count = rs.getInt("count");
System.out.println(id + " = " + count);
}
// Clean up
stmt.executeUpdate("DROP VIEW v");
conn.close();
}
}