Continuous Transforms

Continuous transforms can be used to continuously transform incoming time-series data without storing it. Since no data is stored, continuous transforms don’t support aggregations. The result of the transformation can be piped into another stream or written to an external data store.

Creating Continuous Transforms

Transforms are defined as PostgreSQL views with the action parameter set to transform. Here’s the syntax for creating a continuous transform:

CREATE VIEW name WITH (action=transform [, outputfunc=function_name( arguments ) ]) AS query

query is a subset of a PostgreSQL SELECT statement:

SELECT expression [ [ AS ] output_name ] [, ...]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY expression [, ...] ]

where no expression in the SELECT statement may contain an aggregate, and
from_item can be one of:

    stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition ]

function_name is an optional user-supplied function, declared as taking no arguments and returning type trigger, that is executed for every row output by the continuous transform.

arguments is an optional comma-separated list of arguments to be provided to the function when the trigger is executed. Arguments can only be literal string constants.
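As a minimal sketch of the syntax above, the following defines a transform with no output function. The stream events_stream, its columns, and the 500 ms threshold are hypothetical names chosen for illustration, and the stream declaration assumes the CREATE FOREIGN TABLE ... SERVER pipelinedb form:

```sql
-- Hypothetical stream; the name and columns are illustrative only.
CREATE FOREIGN TABLE events_stream (user_id int, url text, latency_ms int)
  SERVER pipelinedb;

-- Pass through only slow requests. No aggregates appear in the SELECT list.
CREATE VIEW slow_requests WITH (action=transform) AS
  SELECT user_id, url, latency_ms
  FROM events_stream
  WHERE latency_ms > 500;
```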

Note

You can think of continuous transforms as being triggers on top of incoming streaming data where the trigger function is executed for each new row output by the continuous transform. Internally, the function is executed as an AFTER INSERT FOR EACH ROW trigger, so there is no OLD row and the NEW row contains the row output by the continuous transform.
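Because the output function runs as a trigger, a PL/pgSQL function should be able to read its literal arguments the same way an ordinary PostgreSQL trigger does, through TG_ARGV. The sketch below relies on that assumption. The table tagged_rows, the function insert_tagged, the view tagged, and a stream named stream with an integer column x are all hypothetical:

```sql
-- Hypothetical target table for the tagged output.
CREATE TABLE tagged_rows (tag text, x int);

CREATE OR REPLACE FUNCTION insert_tagged()
  RETURNS trigger AS
  $$
  BEGIN
    -- Assumption: TG_ARGV[0] holds the first string literal passed in outputfunc.
    INSERT INTO tagged_rows (tag, x) VALUES (TG_ARGV[0], NEW.x);
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;

-- Every even x output by the transform is written with the tag 'even'.
CREATE VIEW tagged WITH (action=transform, outputfunc=insert_tagged('even')) AS
  SELECT x FROM stream WHERE mod(x, 2) = 0;
```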

Dropping Continuous Transforms

To DROP a continuous transform from the system, use the DROP VIEW command. Its syntax is simple:

DROP VIEW continuous_transform;

This will remove the continuous transform from the system along with all of its associated resources.

Viewing Continuous Transforms

To view the continuous transforms and their definitions currently in the system, you can run the following query:

SELECT * FROM pipelinedb.transforms;

Continuous Transform Output Streams

All continuous transforms have Output Streams associated with them, making it easy for other transforms or continuous views to read from them. A continuous transform’s output stream simply contains whatever rows the transform selects.

For example, here’s a simple transform that joins incoming rows with a table:

CREATE VIEW t WITH (action=transform) AS
  SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;

This transform now writes values from the joined table out to its output stream, which can be read using output_of:

CREATE VIEW v WITH (action=materialize) AS
  SELECT sum(y) FROM output_of('t');
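Another transform can read from the same output stream. A minimal sketch, assuming the transform t defined above already exists; the name t_positive and the filter condition are illustrative only:

```sql
-- Chain a second transform onto the output stream of transform t.
CREATE VIEW t_positive WITH (action=transform) AS
  SELECT y FROM output_of('t') WHERE y > 0;
```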

Built-in Transform Output Functions

In order to provide more flexibility over a continuous transform’s output than its built-in output stream provides, PipelineDB exposes an interface to receive a transform’s rows using a trigger function. Trigger functions attached to transforms can then do whatever you’d like with the rows they receive, including writing them out to other streams.

Currently, PipelineDB provides only one built-in trigger function, pipelinedb.insert_into_stream, that can be used with continuous transforms. It inserts the output of the continuous transform into all the streams that are provided as the string literal arguments. For example:

CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream')) AS
  SELECT x, y FROM stream WHERE mod(x, 2) = 0;

This continuous transform will insert every (x, y) row in which x is even into even_stream.

Important

All arguments to pipelinedb.insert_into_stream must be valid names of streams that already exist in the system; otherwise, an error will be thrown.
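Since the function accepts a list of stream names, one transform can fan its output out to several streams. The sketch below declares the target streams first so that they exist when the transform is created. The names audit_stream and t_fanout are hypothetical, and the stream declarations assume the CREATE FOREIGN TABLE ... SERVER pipelinedb form:

```sql
-- Source stream plus two target streams; targets must exist before the transform.
CREATE FOREIGN TABLE stream (x int, y int) SERVER pipelinedb;
CREATE FOREIGN TABLE even_stream (x int, y int) SERVER pipelinedb;
CREATE FOREIGN TABLE audit_stream (x int, y int) SERVER pipelinedb;

-- Each row with an even x is inserted into both target streams.
CREATE VIEW t_fanout WITH (action=transform,
    outputfunc=pipelinedb.insert_into_stream('even_stream', 'audit_stream')) AS
  SELECT x, y FROM stream WHERE mod(x, 2) = 0;
```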

Creating Your Own Output Function

You can also create your own output function that can be used with continuous transforms. For example, if you want to insert the output into a table, you could do something like:

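-- "user" is a reserved word in PostgreSQL, so it must be double-quoted
-- wherever it is used as a column name.
CREATE TABLE t ("user" text, value int);

CREATE OR REPLACE FUNCTION insert_into_t()
  RETURNS trigger AS
  $$
  BEGIN
    -- NEW holds the row output by the continuous transform.
    INSERT INTO t ("user", value) VALUES (NEW."user", NEW.value);
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;

CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS
  SELECT "user"::text, value::int FROM stream WHERE value > 100;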
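To try the output function, you could write a few rows to the stream and then query the table. This is only a sketch: it assumes stream was declared with matching columns before ct was created, and the sample values are made up. The column name is double-quoted because user is a reserved word in PostgreSQL.

```sql
-- Assumed stream declaration (must precede the CREATE VIEW for ct):
--   CREATE FOREIGN TABLE stream ("user" text, value int) SERVER pipelinedb;

-- Write two sample rows; only the first satisfies value > 100.
INSERT INTO stream ("user", value) VALUES ('alice', 150), ('bob', 50);

-- Rows that pass the transform's WHERE clause are written to t by insert_into_t.
SELECT "user", value FROM t;
```

Rows may take a moment to appear in t, since the transform processes stream writes after the INSERT returns.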