Pumps¶
Pumps are responsible for “pumping” data through the pipe by reading entities from a source and writing them into a sink. The pump is also responsible for retrying failed writes of entities and logging its activity. It can also write ultimately failed entities to a “dead letter” dataset for manual inspection. Pumps log their execution history in an internal dataset with the id “system:pump_execution:<pipe_id>”. See the chapter on the pump execution dataset for more details about the contents of this dataset.
See also the feature description of scheduling and signalling.
Pipes can be scheduled to run at a specific interval or at specific times. See the schedule_interval and cron_expression properties below. Signalling can also schedule the pipe to run immediately.
Note
Signalling is not enabled for all pipes, and setting schedule_interval or cron_expression can effectively disable signalling.
Note
If a pipe is scheduled to run on a cron-defined schedule or on a long scheduled interval (i.e. an interval of more than an hour), the scheduled run start time will be persisted. This means that if the service is unable to run the pipe at the pre-scheduled time, it will try to run it as soon as possible once it is able.
Prototype¶
{
    "comment": "This is a comment",
    "schedule_interval": 30,
    "cron_expression": "* * * * *",
    "rescan_run_count": 10,
    "rescan_cron_expression": "* * * * *",
    "partial_rescan_run_count": 5,
    "partial_rescan_delta": 3600,
    "run_at_startup": false,
    "run_at_startup_if_not_populated": false,
    "max_read_retries": 0,
    "read_retry_delay": 0,
    "write_retry_delay": 0,
    "max_retries_per_entity": 5,
    "max_consecutive_write_errors": 1,
    "max_write_errors_in_retry_dataset": 0,
    "fallback_to_single_entities_on_batch_fail": true,
    "dead_letter_dataset": "some-dataset-id",
    "track_dead_letters": false,
    "mode": "scheduled",
    "log_events_noop_runs": false,
    "log_events_noop_runs_changes_only": true,
    "notification_granularity": 99999999999
}
Properties¶
Note: A pump configuration needs to have either a schedule_interval or a cron_expression property to govern when the pump should be run. They are mutually exclusive, with cron_expression taking precedence if both are present. If neither property is set, the schedule_interval will be set to a default value. For pipes with a dataset sink and a dataset source, the default will be 30 seconds +/- 1.5 seconds. For all other pipes, the default will be 900 seconds +/- 45 seconds. It is good practice to always set the cron_expression property on pipes that read from or write to external systems.
If you are unfamiliar with cron expressions, you can read more about how they are formatted in the Cron Expressions document.
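For illustration, a minimal (and deliberately redundant) sketch where both properties are present; per the precedence rule above, the pump will follow the cron expression and run at midnight, not every 30 seconds:
{
    "pump": {
        "schedule_interval": 30,
        "cron_expression": "0 0 * * *"
    }
}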
Property | Type | Description | Default
---|---|---|---
comment | String or list of strings | A human readable comment on the pump (optional). |
schedule_interval | Number | The number of seconds between runs. It is mutually exclusive with the cron_expression property. | (see the note above)
cron_expression | String | A cron expression that indicates when the pump should run. It is mutually exclusive with the schedule_interval property. |
rescan_run_count | Integer(>=1) | The interval between each time the pump should do a complete rescan of the source of the pipe that the pump is part of. It is mutually exclusive with the rescan_cron_expression property. Examples: "rescan_run_count": 1 => “rescan”, “rescan”, “rescan”, “rescan”; "rescan_run_count": 2 => “incremental”, “rescan”, “incremental”, “rescan”; "rescan_run_count": 3 => “incremental”, “incremental”, “rescan”, “incremental”, “incremental”, “rescan” |
rescan_cron_expression | String | A cron expression that indicates when the pump should schedule a full rescan of the source of the pipe the pump is part of. It is mutually exclusive with the rescan_run_count property. |
partial_rescan_run_count | Integer(>=1) | The interval between each time the pump should do a partial rescan of the source of the pipe that the pump is part of. It is mutually exclusive with the rescan_run_count property. Examples: "partial_rescan_run_count": 1 => “partial rescan”, “partial rescan”, “partial rescan”, “partial rescan”; "partial_rescan_run_count": 2 => “incremental”, “partial rescan”, “incremental”, “partial rescan”; "partial_rescan_run_count": 3 => “incremental”, “incremental”, “partial rescan”, “incremental”, “incremental”, “partial rescan” |
partial_rescan_delta | Integer(>=1) | This specifies the delta to perform a partial rescan of. If the since value is an integer, the value is subtracted (e.g. a since value of 100 and a partial_rescan_delta of 10 gives a partial rescan from 90). If the since value is a timestamp, the value in seconds is subtracted (e.g. a since value of "2020-01-01T00:01:00Z" and a partial_rescan_delta of 60 gives a partial rescan from "2020-01-01T00:00:00Z"). |
run_at_startup | Boolean | A flag that indicates if the pump should run when Sesam starts up, in addition to the normal schedule specified by the schedule_interval or cron_expression properties. | false
run_at_startup_if_not_populated | Boolean | A flag that indicates if the pump should run when Sesam starts up and the dataset is not populated. This is in addition to the normal schedule specified by the schedule_interval or cron_expression properties. | false
use_dead_letters | Boolean | Deprecated. Use the dead_letter_dataset property instead. |
dead_letter_dataset | String | A string that indicates which dataset to write any entities that fail their retries to, once max_retries_per_entity has been exceeded. |
track_dead_letters | Boolean | A flag that indicates if the pump should delete any previously written “dead letter” entities if a later version of the entity is successfully written to the sink. It is only active if the dead_letter_dataset property is set. | false
max_read_retries | Integer | A counter that indicates to the pump how many times it should retry when failing to read an entity from a source. The default (0) means that it should not retry, but log an error immediately when encountering read errors. See also the read_retry_delay property. | 0
 | Number | A debug option to help track down slow-to-transform entities. If set, it will make the pipe fail if a batch of entities uses more than this number of seconds (on average) to pass through the transform stage. The first entity of the batch will be included in the resulting error. |
read_retry_delay | Number | How many seconds to wait before retrying after a read error (i.e. only relevant if max_read_retries is non-zero). | 0
write_retry_delay | Number | How many seconds to wait before retrying after a write error (i.e. only relevant if max_retries_per_entity is non-zero). | 0
max_retries_per_entity | Integer | A counter that indicates to the pump how many times it should retry a failing entity when writing to a sink before giving up on it, in which case it can optionally write it to the dataset referenced in dead_letter_dataset. | 5
max_consecutive_write_errors | Integer | A counter that indicates to the pump how many consecutive write errors it tolerates before terminating the current run. The default (1) means it will terminate after the first write error it encounters. See also the max_write_errors_in_retry_dataset property. | 1
max_write_errors_in_retry_dataset | Integer | A counter that indicates to the pump how many write errors it accepts in its execution history dataset. If the number of retryable and not “dead” failed entities in the dataset exceeds this number, the pump will refuse to write any more failed entities to the execution dataset and terminate, even if the max_consecutive_write_errors limit has not been reached. | 0
fallback_to_single_entities_on_batch_fail | Boolean | A flag that controls if the pipe should attempt to process a single entity at a time if a batch write operation fails. This can be useful to turn off if the cost of processing a single entity at a time is prohibitively high. This single-entity-at-a-time fallback is on by default (true). | true
mode | String | The mode of operation. Valid options are “scheduled” (the default), “manual” and “off”. Pumps in “scheduled” mode run according to their schedule_interval or cron_expression settings. Pumps in “manual” mode only run when explicitly started, for example through the API. Pumps in “off” mode cannot be run at all. | “scheduled”
log_events_noop_runs | Boolean | A flag that controls if a “noop” (“no-operation”) pipe run should be logged in the pipe execution log or not. The default value (false) means that such runs are not logged. | false
log_events_noop_runs_changes_only | Boolean | A flag that controls what kind of metric is used to determine if a pipe run was a “noop” (“no-operation”) run or not. The default setting (true) means that a run counts as a “noop” if it did not produce any changes; if set to false, only runs that did not process any entities count as “noop” runs. | true
notification_granularity | Integer | This property lets the pipe “override” the granularity of the notifications generated for the pipe. | 99999999999
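To make the rescan counters concrete, here is an illustrative pump where every third run is a partial rescan that rewinds the stored since value by 3600 seconds (one hour); the surrounding pipe configuration is omitted and the values are only a sketch:
{
    "pump": {
        "schedule_interval": 30,
        "partial_rescan_run_count": 3,
        "partial_rescan_delta": 3600
    }
}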
Example configuration¶
The outermost object would be your pipe configuration, which is omitted here for brevity:
A scheduled pump running every 30 seconds, no retries or dead letter dataset:
{
    "pump": {
        "schedule_interval": 30
    }
}
A cron pump running every day at midnight with a maximum of 5 retries per entity, a maximum of 100 write errors in the execution log and a dead letter dataset. Also, a maximum of ten consecutive write failures is allowed:
{
    "pump": {
        "cron_expression": "0 0 * * *",
        "max_retries_per_entity": 5,
        "max_consecutive_write_errors": 10,
        "max_write_errors_in_retry_dataset": 100,
        "dead_letter_dataset": "mypipe-dead-letters"
    }
}
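As a variation on the example above, the track_dead_letters flag can be added so that a previously written dead letter is deleted once a later version of the entity is successfully written to the sink:
{
    "pump": {
        "cron_expression": "0 0 * * *",
        "max_retries_per_entity": 5,
        "dead_letter_dataset": "mypipe-dead-letters",
        "track_dead_letters": true
    }
}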
A scheduled pump running every 30 seconds, but doing a full rescan of the source every full hour. No retries or dead letter dataset:
{
    "pump": {
        "schedule_interval": 30,
        "rescan_cron_expression": "0 * * * *"
    }
}
A pump that fires every 5 minutes starting at 14:00 and ending at 14:55, and again every 5 minutes starting at 18:00 and ending at 18:55, every day. No retries or dead letter dataset:
{
    "pump": {
        "cron_expression": "0/5 14,18 * * ?"
    }
}
Rescans¶
Definition of terms:
- Incremental run:
This is what a pump does when it is started while the stored “last_seen” value is set to a non-empty value, i.e. the pipe will only process source-entities that have appeared after the previous run of the pipe. This is the most common way to run a pipe.
- Background rescan:
This is what a pump does when it is started by the rescan_cron_expression or rescan_run_count config-properties (or if it is manually started by the “start-rescan” pump-operation) and enable_background_rescan is set to true. It will process all the source-entities, and do deletion tracking when finished. Only pipes with a dataset sink support background rescans. This is because a rescan run needs a way to check that it isn’t overwriting newer entities from an incremental run, and only the dataset sink has the required functionality.
The rescan functionality is not enabled by default. To enable it, either set the pipe’s enable_background_rescan setting to true to enable rescans on that specific pipe (see the sketch after this list), or set the service metadata property global_defaults.enable_background_rescan to true to enable rescans on all pipes.
- Reset/Full run:
This is what a pump does when the user has explicitly reset the pipe. It will process all the source-entities, and do deletion tracking when finished.
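A minimal sketch of a pipe with background rescans enabled; the pipe id and dataset name are hypothetical, and the pipe is assumed to have a dataset sink as required above:
{
    "_id": "my-pipe",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "my-upstream-dataset"
    },
    "enable_background_rescan": true,
    "pump": {
        "schedule_interval": 30,
        "rescan_cron_expression": "0 0 * * *"
    }
}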
The use-case for rescans is that the user wants new entities to flow through the pipe as quickly as possible, but the user also wants to reprocess all the source entities. The latter can be very time-consuming, and sometimes it is not an option to simply reset the pipe to reprocess everything, since that would prevent any new entities from flowing through the pipe until all the old entities have been processed.
Example: the pipe reads from a SQL database table that has a “last_modified_time” column, but no “deleted” column; new and modified rows can be selected with an appropriate SQL statement, but there is no way to query the SQL database for deleted rows. In this case a rescan can be used to detect deleted rows, while incremental runs can be used to process new rows at the same time.
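Below is an illustrative configuration for this scenario, assuming a sql source where an updated_column drives the incremental reads; the system id, table and column names are hypothetical, and the exact source properties depend on the source type in use:
{
    "_id": "customers",
    "type": "pipe",
    "source": {
        "type": "sql",
        "system": "my-sql-system",
        "table": "Customers",
        "updated_column": "last_modified_time"
    },
    "enable_background_rescan": true,
    "pump": {
        "comment": "Incremental runs every minute; nightly rescan for deletion tracking",
        "schedule_interval": 60,
        "rescan_cron_expression": "0 2 * * *"
    }
}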
There are two different “flavors” of rescans:
1. The entities produced by the incremental runs are known to be correct. This is the case if the user has just changed the DTL of a pipe.
If one or more incremental runs have been started while a rescan was in progress, the rescan will stop processing entities when it reaches the “last_seen” offset used by the first incremental run.
If no incremental run has been started, the rescan will proceed past the “last_seen” offset and start to update the stored “last_seen” value. It is not possible to start an incremental run if a rescan is running and it has already passed the “last_seen” offset.
The rescan will not overwrite any entities that have been written by an incremental run. At the end of the rescan, the rescan will do deletion-tracking, but will not delete any entities that were output by the incremental run(s).
Caveats of doing rescan+incremental runs:
The order of the resulting entities can be different from what it would be in a normal “reset” run.
Since the rescan can’t overwrite entities that have been output by the incremental run, the pipe may not output all the versions of an entity that it would in a normal run. This can happen for instance if the pipe has a dataset source with the include_previous_versions property set to true; once the incremental run has output entity “A”, any older versions of “A” that are produced by the rescan will be ignored.
2. The entities produced by the incremental run may not be correct in all cases. This is the case if the pipe has a “merge” source, and the user has changed the configuration of the merge source.
In this case the incremental run will use the old version of the merge source, which may produce erroneous results. The entities from the incremental run will not be put into the sink’s seen-tracker. The incremental run will not overwrite any entities that have been produced by the rescan run.
Once the rescan finishes, any incremental run in progress will be stopped. The rescan will then process any entities that have appeared since the start of the rescan. Once that is done, the rescan will do deletion-tracking. This will delete any erroneous entities that were emitted by the incremental run.
Caveats of doing rescan+incremental runs:
The order of the resulting entities can be different from what it would be in a normal “reset” run.
The output can temporarily contain erroneous entities (produced by the incremental runs). Such entities will be deleted once the rescan has finished.
Only one incremental run can be active at a time, but once an incremental run has finished, a new incremental run can be started. A rescan run can also be started while an incremental run is in progress.
The incremental runs will not do retries, since the rescan will reprocess any previously failed entities. The incremental runs will do dependency tracking.