Pipes¶
A pipe defines the flow of data from a source to a sink on some schedule as defined by the pump settings. Optionally, a pipe may define an ordered list of transforms that are applied to entities as they flow from the source to the sink. As the name implies, a pump “pumps” data in the form of entities from the source to the sink at regular or scheduled intervals. A chain of transforms can be placed in between the source and the sink, so that entities are transformed on their way to the sink.
The pipe configuration consists of a source, a transform, a sink and a pump. Note that the forward slash character ("/") is not allowed in the pipe _id property.
Prototype¶
The following JSON snippet shows the general form of a pipe definition.
{
  "_id": "pipe-id",
  "name": "Name of pipe",
  "description": "This is a description of the pipe",
  "comment": "This is a comment",
  "type": "pipe",
  "source": {
  },
  "transform": {
  },
  "sink": {
  },
  "pump": {
  },
  "metadata": {
  }
}
Note that if no name property is explicitly set for the source, sink or pump configurations, one will be generated based on the name of the pipe (i.e. the contents of this property postfixed with "source", "sink" or "pump" respectively).
Batching¶
Pipes support batching if the sink supports batching. The pipe does this by accumulating source entities in a buffer before writing the batch to the transforms and the sink. The size of each batch can be specified using the batch_size property on the pipe. The default batch size is usually 100, but this may vary depending on the source and sink types used in the pipe. The REST sink, for instance, sets the default batch_size to 1.
Note that the sink may have its own batch_size property. This is useful if the pipe has transforms that produce more entities than the number of entities taken as input.
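As a minimal sketch (the pipe id and dataset name below are illustrative), a pipe that buffers 200 source entities per batch could be configured like this:
{
  "_id": "customers-batched",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "customers"
  },
  "batch_size": 200
}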
Properties¶
Property | Type | Description | Default | Req
---|---|---|---|---
_id | String | The id of the pipe. This should be unique within a Sesam service instance. Note that you cannot use the forward slash character ("/") in the id. | | Yes
type | String | The type of the component; for pipes the only allowed value is "pipe". | | Yes
name | String | A human readable name of the component. | |
description | String or list of strings | A human readable description of the component. | |
comment | String or list of strings | A human readable comment on the component. | |
batch_size | Integer(>=1) | The number of source entities to consume before writing to the sink. The batch size can be used to buffer up entities so that they can be written together to the sink in one go. The sink must support batching for the bulking to happen. This may increase the throughput of the pipe, at the cost of extra memory usage. If the batch fails, the entities will be retried individually. | Usually 100, but varies with other pipe settings |
checkpoint_interval | Integer(>=1) | Specifies how often the pipe offset is saved, i.e. how many batches must be processed before the pipe offset is saved the next time. Note that the pipe offset is always saved at the end of the sync if it changed. | 100 (1 if batch_size=1) |
 | Boolean | If this flag is set to … | |
 | Boolean | When set to true, enables running pipe rescans in the background for this pipe. | |
source | Object | A configuration object for the source component of the pipe. | | Yes
transform | Object/List | Zero or more configuration objects for the transform components of the pipe. The default is to do no transformation of the entities. If a list of more than one transform component is given, they are chained together in the order given: the output of the first transform is passed as the input of the second, and so on. The first transform gets its input from the source, and the output of the last transform is passed to the sink. | |
sink | Object | A configuration object for the sink component of the pipe. If omitted, it defaults to a dataset sink that writes to a dataset with the same id as the pipe. | |
pump | Object | A configuration object for the pump component of the pipe. | |
 | Boolean | Schema inferencing is enabled for all pipes by default. Setting this property to false will disable schema inferencing for this pipe. | |
 | Integer | The number of entities that dependency tracking can keep in memory at a given time. If this number is exceeded, a warning message is written to the log. The default value is inherited from the service metadata. | |
 | Integer | The number of entities that dependency tracking can keep in memory at a given time. If this number is exceeded, the pump will fail. The default value is inherited from the service metadata. Do not set this value too high as it may cause excessive memory usage. | |
 | Integer | The number of bytes that dependency tracking can keep in memory at a given time. If this number is exceeded, a warning message is written to the log. The default value is inherited from the service metadata. | |
 | Integer | The number of bytes that dependency tracking can keep in memory at a given time. If this number is exceeded, the pump will fail. The default value is inherited from the service metadata. Do not set this value too high as it may cause excessive memory usage. | |
 | Boolean | If … | |
Namespaces¶
Namespaces can be used in entity identifiers, entity property names and the namespaced identifier datatype. A namespaced identifier consists of two parts: a namespace and an identifier. The namespace part can consist of any character, including colons. The identifier part can consist of any character except colons (":").
Example of an entity with namespaces:
{
  "_id": "user:123",
  "user:username": "erica",
  "user:first_name": "Erica",
  "user:manager": "~:user:101"
}
Note
Namespaced identifiers can be enabled by setting the namespaced_identifiers property to true in the pipe configuration (see below) or in the service metadata. The former enables it for just the one pipe. The latter enables it for all pipes, except for those pipes that have explicitly disabled it.
Note
Some of the DTL functions are namespace aware and they will behave slightly differently when namespaces are enabled. See the section on namespaces in the DTL reference guide for more details.
Properties¶
Property | Type | Description | Default | Req
---|---|---|---|---
namespaced_identifiers | Boolean | Flag used to enable namespaced identifiers support on the pipe. The default value is read from the service metadata; if not specified in the service metadata, the default is false. | Service metadata default |
 | String | The namespace used for identifiers. The default value is the pipe's id. | |
 | String | The namespace used for properties. The default value is the pipe's id. | |
 | Boolean | If … If … | Source default |
 | Boolean | If … If … | Sink default |
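A minimal sketch of enabling namespaced identifiers for a single pipe (the pipe id and dataset name are illustrative):
{
  "_id": "user",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "users"
  },
  "namespaced_identifiers": true
}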
Compaction¶
Compaction deletes the oldest entities in a dataset and reclaims space for those entities in the dataset's indexes.
By default, datasets that are written to by pipes using the dataset sink are compacted incrementally as the pipe writes new entities to the dataset (compaction type "sink" enabled). If sink compaction is disabled (compaction type "background" in the global settings, or compaction.sink set to false), the dataset is automatically compacted once every 24 hours. The default is to keep the last two versions of every entity up until the current time.
Note
Compaction will only be performed up to the lowest offset for which there exists a pipe doing dependency tracking on the dataset. Each pipe doing dependency tracking keeps a tracking offset on the dataset so that it knows which entities to perform dependency tracking for. It is this tracking offset that compaction cannot go beyond. This is done so that those pipes do not fall out of sync; if compaction did not hold off, we could not guarantee that the output of those pipes is correct.
Be aware that disabled pipes also hold off compaction. If a pipe is to be disabled for a long time, it is better to remove the pipe, or alternatively comment out the hops.
Properties¶
Property | Type | Description | Default | Req
---|---|---|---|---
 | Boolean | If … | | No
 | Boolean | If … | | No
keep_versions | Integer | The number of unique versions of an entity to keep around. The default is 2. Warning: a value less than … | | No
time_threshold_hours | Integer | Specifies the threshold for how old entities must be before they are considered for compaction. This property is usually used when you want to keep entities around for a certain time. | | No
 | Integer | Same as … | | No
 | Float | The growth factor required for the automatically scheduled compaction to kick in. Uses the minimum value of … | | No
 | Float | Specifies the sink compaction interval. If this value is zero, sink compaction will run every time the pipe runs. If it is larger than zero, sink compaction will only run if at least … | | No
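As a sketch, sink compaction can be turned off for a pipe so that only the scheduled background compaction runs. The sink sub-property is referenced in the text above; the pipe id and dataset name are illustrative:
{
  "_id": "customers-background-compaction",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "customers"
  },
  "compaction": {
    "sink": false
  }
}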
Circuit breakers¶
A circuit breaker is a safety mechanism that one can enable on the dataset sink. The circuit breaker will trip if the number of entities written to a dataset in a pipe run exceeds a certain configurable limit.
Note that a circuit breaker is only activated if the sink dataset is populated. In practice this means that the pipe must have run to completion at least once. This is to avoid tripping it on the initial sync.
A tripped circuit breaker will prevent the pipe from running. It can either be rolled back or committed. Rolling it back will delete any entities that were written in the pipe run before the circuit breaker was tripped. Committing it will expose the uncommitted entities. Both operations reset the circuit breaker so that the pipe can run again.
Compaction will not be performed on datasets with a tripped circuit breaker. It is also not possible to repost entities to these datasets.
You can roll back or commit the circuit breaker on the dataset page in the Management Studio, or use the Service API.
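As an illustrative sketch only: the circuit_breaker and max_entities names below are hypothetical placeholders, not confirmed properties, so consult the dataset sink reference for the actual configuration keys. The shape of the example follows the text above (a configurable limit on the number of entities written per run, enabled on the dataset sink):
{
  "_id": "customers-guarded",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "customers"
  },
  "sink": {
    "type": "dataset",
    "dataset": "customers-guarded",
    "comment": "'circuit_breaker' and 'max_entities' are hypothetical placeholder names",
    "circuit_breaker": {
      "max_entities": 1000
    }
  }
}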
Resetting¶
When the configuration of a pipe is modified in such a way that the entities the pipe produces changes (for instance by changing the DTL transform of the pipe), the pipe’s “last-seen” value must be cleared in order to reprocess already seen entities with the new pipe configuration.
This can be done by setting the “last-seen” value to an empty string with the update-last-seen operation in the Service API.
Automatic reprocessing¶
Datasets that are input to a pipe, or datasets that are hopped to by a pipe, may be deleted. When this happens, the data output by the pipe is no longer in sync with the input data. By default a pipe will not reset automatically if this happens, but it will maintain a list of datasets that are out of sync. Alternatively, one can set the reprocessing policy to automatic so that such resets happen automatically.
Properties¶
Property | Type | Description | Default | Req
---|---|---|---|---
 | Enum<String> | Specifies the policy that the pipe uses to decide if it needs to be reset or not. | | No
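A sketch of opting in to automatic resets; based on the wording above, the property is assumed here to be named reprocessing_policy (verify against the service reference), and the pipe id and dataset name are illustrative:
{
  "_id": "customers-derived",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "customers"
  },
  "reprocessing_policy": "automatic"
}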
Completeness¶
When a pipe completes a successful run, the sink dataset will inherit the smallest completeness timestamp value of the source datasets and the related datasets. Inbound pipes will use the current time as the completeness timestamp value (the http_endpoint source can optionally get the completeness value from a request header). This mechanism has been introduced so that a pipe can hold off processing source entities that are more recent than the source dataset's completeness timestamp value. The propagation of these timestamp values is done automatically. Individual datasets can be excluded from the completeness timestamp calculation via the exclude_completeness property on the pipe. One can enable the completeness filtering feature on a pipe by setting the completeness property on the dataset source to true.
Warning
Completeness is implicitly incompatible with full rescans, as rescans do not necessarily expose all the latest entities. This means that if deletion tracking is performed by a pipe that has completeness set to true, then the non-covered entity ids will get deleted from the sink dataset. This may or may not be a problem depending on the use-case. Deletion tracking is currently only performed by pipes with dataset sinks. Set deletion_tracking to false on the dataset sink if you do not want deletion tracking to be performed.
Properties¶
Property | Type | Description | Default | Req
---|---|---|---|---
exclude_completeness | List<String> | A list of dataset ids that should not contribute to the completeness timestamp value. Any dataset listed in this property will be ignored when calculating the dataset sink completeness timestamp value. Note: if all datasets are excluded, a new completeness timestamp value will be generated in this pipe. | | No
 | List<String> | A list of dataset ids that should contribute to the completeness timestamp value. All datasets listed in this property will be used when calculating the dataset sink completeness timestamp value. If this property is not specified, it defaults to a list of all the datasets in the pipe's source and transforms, with the exception of datasets that are also specified in exclude_completeness. Note: if both … | | No
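A sketch combining the two settings described above (the pipe id and dataset names are illustrative): the dataset source holds off processing entities newer than the completeness timestamp, while one input dataset is excluded from the completeness calculation.
{
  "_id": "orders-complete",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "orders",
    "completeness": true
  },
  "exclude_completeness": ["slow-reference-data"]
}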
Metadata¶
Pipe metadata can be used to annotate a pipe in various user-defined ways. Some keys (documented below) are reserved for internal use, but otherwise the users are free to add their own metadata settings.
Properties¶
Property | Type | Description | Default
---|---|---|---
 | Boolean | When set to true, this pipe will store its state and data on a high-durability disk. This makes the pipe more resilient to data loss, but will also incur an additional cost; see Durable Data for more details. |
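For example, user-defined annotations can be added under metadata; the keys below are arbitrary examples, not reserved names:
{
  "_id": "northwind-customers",
  "type": "pipe",
  "source": {
    "type": "sql",
    "system": "Northwind",
    "table": "Customers"
  },
  "metadata": {
    "owner": "data-team",
    "environment": "production"
  }
}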
Example configuration¶
The following example shows a pipe definition that exposes data from a SQL database table called Customers and feeds it into a sink that writes the data into a dataset called Northwind:Customers.
{
  "_id": "northwind-customers",
  "name": "Northwind customers",
  "type": "pipe",
  "source": {
    "type": "sql",
    "system": "Northwind",
    "table": "Customers"
  },
  "sink": {
    "type": "dataset",
    "dataset": "Northwind:Customers"
  },
  "pump": {
    "schedule_interval": 3600
  },
  "compaction": {
    "keep_versions": 2,
    "time_threshold_hours": 48
  }
}