Continuous transforms can be used to continuously transform incoming time-series data without storing it. Since no data is stored, continuous transforms don’t support aggregations. The result of the transformation can be piped into another stream or written to an external data store.
Creating Continuous Transforms¶
Transforms are defined as PostgreSQL views with the
action parameter set to
transform. Here’s the syntax for creating a continuous transform:
CREATE VIEW name (WITH action=transform [, outputfunc=function_name( arguments ) ]) AS query
query is a subset of a PostgreSQL
SELECT expression [ [ AS ] output_name ] [, ...] [ FROM from_item [, ...] ] [ WHERE condition ] [ GROUP BY expression [, ...] ] where any expression in the SELECT statement can't contain an aggregate and from_item can be one of: stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] from_item [ NATURAL ] join_type from_item [ ON join_condition ]
function_name is an optional user-supplied function that is declared as taking no arguments and returning type
trigger, which is executed for every single row that is output by the continuous transform.
arguments is an optional comma-separated list of arguments to be provided to the function when the trigger is executed. Arguments can only be literal string constants.
You can think of continuous transforms as being triggers on top of incoming streaming data where the trigger function is executed for each new row output by the continuous transform. Internally the function is executed as an
AFTER INSERT FOR EACH ROW trigger so there is no
OLD row and the
NEW row contains the row output by the continuous tranform.
Dropping Continuous Transforms¶
DROP a continuous transform from the system, use the
DROP VIEW command. Its syntax is simple:
DROP VIEW continuous_transform;
This will remove the continuous transform from the system along with all of its associated resources.
Viewing Continuous Transforms¶
To view the continuous transforms and their definitions currently in the system, you can run the following query:
SELECT * FROM pipelinedb.transforms;
Continuous Transform Output Streams¶
All continuous transforms have Output Streams associated with them, making it easy for other transforms or continuous views to read from them. A continuous transform’s output stream simply contains whatever rows the transform selects.
For example, here’s a simple transform that joins incoming rows with a table:
CREATE VIEW t WITH (action=transform) AS SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;
This transform now writes values from the joined table out to its output stream, which can be read using
CREATE VIEW v WITH (action=materialize) AS SELECT sum(y) FROM output_of('t');
Built-in Transform Output Functions¶
In order to provide more flexibility over a continuous transform’s output than their built-in output streams provide, PipelineDB exposes an interface to receive a transform’s rows using a trigger function. Trigger functions attached to tranforms can then do whatever you’d like with the rows they receive, including write out to other streams.
Currently, PipelineDB provides only one built-in trigger function,
pipelinedb.insert_into_stream, that can be used with continuous transforms. It inserts the output of the continuous transform into all the streams that are provided as the string literal arguments. For example:
CREATE VIEW t WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('even_stream)) AS SELECT x, y FROM stream WHERE mod(x, 2) = 0;
This continuous transform will insert all values of
(x, y) into
x is even.
All arguments to
pipelinedb.insert_into_stream must be valid names of streams that already exist in the system, otherwise an error will be thrown.
Creating Your Own Output Function¶
You can also create your own output function that can be used with continuous transforms. For example, if you want to insert the output into a table, you could do something like:
CREATE TABLE t (user text, value int); CREATE OR REPLACE FUNCTION insert_into_t() RETURNS trigger AS $$ BEGIN INSERT INTO t (user, value) VALUES (NEW.user, NEW.value); RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE VIEW ct WITH (action=transform, outputfunc=insert_into_t) AS SELECT user::text, value::int FROM stream WHERE value > 100;