Service Configuration Guide

General

The Sesam service is configured using one or more JSON files. These configuration files can be imported through the service API, e.g. using the sesam command line client. They can also be created and edited using the Sesam Management Studio.

Conceptually, the configuration files contain definitions for Systems and Pipes.

The configuration is a JSON array of system and pipe configurations. The configuration entities are JSON objects of the form:

[
    {
        "_id": "some-solution-wide-unique-id",
        "name": "Name of component",
        "type": "component-type",
        "some-property": "some value"
    },
    {
        "_id": "some-other-solution-wide-unique-id",
        "name": "Name of other component",
        "type": "component-type",
        "some-other-property": "some other value"
    }
]

It should be noted that all _id property values must be unique across the solution. This means unique within the sesam.conf.json file, but also across all files when a multiple-file configuration is used.

Environment variables

You can insert the values of environment variables into configuration using the syntax "$ENV(variable)" in place of property values. You can manage these environment variables using the Sesam client or using an HTTP client with the Environment Manager API.

For example, given an uploaded environment variable JSON file containing:

{
   "server-ip": "10.10.10.1"
}

You can then refer to this property in your configuration:

{
   "_id": "my-system",
   "type": "oracle",
   "host": "$ENV(server-ip)"
   ..
}

You can also compose a property that consists of several environment variables:

{
  "_id": "my-system",
  "type": "url",
  "base_url": "http://$ENV(my-domain):$ENV(my-port)",
  "..": ".."
}

Note that when using properties that contain multiple environment variables you cannot nest them inside each other, and the resulting property will always be a string.

You can combine environment variables and secrets, but they cannot be nested within each other. For secret variables, see the Secrets manager API for details on their syntax and how to upload them.
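
As an illustration only, a system configuration might combine an environment variable with a secret. The property names below are made up for the example, and the "$SECRET(...)" syntax should be checked against the Secrets manager API documentation:

{
   "_id": "my-system",
   "type": "oracle",
   "host": "$ENV(server-ip)",
   "password": "$SECRET(db-password)"
}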

Environment variables apply to both System and Pipe configuration entities.

Service metadata

There is an optional special configuration entity used to represent the service instance's metadata. The metadata is used to specify properties that apply to the service instance itself. This entity can either be added as a normal configuration entity, edited in the UI or updated with the Service API.

Example:

{
   "_id": "node",
   "type": "metadata",
   "namespaced_identifiers": true,
   "namespaces": {
      "default": {
        "example": "http://example.org/",
        "fifa": "http://www.fifa.com/"
      }
   }
}

Properties

Property Type Description Default Req
namespaced_identifiers Boolean Flag used to enable namespaced identifiers support for the service as a whole. Pipes inherit the value of the namespaced_identifiers property unless it is explicitly overridden. false  
namespaces.default Dict

A dictionary of namespace to URI expansions. This expansion mapping is used to expand namespaced identifiers into fully qualified URIs, e.g. by those components that provide RDF support.

A few expansion mappings are built into the system. These are always available unless explicitly overridden:

"_": "http://example.org/",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"owl": "http://www.w3.org/2002/07/owl#",
"foaf": "http://xmlns.com/foaf/0.1/",
"wgs84": "http://www.w3.org/2003/01/geo/wgs84_pos#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"dc": "http://purl.org/dc/elements/1.1/",
"skos": "http://www.w3.org/2004/02/skos/core#",
"dcterms": "http://purl.org/dc/terms/",
"gs": "http://www.opengis.net/ont/geosparql#",
   

Pipes

A pipe defines the flow of data from a source to a sink on some schedule as defined by the pump settings. Optionally, a pipe may define an ordered list of transforms that are applied to entities as they flow from the source to the sink. As the name implies, a pump "pumps" data in the form of entities from the source to the sink at regular or scheduled intervals. A chain of transforms can be placed in between the source and the sink, so that entities are transformed on their way to the sink.

The pipe configuration consists of a source, transform, sink and a pump.

The configuration of a pipe has two forms: a complete form and a shorthand form. The complete form is described first; we will revisit pipes later and look at the shorthand form.

Note that the forward slash character ("/") is not allowed in the pipe _id property.

Prototype

The following JSON snippet shows the general form of a pipe definition.

{
    "_id": "pipe-id",
    "name": "Name of pipe",
    "type": "pipe",
    "short_config": "sql://system/table",
    "source": {
    },
    "transform": {
    },
    "sink": {
    },
    "pump": {
    }
}

Note that if no name property is explicitly set for the source, sink or pump configurations, one will be generated based on the name of the pipe (i.e. the contents of this property postfixed with "source", "sink" or "pump" respectively).

Batching

Pipes support batching if the sink supports batching. A pipe does this by accumulating source entities in a buffer before writing the batch to the transforms and the sink. The size of each batch can be specified using the batch_size property on the pipe. The default batch size is 100.

Note that the sink may have its own batch_size property. This is useful if the pipe has transforms that produce more entities than the number of entities taken as input.
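
As a sketch, a pipe could set a larger batch_size, and a sink that supports batching could be given its own batch_size to accommodate transforms that emit more entities than they receive (the ids and dataset names below are made up):

{
    "_id": "my-batched-pipe",
    "type": "pipe",
    "batch_size": 1000,
    "source": {
        "type": "dataset",
        "dataset": "some-dataset"
    },
    "sink": {
        "type": "dataset",
        "dataset": "my-batched-output",
        "batch_size": 2000
    }
}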

Properties

Property Type Description Default Req
_id String The id of the pipe, this should be unique within a Sesam service instance. Note that you cannot use the / character in the id property.   Yes
name String A human readable name of the component.   Yes
type String The type of the component, for pipes the only allowed value is "pipe"   Yes
short_config String A connection string-like short form of the configuration, see the pipes revisited for more information on the format of this property.    
batch_size Integer The number of source entities to consume before writing to the sink. The batch size can be used to buffer up entities so that they can be written together to the sink in one go. The sink must support batching for this to happen. This may increase the throughput of the pipe, at the cost of a little extra memory usage. If the batch fails, then entities will be retried individually. The pipe offset will be saved after each batch if the source supports this. 100  
disable_set_last_seen Boolean If this flag is set to true, it will no longer be possible to reset or set the 'last seen' parameter for this pipe. The primary use case for this property is when you need to protect the pipe from accidental resets. false  
source Object A configuration object for the source component of the pipe. It can be omitted if short_config is present and contains enough information to infer the source configuration. See the pipes revisited for more information about how the source configuration is inferred in this case.    
transform Object/List Zero or more configuration objects for the transform components of the pipe. The default is to do no transformation of the entities. If a list of more than one transform components is given, then they are chained together in the order given. This means that the output of the first transform is passed as the input of the second, and so on. The output of the last transform is then passed to the sink. The first transform gets its input from the source.    
sink Object A configuration object for the sink component of the pipe. If omitted, it defaults to a dataset sink with its dataset property set to the same value as the pipe's _id property.    
pump Object A configuration object for the pump component of the pipe.    

Namespaces

Namespaces can be used in entity identifiers, entity property names and the namespaced identifier datatype. A namespaced identifier consists of two parts: a namespace and an identifier. The namespace part can consist of any character, including colons. The identifier part can consist of any character except colons (:).

Example of an entity with namespaces:

{
  "_id": "users:123",
  "user:username": "erica",
  "user:first_name": "Erica",
  "user:manager": "~:users:101"
}

Note

Namespaced identifiers can be enabled by setting the namespaced_identifiers property to true in the pipe configuration (see below) or the service metadata. The former enables it for just the one pipe. The latter enables it for all pipes - except for those pipes that have explicitly disabled it.

Note

Some of the DTL functions are namespace aware and they will behave slightly differently when namespaces are enabled. See the section on namespaces in the DTL reference guide for more details.

Properties

Property Type Description Default Req
namespaced_identifiers Boolean Flag used to enable namespaced identifiers support on the pipe. The default value is read from the service metadata. If not specified in the service metadata then the default value is false. Service metadata default  
namespaces.identity String The namespace used for identifiers. The default value is the pipe's id. pipe.id  
namespaces.property String The namespace used for properties. The default value is the pipe's id. pipe.id  
add_namespaces Boolean

If true then the current identity namespace will be added to _id and the current property namespace will be added to all properties. The namespaces are added before the first transform. This property is normally only specified on input pipes.

If namespaced_identifiers is enabled in the service metadata then the source default value is used. The following sources have a default value of true: csv, ldap, sql, embedded, http_endpoint, and json.

Source default  
remove_namespaces Boolean

If true then namespaces will be removed from _id, properties and namespaced identifier values. The namespaces are removed after the last transform. This property is normally only specified on output pipes.

If namespaced_identifiers is enabled in the service metadata then the sink default value is used. The following sinks have a default value of true: csv_endpoint, elasticsearch, mail, rest, sms, solr, sql, http_endpoint, and json.

Sink default  
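
As an illustrative sketch, an input pipe that enables namespace support and adds namespaces to incoming entities could look like this (the ids, namespace names and table name are made up):

{
    "_id": "users-input",
    "type": "pipe",
    "namespaced_identifiers": true,
    "namespaces": {
        "identity": "user",
        "property": "user"
    },
    "add_namespaces": true,
    "source": {
        "type": "sql",
        "system": "hr-system",
        "table": "Users"
    }
}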

Compaction

Compaction deletes the oldest entities in a dataset and reclaims space for those entities in the dataset's indexes.

Datasets that are written to by pipes using the dataset sink are automatically compacted once every 24 hours. The default is to keep the last two versions of every entity up until the current time.

Properties

Property Type Description Default Req
compaction.automatic Boolean If true then the dataset is a candidate for automatic compaction. true No
compaction.keep_versions Integer The number of unique versions of an entity to keep around. The default is 2, which is the minimum value allowed. 2 No
compaction.time_threshold_hours Integer Specifies the threshold for how old entities must be before they are considered for compaction. This property is usually used when you want to keep entities around for a certain time. null No
compaction.time_threshold_hours_pump Integer Same as compaction.time_threshold_hours, but applies to the pipe's pump execution dataset. Pump execution datasets are always trimmed by time. The default is 30 days, which is the minimum value allowed. 720 No

Example configuration

The following example shows a pipe definition that exposes data from a SQL database table called customers, and feeds it into a sink that writes the data into a dataset called Northwind:Customers.

{
    "_id": "northwind-customers",
    "name": "Northwind customers",
    "type": "pipe",
    "source": {
        "type": "sql",
        "system": "Northwind",
        "table": "Customers"
    },
    "sink": {
        "type": "dataset",
        "dataset": "Northwind:Customers"
    },
    "pump": {
        "schedule_interval": 3600
    },
    "compaction": {
        "keep_versions": 2,
        "time_threshold_hours": 48
    }
}

Sources

Sources provide streams of entities as input to the pipes, which are the building blocks for the data flows in Sesam. These entities can take any shape (i.e. they can also be nested), and have a single required property: _id. This _id field must be unique within a flow for a specific logical entity. There may, however, exist multiple versions of this entity within a flow.

Continuation support

Sources can optionally support a since marker which lets them pick up where the previous stream of entities left off - like a "bookmark" in the entity stream. The since marker is opaque to the rest of the Sesam components and it is assumed to be interpretable only by the source. Within an entity the marker is carried in the _updated property if supported by its source.

Sesam supports a diverse set of core data sources. They are all described below.

There are three characteristics that describe continuation support. All sources have these characteristics, and there are three properties available to describe them. The properties can be fixed, have a default value, or be calculated from other properties on the source (aka dynamic). The table below explains them in detail.

Note

It is important that you do not set any of these properties to true unless the source actually has these characteristics. Doing so can mean that the pump is not able to track changes properly.

Property Description
supports_since

Does the source make use of the 'since' parameter if it gets passed one?

This property is typically used to disable the tracking of the since marker. Sometimes it is not necessary to perform the tracking as the source won't make use of it anyway.

Note

If you set supports_since to true then you should also make sure that you set either is_since_comparable to true or is_chronological to true — or both depending on the strategy you want.

is_since_comparable

Can you compare two _updated values using lexical/bytewise comparison and decide their relative order?

This property is used to specify whether the values of two entities' _updated properties are always comparable. If the property can contain values of different types or structures, then it may not be possible to use lexical/bytewise comparison of the two values to decide order.

Note

If you set is_since_comparable to true then you should also make sure that supports_since is set to true.

is_chronological

Does the source hand out entities in chronological order, i.e. in increasing order?

If the entities are sorted in chronological order, then the pump can shift its since marker for each new entity in the stream. It can also store it away more often. This is a good characteristic to have as it makes the source able to continue where it left off even if the previous run did not complete fully. If the property is set to false, then the pump can only know at the end of the run what the new since marker is.

Note

If you set is_chronological to true then you should also make sure that supports_since is set to true and is_since_comparable is set to true.

The strategy for tracking the since marker is chosen like this — and in this specific order:

  1. If supports_since is true and is_since_comparable is true and is_chronological is true then continuation support is enabled and the chronological strategy is chosen. This strategy will store _updated values in the order we see them.
  2. If supports_since is true and is_since_comparable is true then continuation support is enabled and the max strategy is chosen. This strategy will store the maximum _updated value seen in the run.
  3. If none of the above apply, then continuation support is disabled. No tracking of the since marker is then done.

If continuation support is enabled for a pipe then the since marker is stored in the last-seen property on the pump. Note that one can use the pump's update-last-seen operation in the Service API to update or reset the last-seen value manually. This is useful in cases where one wants to reprocess the data from scratch for some reason. The Service API can also tell you what the current last-seen value is.
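
As a sketch, the continuation properties can be declared explicitly on a source configuration where they are listed as defaults. The example below assumes an SQL table whose updated column is filled in for every change and whose rows are handed out in increasing updated order, which makes the chronological strategy applicable (the system, table and column names are made up):

{
    "type": "sql",
    "system": "my-system",
    "table": "my_table",
    "primary_key": "table_id",
    "updated_column": "updated",
    "is_chronological": true
}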

The dataset source

The dataset source is one of the most commonly used sources in a Sesam installation. It simply presents a stream of entities from a dataset stored in Sesam. Its configuration is very simple and looks like:

Prototype

{
    "type": "dataset",
    "dataset": "id-of-dataset",
    "include_previous_versions": false,
    "include_replaced": true
}

Properties

Property Type Description Default Req
dataset String A dataset id.   Yes
include_previous_versions Boolean If set to false, the dataset source will only return the latest version of any entity for any unique _id value in the dataset. This is the default behaviour. false  
include_replaced Boolean If set to false, the dataset source will filter out entities where the $replaced property is true. This is typically used when reading from datasets that have been produced by the merge source. true  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "dataset",
        "dataset": "northwind:customers",
        "include_previous_versions": true
    }
}

The merge source (Experimental)

The merge source is a source that is able to infer the sameness of entities across multiple datasets. The source uses a set of equality rules to figure out which entities are the same. Equality is resolved transitively, so if A is the same as B and B is the same as C then A, B and C are all considered the same.

Prototype

{
    "type": "merge",
    "datasets": ["one d1", "two d2", "three d3"],
    "equality": [
         ["eq", "d1.field1", "d2.field1"],
         ["eq", "d2.field2", "d3.field2"]
    ]
}

Properties

Property Type Description Default Req
datasets List<String{>=1}> A list of one or more datasets that are to be merged. Each item in this list is a pair of dataset id and dataset alias. A given dataset can only appear once in this list. The syntax is the same as in the datasets property in hops.   Yes
equality List<EqFunctions{>=0}> A list of zero or more eq functions that are to be used to decide which entities are the same. The functions must follow the rules for joins in DTL.   No

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

Below you'll find three datasets A, B and C and a pipe configuration that uses the merge source.

Dataset A:

[
    {"_id": "a1", "f1": 1},
    {"_id": "a2", "f1": 2}
]

Dataset B:

[
    {"_id": "b1", "f1": 1, "f2": "x"},
    {"_id": "b2", "f1": 3}
]

Dataset C:

[
    {"_id": "c1", "f3": "X"},
    {"_id": "c2", "_deleted": true, "f3": "Y"},
    {"_id": "c3", "_deleted": true, "f3": "X"},
]

Pipe configuration:

{
    "_id": "result",
    "source": {
        "type": "merge",
        "datasets": ["A a", "B b", "C c"],
        "equality": [
            ["eq", "a.f1", "b.f1"],
            ["eq", "b.f2", ["lower", "c.f3"]],
        ]
    }
}

Given the above we should expect an output that looks like this:

[
    {"$ids": ["a1", "b1", "c1"], "_id": "0|a1|1|b1|2|c1", "_updated": 0,
     "f1": [1, 1], "f2": "x", "f3": "X"},
    {"$ids": ["a2"], "_id": "0|a2", "_updated": 1, "f1": 2},
    {"$ids": ["b2"], "_id": "1|b2", "_updated": 2, "f1": 3},
    {"$ids": ["c2"], "_deleted": true, "_id": "2|c2", "_updated": 3, "f3": "Y"},
    {"$ids": ["c3"], "_deleted": true, "_id": "2|c3", "_updated": 4, "f3": "X"}
]

Entities a1, b1 and c1 have been merged. Entities a2 and b2 did not match any other entities. Deleted entities, like c2 and c3, are never merged with any other entities.

The merged entities are combined so that the properties and their values are merged in the resulting entity. null values are kept intact. List values appear in a consistent order and may contain duplicate values.

The _updated property is a sequence number that increases every time a new entity is generated by the source. Entities appear in chronological order.

The _id property is a composite id that consists of the dataset offset and entity id joined by the | character. The dataset offset is the index of the dataset in the datasets property in the pipe configuration. The composite parts are ordered by dataset offset and entity in order to get consistent ids.

The $ids property contains all the original entity ids of the entities merged into the entity. Note that an entity id will not be added to this list if the original entity has the $ids property. Because of how properties are merged the $ids will end up being a union of all the original entity ids, excluding the entity ids of the merged entities themselves. This is useful when merging already merged entities downstream.

Warning

Do not remove a dataset from the datasets property nor change the order of the datasets in the datasets property. Doing so may lead to inconsistent results. Adding or renaming datasets is OK though as this won't affect the order of the datasets. If you need to do this then you should reset the pipe and maybe also delete the target dataset.

The union datasets source

The union datasets source is similar to the dataset source, except it can process several datasets at once and keep track of each one in its since marker handler. The union datasets source reads its datasets in order, exhausting each one before moving to the next.

The _id property of each entity is prefixed with the dataset id, separated by the : character. This is done to prevent unwanted identity collisions. The entity id dave from the men dataset will end up with the id men:dave, and the entity id claire from the women dataset will end up with the id women:claire.
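
To illustrate, entities read from the two hypothetical datasets men and women would be emitted along these lines (the name property is made up for the example):

[
    {"_id": "men:dave", "name": "Dave"},
    {"_id": "women:claire", "name": "Claire"}
]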

Prototype

{
    "type": "union_datasets",
    "datasets": ["id-of-dataset1", "id-of-dataset2"],
    "include_previous_versions": false
}

Properties

The configuration of this source is identical to the dataset source, except that datasets is a list of dataset ids.

Property Type Description Default Req
datasets List<String> A list of dataset ids.   Yes
include_previous_versions Boolean If set to false, the data source will only return the latest version of any entity for any unique _id value in the dataset. This is the default behaviour. false  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "union_datasets",
        "datasets": ["northwind:customers", "northwind:orders"],
        "include_previous_versions": true
    }
}

The merge datasets source

The merge datasets source is similar to the dataset source, except it can process several datasets at once and keep track of each one in its since marker handler.

The merge datasets source reads all of its datasets and returns entities ordered by their _ts field. It knows how to deal with identities, so that only the latest version of each entity is returned.

Entity ids are not modified in any way.

Prototype

{
    "type": "merge_datasets",
    "datasets": ["id-of-dataset1", "id-of-dataset2"],
    "strategy": "latest"
 }

Properties

The configuration has two primary properties: datasets, which must be a list of dataset ids, and strategy, which chooses the merge strategy.

Property Type Description Default Req
datasets List<String> A list of dataset ids.   Yes
strategy String

The name of the strategy to use to merge entities. Valid options are "latest" (the default) and "all".

The "latest" strategy returns the version of the entity with the newest timestamp (as given in the _ts field). It will return the entity from the dataset that contains the latest version. This strategy is useful when only the latest version of an entity among the given datasets are of interest.

The "all" strategy returns a merged version of the entity that contains all latest versions from all datasets. The individual dataset entities are keyed under the dataset id that they came from. The entities are ordered by the timestamp of the latest version of that entity. The returned entity contains all latest versions from all datasets where is appears. This strategy is useful when all datasets provide data for the resulting entity. In a lot of cases one may want to use it with a transform, so that only the entity can be shaped in a way that is more useful downstream.

"latest"  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "merge_datasets",
        "datasets": ["products", "products-metadata"]
    }
}

The diff datasets source (Experimental)

The diff datasets source is similar to the merge datasets source, except that it also compares the entities from the datasets. The comparison produces a diff and filters out entities that are equal.

For each merged entity (same as with the all strategy in the merge datasets source) an additional $diff property is also generated. The diff contains the datasets and values for the properties that are not equal across all the datasets.

Entity ids are not modified in any way.

Prototype

{
    "type": "diff_datasets",
    "datasets": ["id-of-dataset1", "id-of-dataset2"]
 }

Properties

The configuration only requires the datasets property, which must be a list of dataset ids.

Property Type Description Default Req
datasets List<String> A list of dataset ids.   Yes
whitelist List<String> The names of the properties to include in the comparison. If there is a blacklist also specified, the whitelist will be filtered against the contents of the blacklist.    
blacklist List<String> The names of the properties to exclude from the comparison. If there is a whitelist also specified, the blacklist operates on the values of the whitelist (and not the properties present in the entities).    
treat_lists_as_sets Boolean Flag to indicate if you want to ignore duplicates and ordering of lists in the entities you are comparing. This option also affects lists nested deeper inside the entity. false  
ignore_deletes Boolean

Flag to indicate if you want to ignore deleted entities during the comparison. By default a difference is produced if one of the datasets contains a deleted entity while the other datasets do not.

If true, deleted entities are treated as if they don't exist.

false  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "diff_datasets",
        "datasets": ["product", "other-products"]
    }
}

Example result

{
    "_id": "some-product",
    "$diff": {
        "price": {
            "products": "price-from-products",
            "other-products": "price-from-other-products",
        }
    }
 }

The embedded source

This is a data source that lets you embed the data inside the configuration of the source. This is convenient when you have a small and static dataset. Do not use this source to hold a large number of entities.

Properties

Property Type Description Default Req
entities List<Entity> Contains the list of entities that are to be served by the source.   Yes

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Fixed)
is_since_comparable true (Fixed)
is_chronological false (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

Example:

{
    "source": {
        "type": "embedded",
        "entities": [
            {"_id": "a", "title": "A"},
            {"_id": "b", "title": "B"},
            {"_id": "c", "title": "C"}
        ]
    }
}

The SQL source

The SQL database source is one of the most commonly used data sources. In short, it presents database relations (i.e. tables, views or queries) as an entity stream to Sesam.

The SQL source has several options, all of which are presented below with their default values:

Prototype

{
    "system": "id-of-system",
    "table": "name-of-table",
    "primary_key": ["list","of","key","names"],
    "query": "SQL query string",
    "updated_query": "SQL query string for 'since' support in queries",
    "updated_column": "column-name-for-since-support-in-tables",
    "whitelist": ["columns","to","include"],
    "blacklist": ["columns","to","exclude"],
    "fetch_size": 1000,
    "preserve_null_values": false,
    "schema": "default-schema-name-if-included"
}

Column types

See the supported column types list for an overview of which RDBMS column types are supported and how they are mapped to Sesam types. Note that if your table or query property refers to relations with unsupported column types, you will either have to use the blacklist configuration property to ignore them, or write a custom query that coerces the non-supported column to a supported type.

Properties

Property Type Description Default Req
system String Must refer to a SQL system component by id. The role of this component is to provide services like connection pooling and authentication for the data sources using it.   Yes
table String If table is given, it must refer to a valid table name in the database system, not including the schema, which if needed must be set separately in the schema property. The table and query properties are mutually exclusive, with table used if both are present.   Yes
primary_key List<String> or String The value of this property can be a single string with the name of the column that contains the primary key (PK) of the table or query, or a list of strings if it is a compound primary key. If the property is not set and the table property is used, the data source component will attempt to use table metadata to deduce the PK to use. In other words, you will have to set this property if the query property is used.    
query List<String> or String Must be a valid query in the dialect of the RDBMS represented by the system property. You will also have to configure the primary key(s) of the query in the primary_key property. Note: mutually exclusive with the table property with table taking precedence. If a list of strings is given, they will be converted to a single string by concatenation with the newline character.   Yes
updated_column String If the underlying relation contains information about updates, the data source is able to support since markers. You can provide the name of the column to use for such queries here. This must be a valid column name in the table or query result sets and it must be of a data type that supports larger or equal (">=") tests for the table case.    
updated_query List<String> or String If the query property is set, the since support must be expressed by a full query including any test needed. A single variable binding :since must be included somewhere in the query string - for example "select * from view_name v where v.updates >= :since". If a list of strings is given, they will be converted to a single string by concatenation with the newline character.    
schema String If a specific schema within a database is needed, you must provide its name in this property. Do not use schema names in the table property.    
whitelist List<String> The names of the columns to include in the generated entities. If there is a blacklist also specified, the whitelist will be filtered against the contents of the blacklist.    
blacklist List<String> The names of the columns to exclude from the generated entities. If there is a whitelist also specified, the blacklist operates on the values of the whitelist (and not the whole columnset).    
preserve_null_values Boolean If set to true, null values will be included in the entities produced by this source. By default they are omitted. false  
fetch_size Integer The fetch size of the result sets (number of rows in a cursor fetch) to get from the database 1000  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Dynamic: true if updated_column set)
is_since_comparable true (Default)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

Example with a single table:

{
    "source": {
        "type": "sql",
        "system": "Northwind",
        "table": "Customers"
    }
}

Example with a single table, where the primary key is in a column named table_id and the updated datestamp is in a column called updated. This enables us to switch on since support:

{
    "source": {
        "type": "sql",
        "system": "my_system",
        "table": "my_table",
        "primary_key": "table_id",
        "updated_column": "updated"
    }
}

Example with custom query:

{
    "source": {
        "type": "sql",
        "system": "Northwind",
        "query": "select * from Customers",
        "primary_key": "CustomerID"
    }
}

Example with a custom query from a table called my_table where the primary key is in a column named table_id and the updated datestamp is in a column called updated. This enables us to switch on since support:

{
    "source": {
        "type": "sql",
        "system": "my_system",
        "query": "select * from my_table",
        "primary_key": "table_id",
        "updated_column": "updated",
        "updated_query": "select * from my_table where updated >= :since"
    }
}

The conditional source

The conditional source selects an active source based on a key typically controlled by an environment variable. It is typically used in devops to be able to use the same configuration in different types of environments (e.g. development, staging, production). The actual source to use is resolved at runtime when the parent pipe is created.

The configuration options are:

Prototype

{
   "type": "conditional",
   "condition": "$ENV(current-environment)",
   "alternatives": {
       "dev": {
           "type": "embedded",
           ..
       },
       "test": {
           "type": "sql",
           ..
       },
       "prod": {
           "type": "sql",
           ..
       }
   }
}

Properties

Property Type Description Default Req
condition String The key to look up in alternatives for the actual source to use at runtime. Typically an environment variable. Note that all possible enumerations of this value need to exist in alternatives.   Yes
alternatives Object A dictionary of actual source configurations keyed by the enumerated value of condition.   Yes

The CSV source

The CSV data source translates the rows of files in CSV format to entities.

The configuration options are:

Prototype

{
   "type": "csv",
   "system": "a-valid-url-or-microservice-system-id",
   "url": "url-to-csv-file",
   "has_header": true,
   "field_names": ["mappings","from","columns","to","properties"],
   "auto_dialect": true,
   "dialect": "excel",
   "encoding": "utf-8",
   "decode_error_strategy": "strict-or-replace",
   "primary_key": ["list","of","column","names"],
   "whitelist": ["list","of","column","names","to","include"],
   "blacklist": ["list","of","column","names","to","exclude"],
   "preserve_empty_strings": false,
   "delimiter": ","
}

Properties

Property Type Description Default Req
url String The URL of the CSV file to load.   Yes
system String The ID of the URL system or microservice system component to use.   Yes
has_header Boolean Flag that indicates to the source that the first row in the CSV file contains the names of the columns. If this property is set to false, you will have to provide a list of column names in the field_names property. true  
field_names List If set, specifies the names of the columns. It takes precedence over the header in the CSV file if present.    
auto_dialect Boolean Flag that hints to the source that it should try to guess the dialect of the CSV file on its own. true  
dialect String Specifies what type of CSV file the file is. This is basically a preset of the other properties. The recognised values are "excel", "excel_tab" and "unix_dialect".    
encoding String The character set used to encode the text in the CSV file. "UTF-8"  
decode_error_strategy String An enumeration of "strict" and "replace" that tells the character decoder how to deal with illegal characters in the input data. The default is "strict", which raises an error and stops processing. The "replace" option will log a warning and attempt to replace the offending character(s) with the unicode special character for "replacement character", see https://en.wikipedia.org/wiki/Specials_%28Unicode_block%29 for more details. Use the "replace" option with extreme care as it can lead to data loss if you're not absolutely sure of what you are doing. The preferred option should always be to try to fix the data at the source. "strict"  
primary_key List<String> or String The name of the column(s) to use as _id in the generated entities. It can be either a list of strings (if the identity is a compound value) or a single column name (i.e. a string). The column name(s) are case sensitive and must match the contents of either field_names or the header of the CSV file.   Yes
whitelist List<String> The names of the columns to include in the generated entities. If there is a blacklist also specified, the whitelist will be filtered against the contents of the blacklist.    
blacklist List<String> The names of the columns to exclude from the generated entities. If there is a whitelist also specified, the blacklist operates on the values of the whitelist (and not the whole columnset).    
preserve_empty_strings Boolean If set to true, column values that are empty strings will be included. By default these are omitted. false  
delimiter String The character or string to use as the CSV field separator (delimiter) ","  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Default)
is_since_comparable true (Default)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "csv",
        "url": "http://blog.plsoucy.com/wp-content/uploads/2012/04/countries-20140629.csv",
        "primary_key": "Code",
        "encoding": "iso-8859-1"
    }
}
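
If the CSV file has no header row, a sketch of the configuration could instead supply the column names explicitly (the system id, URL and column names are made up):

{
    "source": {
        "type": "csv",
        "system": "my-url-system",
        "url": "http://example.org/people.csv",
        "has_header": false,
        "field_names": ["id", "name", "email"],
        "primary_key": "id",
        "delimiter": ";"
    }
}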

The RDF source

The RDF data source is able to read RDF data in NTriples, Turtle or RDF/XML format and turn this into entities.

See the Working with RDF document for more detail on working with RDF in Sesam.

It will transform triples of the form <subject-uri> <predicate-uri> "value" OR <object-uri> into entities of the form:

{
    "_id": "<subject-uri>",
    "<predicate-uri>": "value" OR "~robject-uri"
}

RDF Blank Nodes (aka BNodes) will be turned into child entities.

Prototype

{
   "type": "rdf",
   "system": "url--or-microservice-system-id",
   "url": "url-to-rdf-file",
   "format": "nt-ttl-or-xml"
}

Properties

Property Type Description Default Req
system String The ID of the URL system or microservice system component to use.   Yes
url String The URL of the RDF file to load - it can contain multiple subjects (with blank node hierarchies) and each unique non-blank subject will result in a single root entity.   Yes
format String The type of RDF file referenced by the url property. It is an enumeration that can take the following recognized values: "nt" for NTriples, "ttl" for Turtle form or "xml" for RDF/XML files. "nt"  
is_sorted Boolean Indicates that the input data is sorted on RDF subject. If is_sorted is set to true and the format property is nt (NTriples), the RDF source will attempt to parse the input data sequentially and emit a new entity when the RDF subject changes, without loading the entire RDF file into memory first. Note that the input data cannot contain RDF Blank Nodes (aka BNodes) in this case. The property has no effect on formats other than nt. false  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Default)
is_since_comparable true (Default)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "rdf",
        "url": "http://www.snee.com/rdf/elvisimp.rdf",
        "format": "xml",
    }
}

The SDShare source

The SDShare data source can read RDF from Atom feeds that follow the SDShare specification. See the Working with RDF document for more information about working with RDF data in Sesam.

It has the following properties:

Prototype

{
   "type": "sdshare",
   "system": "url-or-microservice-system-id",
   "url": "url-to-sdshare-fragments-feed"
}

Properties

Property Type Description Default Req
system String The ID of the URL system or microservice system component to use.   Yes
url String The URL of the SDShare fragments feed to consume.   Yes

Continuation support

See the section on continuation support for more information.

Property Value
supports_since true (Default)
is_since_comparable true (Fixed)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "sdshare",
        "url": "https://open.sesam.io/sdshare/server/1/fragments/enhetsregisteret"
    }
}

The LDAP source (Experimental)

The LDAP source provides entities from an LDAP catalog configured by an LDAP system.

It supports the following properties:

Prototype

{
    "type": "ldap",
    "system": "ldap-system-id",
    "search_base": "*",
    "search_filter": "(objectClass=organizationalPerson)",
    "attributes": "*",
    "id_attribute": "cn",
    "page_size": 500,
    "attribute_blacklist": ["a","list","of","attributes","to","exclude"]
}

Properties

Property Type Description Default Req
system String ID of the LDAP system component to use   Yes
search_base String The base LDAP search expression to use when looking for records "*"  
search_filter String LDAP filter expression to apply to all records found by the search_base expression "(objectClass=organizationalPerson)"  
attributes String A wildcard expression specifying which attributes to include in the entity. "*"  
id_attribute String Sets which of the LDAP attributes to use for the _id property of an entity. "cn"  
page_size Integer The default number of records to read at a time from the LDAP service. 500  
attribute_blacklist List A list of attribute names (as strings) to exclude from the record when constructing entities. []  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Fixed)
is_since_comparable true (Fixed)
is_chronological false (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "ldap",
        "system": "bouvet_ldap",
        "search_base": "ou=Bouvet,dc=bouvet,dc=no"
    }
}

The JSON source

The JSON source can read entities from a JSON file available either locally or over HTTP (i.e. served by a web server).

If the supports_since property is set to true, then the since request parameter is added to the URL to signal that we want only changes that happened after the since marker.

Prototype

{
   "system": "url-system-id",
   "type": "json",
   "url": "url-to-json-file"
}

Properties

Property Type Description Default Req
system String The id of the URL system component to use.   Yes
url String The URL of the JSON file to load.   Yes

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Default)
is_since_comparable true (Default)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "json",
        "url": "https://server.com/sesam/data/test.json",
    }
}

An example with a local file:

{
    "source": {
        "type": "json",
        "url": "/sesam/data/test.json",
    }
}
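
If the endpoint understands the since parameter, a sketch of the configuration could declare this by overriding the continuation properties, so that requests like https://server.com/sesam/data/test.json?since=<marker> are issued:

{
    "source": {
        "type": "json",
        "url": "https://server.com/sesam/data/test.json",
        "supports_since": true
    }
}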

The empty source

Sometimes it is useful for debugging or development purposes to have a data source that doesn't produce any entities:

Prototype

{
    "type": "empty"
}

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Fixed)
is_since_comparable true (Fixed)
is_chronological true (Fixed)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "source": {
        "type": "empty"
    }
}

The HTTP endpoint source

This is a special data source that registers an HTTP receiver endpoint that one can post entities to. Entities posted here will be written to the pipe's sink.

A pipe that references the HTTP endpoint source will not pump any entities; in practice this means that a pump is not configured for the pipe. The only way for entities to flow through the pipe is by posting them to the HTTP endpoint.

It exposes two URLs:

URL Description
http://localhost:9042/api/receivers/mypipe/entities JSON Push endpoint
http://localhost:9042/api/receivers/mypipe/sdshare-push-receiver SDShare Push receiver endpoint

JSON Push protocol

The JSON Push protocol is described in additional detail in the JSON Push Protocol document. The serialisation of entities as JSON is described in more detail here. Both individual entities and lists of entities can be posted. This endpoint is compatible with The JSON push sink.

The JSON Push endpoint supports HTTP POST of both a single JSON object and a list of JSON objects. The HTTP request's content-type header element must be set to application/json in this case.
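
For example, a client could POST a payload like the following to http://localhost:9042/api/receivers/mypipe/entities with the content-type header set to application/json (the entities are made up):

[
    {"_id": "entity-1", "title": "First entity"},
    {"_id": "entity-2", "title": "Second entity"}
]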

SDShare Push protocol

The SDShare Push protocol is described here.

The SDShare Push endpoint supports receiving RDF in NTriples form. In this case the URL parameters have to include at least one resource parameter describing which resources the NTriples payload contains statements about. If you include a resource parameter that there are no statements about in the NTriples body, an empty entity is generated with its _deleted flag set to true. Note that the graph parameter of the protocol is ignored - the destination of the entities generated from the NTriples payload must be configured in the pipe's sink section. This type of HTTP request expects the content-type to be application/n-triples or text/plain. See the Working with RDF document for more detail on working with RDF in Sesam.

Prototype

{
    "type": "http_endpoint"
}

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Fixed)
is_since_comparable true (Fixed)
is_chronological false (Fixed)

Example configuration

The pipe configuration given below will expose the my-entities receiver endpoint and write any data it receives into the my-entities dataset:

{
    "_id": "my-entities",
    "type": "pipe",
    "source": {
        "type": "http_endpoint"
    }
}

The SPARQL source

The SPARQL source fetches RDF data about subjects from a triplestore exposing a SPARQL compliant endpoint. The endpoint of the source is configured either directly or implicitly by a URL system. The source uses two SPARQL queries to construct entities: the fragments query, a SPARQL SELECT query that returns the list of subjects to get data for along with their modification times, and the fragment query, a SPARQL CONSTRUCT query that gathers all relevant statements about a particular subject. The latter is used to generate the stream of entities.

See the Working with RDF document for more detail on working with RDF in Sesam.

Prototype

{
    "type": "sparql",
    "system": "url-system-id",
    "url": "sparql-endpoint",
    "fragments_query": "SPARQL select query",
    "fragment_query": "SPARQL construct query"
    "since_default": "0001-01-01T00:00:00Z"
}

Properties

Property Type Description Default Req
system String The id of the URL System component to use.   Yes
fragments_query List<String> or String A SPARQL SELECT query that should return exactly two bound variables: id which should contain a unique subject and updated which should contain its modification time in ISO UTC format (or "0001-01-01T00:00:00Z" if not available in the data). If you would like the source to have continuation support, then you must include a filter that compares the updated value to the current since marker. You must use the variable expansion ${since} for this purpose. The query result set should always be ordered by the "?updated" variable. If a list of strings is given, they will be converted to a single string by concatenation with the newline character.   Yes
fragment_query List<String> or String A SPARQL CONSTRUCT query that should return all the relevant statements for a particular subject selected by the fragments_query query. The query should use the expansion variable "${uri}" to filter or select the correct subject to construct the statements to return. If a list of strings is given, they will be converted to a single string by concatenation with the newline character.   Yes
since_default String A string literal to use when querying the triplestore the first time. "0001-01-01T00:00:00Z"  

Continuation support

See the section on continuation support for more information.

Property Value
supports_since false (Dynamic: true if ${since} in fragments_query)
is_since_comparable true (Default)
is_chronological false (Default)

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity.

{
    "source": {
        "type": "sparql",
        "url": "http://localhost:8890/sparql",
        "fragments_query": [
            "PREFIX sdshare: <http://www.sdshare.org/2012/extension/>",
            "SELECT DISTINCT ?id ?updated WHERE {",
             "    ?id sdshare:lastmodified ?updated",
             "} FILTER (?updated >= \"${since}\"^^xsd:dateTime) ORDER BY ?updated",
        ],
        "fragment_query": [
            "CONSTRUCT { ?subject ?property ?value } WHERE {",
            "  ?subject ?property ?value .",
            "} FILTER (?subject = <${uri}>)",
        ]
    }
}

Transforms

Transforms sit between the source and the sink. Entities passed from a source to a sink, can optionally be passed through a chain of transforms before they are passed on to the sink. This makes it possible to reshape the entities on their way to the sink. Transforms can also be used to filter entities and construct new entities.

Transforms can be configured on a pipe by specifying the "transform" property. The field is optional, and can contain either a transform configuration object or a list of them.

{
    "_id": "mypipe",
    "name": "Name of pipe",
    "type": "pipe",
    ...
    "source": {
       ...
    },
    ..
    "transform": {
       "name": "name of transform (NOTE: deprecated)",
       "description": "description of the transform (optional)"
        ...the rest of the transform configuration goes here...
    }
 }}
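
When the transform property is given as a list, the transforms are chained in the order listed. A sketch with a DTL transform followed by an HTTP transform (the DTL rule, system id and URL are placeholders):

{
    "_id": "my-chained-pipe",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "some-dataset"
    },
    "transform": [{
        "type": "dtl",
        "rules": {
            "default": [
                ["copy", "*"]
            ]
        }
    }, {
        "type": "http",
        "system": "my-http-transform-service",
        "url": "http://localhost:8080/transforms/enrich"
    }]
}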

The DTL transform

This is a transform that lets you apply Data Transformation Language transformations on the entities stream produced by the data source.

Example configuration

Pipe configuration that reads entities from the Northwind:Customers dataset and transforms them using the Data Transformation Language before writing them to the customer-with-orders dataset.

{
    "_id": "customer-with-orders",
    "name": "Customers with orders",
    "type": "pipe",
    "source": {
       "type": "dataset",
       "dataset": "Northwind:Customers"
    },
    "transform": {
        "type": "dtl",
        "rules": {
            "default": [
                ["copy", "_id"],
                ["add", "name", "_S.ContactName"],
                ["add", "orders", ["apply", "order", ["hops", {
                    "datasets": ["Northwind:Orders o"],
                    "where": [
                        ["eq", "_S._id", "o.CustomerID"]
                    ]
                }]]]
            ],
            "order": [
                ["add", "order_id", "_S.OrderID"],
                ["add", "order_date", "_S.OrderDate"]
            ]
        }
    }
}

The JSON Schema validation transform

A transform that validates entities against a JSON Schema document. If the entity is valid, then the field referenced by key_valid will be set to true, otherwise false. Any validation error messages will be added to the field referenced by key_errors.

Properties

Property Type Description Default Req
schema Object The JSON schema to validate entities against.   Yes
key_valid String The field to store the validation result. This is a boolean value, which is true if the entity is valid, otherwise false. valid  
key_errors String The field to store the validation error messages. The error messages are a list of strings. The field is only added if the entity is invalid. errors  

Example configuration

{
    "_id": "men-validated",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "men"
    },
    "transform": {
        "type": "json_schema",
        "schema": {
            "type" : "object",
            "properties" : {
                "name" : {"type" : "string"},
                "born" : {"type" : "string"}
            },
            "required": ["name", "born"]
        }
    }
}

If the following entities were pushed through the pipe:

[
 {"_id": "3",
  "name": "Jim"},
 {"_id": "5",
  "name": "Bob",
  "born": "1972-03-12"}
]

then these would come out:

[
 {"_id": "3",
  "valid": false,
  "errors": [
    "'born' is a required property"
  ],
  "name": "Jim"},
 {"_id": "5",
  "valid": true,
  "name": "Bob",
  "born": "1972-03-12"}
]

The conditional transform

The conditional transform selects an active transform based on a key typically controlled by an environment variable. It is typically used in devops to be able to use the same configuration in different types of environments (e.g. development, staging, production). The actual transform to use is resolved at runtime when the parent pipe is created.

The configuration options are:

Prototype

{
   "type": "conditional",
   "condition": "$ENV(current-environment)",
   "alternatives": {
       "dev": {
           "type": "dtl",
           ..
       },
       "test": {
           "type": "dtl",
           ..
          },
       "prod": {
           "type": "dtl",
           ..
       }
   }
}

Properties

Property Type Description Default Req
condition String The key to look up in alternatives for the actual transform to use at runtime. Typically an environment variable. Note that all possible enumerations of this value need to exist in alternatives.   Yes
alternatives Object A dictionary of actual transform configurations keyed by the enumerated value of condition.   Yes

The HTTP transform

This transform performs HTTP POST requests to an HTTP capable endpoint. The service at the endpoint then transforms the entities contained in the request body and returns them in the HTTP response message.

The HTTP endpoint must accept application/json and the response must also be application/json.

The endpoint must support lists of entities only, i.e. it should expect to receive a JSON array and it should always return a JSON array. If the endpoint returns anything other than a "2xx Success" HTTP status code, the transform will raise an exception.

The endpoint is free to decide how the entities are transformed. It just has to produce a list of zero or more entities from the entities posted to it. This means that entities can be transformed, filtered out or new ones created.
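
To illustrate the contract, a request body and a possible response could look like this; the entities and the added property are made up, and the service is free to return any list of zero or more entities.

Request body:

[
    {"_id": "1", "name": "Dave"},
    {"_id": "2", "name": "Claire"}
]

Response body:

[
    {"_id": "1", "name": "Dave", "normalized_name": "dave"},
    {"_id": "2", "name": "Claire", "normalized_name": "claire"}
]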

Properties

Property Type Description Default Req
system String The id of the URL system or microservice system component to use.   Yes
url String The URL to HTTP POST entities to.   Yes
batch_size Integer The maximum number of entities to POST in each request. If there are more entities than this then they'll be split across multiple HTTP requests. 100  

Example configuration

{
    "_id": "my-http-transform-service",
    "type": "system:url",
    "base_url": "http://localhost:8080/transforms/"
},
{
    "_id": "deduplicated-men",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "men"
    },
    "transform": {
        "type": "http",
        "system":"my-http-transform-service",
        "url": "http://localhost:8080/transforms/deduplicate",
        "batch_size": 5
    }
}

The lower keys transform

This transform transforms all the keys of an entity to lower case (optionally recursively).

Prototype

{
    "type": "lower_keys",
    "recurse": false
}

Properties

Property Type Description Default Req
recurse Boolean An optional flag to indicate whether to do the case conversion recursively or not (default is false, which means no recursion). false  

Example

With the default transform configuration:

{
    "type": "lower_keys",
}

And given the input entity:

{
    "_id": "http://psi.test.com/2",
    "Born": "1980-01-23",
    "CODE": "AB32",
    "Status": {
        "http://psi.foo.com/married": true,
        "Spouse": "Pam",
        "URL1": "~rhttp://www.foo.com",
        "URL2": "~rhttp://psi.foo.com/url2",
        "CODE": 123,
        "Child": {
            "t_c": "C",
            "http://psi.test.com/hello": "http://psi.foo.com/world",
            "http://psi.tests.com/S": "bye"
        }
    }
}

The transform will output the following transformed entity:

{
    "_id": "http://psi.test.com/2",
    "born": "1980-01-23",
    "code": "AB32",
    "status": {
        "http://psi.foo.com/married": true,
        "Spouse": "Pam",
        "URL1": "~rhttp://www.foo.com",
        "URL2": "~rhttp://psi.foo.com/url2",
        "CODE": 123,
        "Child": {
            "t_c": "C",
            "http://psi.test.com/hello": "http://psi.foo.com/world",
            "http://psi.tests.com/S": "bye"
        }
    }
}

Note that only the root keys are transformed. If the recurse property is set to true in the configuration, however, the result would instead become:

{
    "_id": "http://psi.test.com/2",
    "born": "1980-01-23",
    "code": "AB32",
    "status": {
        "http://psi.foo.com/married": true,
        "spouse": "Pam",
        "url1": "~rhttp://www.foo.com",
        "url2": "~rhttp://psi.foo.com/url2",
        "code": 123,
        "child": {
            "t_c": "C",
            "http://psi.test.com/hello": "http://psi.foo.com/world",
            "http://psi.tests.com/s": "bye"
        }
    }
}

The upper keys transform

This transform transforms all the keys of an entity to upper case (optionally recursively). The transform mirrors the lower case transform exactly except for the keys being transformed to upper case. See previous section for details.
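
A minimal configuration sketch, assuming the type name upper_keys (mirroring the lower keys transform):

{
    "type": "upper_keys",
    "recurse": false
}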

The undirected graph transform

The undirected graph transform treats the values of a list of properties as nodes in a complete graph and generates an entity for each possible edge between them. Each edge is emitted in both directions, so the number of output entities is twice the number of undirected node pairs in the aggregate of the listed properties. See the example section for an example.

Prototype

{
    "type": "undirected_graph",
    "nodes": ["_id", "sameAs"],
    "from": "from-property",
    "to": "to-property"
}

Properties

Property Type Description Default Req
nodes List<String> A list of entity property names that should be used to pick the nodes of the graph. The properties must refer to a value that is either a string or a URI, or a list of strings or URIs. No other value types are allowed in the transform. ["_id", "sameAs"]  
from String The name of the property to use as "from" point in the generated entity for an edge in the graph. "from"  
to String The name of the property to use as the "to" point in the generated entity for an edge in the graph. "to"  

Example

Given the configuration:

{
    "transform": [
       {
         "type": "undirected_graph",
         "nodes": ["_id", "map"],
         "from": "from",
         "to": "to"
       }
    ]
}

And the input entity:

{
   "_id": "foo",
   "map": ["bar", "zoo"]
}

The transform will output the following edges of the graph as entities on its output stream:

{
    "_id": "foo.bar",
    "from": "foo",
    "to": "bar"
}

{
    "_id": "foo.zoo",
    "from": "foo",
    "to": "zoo"
}

{
    "_id": "bar.foo",
    "from": "bar",
    "to": "foo"
}

{
    "_id": "bar.zoo",
    "from": "bar",
    "to": "zoo"
}

{
    "_id": "zoo.foo",
    "from": "zoo",
    "to": "foo"
}

{
    "_id": "zoo.bar",
    "from": "zoo",
    "to": "bar"
}

The emit children transform

This transform will emit all child entities of its source entities. All entities in the $children property that have an _id property will be emitted. The parent entity will not be emitted.

Properties

There are currently no properties on this transform.

Example configuration

{
    "_id": "children",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "parents-with-children"
    },
    "transform": {
        "type": "emit_children"
    }
}

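For illustration, given a source entity of this shape (the ids and names are hypothetical):

{
    "_id": "parent-1",
    "name": "A parent",
    "$children": [
        {"_id": "child-1", "name": "First child"},
        {"_id": "child-2", "name": "Second child"}
    ]
}

the transform would emit only the two child entities on its output stream:

{"_id": "child-1", "name": "First child"}

{"_id": "child-2", "name": "Second child"}
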
The XML transform

This transform will render entities of the form described in the XML endpoint sink to a string and embed it in the entity, which is then passed on down the transform chain.

Prototype

The properties are identical to the XML endpoint sink, except for the additional xml-property:

{
    "type": "xml",
    "root-attributes": {
       "xmlns": "http://www.example.org/ns1",
       "xmlsn:foo": "http://www.example.org/ns2",
       "xmlns:bar": "http://www.example.org/ns3"
    },
    "xml-property": "xml-property-to-use",
    "include-xml-decl": false,
    "skip-deleted-entities": true
}

Properties

Property Type Description Default Req
root-attributes Object An object containing the attributes to include on the root element. This is where you typically declare your namespaces, schema and so on.    
include-xml-decl Boolean If set to true includes a default XML header: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> false  
xml-property String The property that will hold any XML generated   Yes
skip-deleted-entities Boolean This can be set to false to make deleted entities appear in the XML output. The default is that deleted entities do not appear. true  

Example configuration

This is how an XML transform would look in the context of a pipe (source and sink configs omitted for brevity):

{
    "_id": "my-pipe",
    "transform": {
        "type": "xml",
         "root-attributes": {
            "xmlns": "http://www.example.org/ns1",
            "xmlns:foo": "http://www.example.org/ns2"
         },
         "xml-property": "xml"
    }
}

Given the input entity:

{
  "_id": "1",
  "name": "Entity 1",
  "id": "entity-1",
  "<foo:tag>": [{
      "id": "child",
      "name": "Child entity",
      "<section>": [
        {"<from>": "0"},
        {"<to>": "999"}
      ]
  }]
}

it will produce the transformed entity:

{
  "_id": "1",
  "name": "Entity 1",
  "id": "entity-1",
  "<foo:tag>": [{
      "id": "child",
      "name": "Child entity",
      "<section>": [
        {"<from>": "0"},
        {"<to>": "999"}
      ]
  }],
  "xml": "<foo:tag xmlns=\"http://www.example.org/ns1\" xmlns:foo=\" .. </foo:tag>"
}

Sinks

Sinks are at the receiving end of pipes and are responsible for writing entities into an internal dataset or a target system.

Sinks can support batching by implementing specific methods and accumulating entities in a buffer before writing the batch. The size of each batch can be specified using the batch_size property on the pipe. See the section on batching for more information.

The conditional sink

The conditional sink selects an active sink based on a key typically controlled by an environment variable. It is typically used in devops settings to reuse the same configuration in different types of environments (e.g. development, staging and production). The actual sink to use is resolved at runtime when the parent pipe is created.

The configuration options are:

Prototype

{
   "type": "conditional",
   "condition": "$ENV(current-environment)",
   "alternatives": {
       "dev": {
           "type": "null",
           ..
       },
       "test": {
           "type": "sql",
           ..
       },
       "prod": {
           "type": "sql",
           ..
       }
   }
}

Properties

Property Type Description Default Req
condition String The key to look up in alternatives for the actual sink to use at runtime. Typically an environment variable. Note that all possible enumerations of this value need to exist in alternatives.   Yes
alternatives Object A dictionary of actual sink configurations keyed by the enumerated value of condition.   Yes

The dataset sink

The dataset sink writes the entities it is given to an identified dataset. The configuration looks like:

Prototype

{
    "type": "dataset",
    "dataset": "id-of-dataset"
}

Properties

Property Type Description Default Req
dataset String The id of the dataset to write entities into. Note that if it doesn't exist before entities are written to the sink, it will be created on the fly. The dataset id cannot contain forward slash characters (/) nor can it reference a system: dataset.   Yes
indexes String If set to "$ids" then an index on the $ids property will be automatically maintained. This index will then be used by the dataset browser to look up entities both by _id and $ids. null No
track_children Boolean If true then the $children property will be compared against the previous version of the entity and a delta produced. This will cause the $children property to be updated on entities just before they are written to the dataset. This is a special feature that can be used in combination with the ["create-child", ...] DTL function and the emit_children pipe transform. The purpose is to be able to detect deleted children entities when doing incremental syncs. false  
enable_optimistic_locking Boolean If true then the _updated property in each entity will be compared against the previous version of the entity. If the _updated property of at least one entity doesn't match, an error will be raised and no entities will be written to the target dataset. The purpose is to guard against two agents trying to update the same entity at the same time; in some cases one doesn't want the last edit to "win" automatically. The typical use case is a pipe with a http_endpoint source where the http endpoint can be accessed by several independent processes that use the sesam instance as a storage service. In this case the pipe should not have any transforms, since the http_endpoint will send the resulting entity back to the calling process; if the entity has been transformed by DTL or some other transform, the result might make little sense to the calling process. false  

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "dataset",
        "dataset": "Northwind:Customer",
    }
}
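
A variant sketch that also enables the optional properties described in the table above:

{
    "sink": {
        "type": "dataset",
        "dataset": "Northwind:Customer",
        "indexes": "$ids",
        "track_children": true
    }
}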

The Sesam Databrowser sink

The databrowser sink writes the entities it is given to a Solr index to be displayed by the Sesam Databrowser application. The input entities are transformed to special Databrowser JSON documents before being sent off for indexing.

This sink supports batching.

The configuration looks like:

Prototype

{
    "type": "databrowser",
    "system": "solr-system-id",
    "batch_size": 100,
    "commit_within": null,
    "commit_at_end": true,
    "keep_existing_solr_ids": false,
    "maintain_id_links": false
}

Properties

Property Type Description Default Req
system String The id of the Solr system component to use.   Yes
batch_size Integer The maximum number of documents to post to solr in one http request 100  
commit_within Integer The number of seconds to wait until committing, i.e. invalidating the Solr caches. This is used to set up commit batching. The default is null (i.e. not set) which means that a commit will be issued at the end of the sync if commit_at_end is true. Do not set this too low as it will cause a lot of overhead on the Solr server. null  
commit_at_end Boolean If true, then the sink will issue a commit at the end of the sync. In general it is best to rely on commit_within instead or just let the Solr server itself decide the commit interval. true  
keep_existing_solr_ids Boolean This can be set to true in order to try to reuse the existing solr-id of an entity, even if the solr-ids of the entity no longer contain the solr-id that exists on the solr server. The downside of doing this is that it requires a http-request to solr for each and every entity, so it is very expensive. This option should therefore be set to false in cases where the id-problem is not likely to occur. true  
maintain_id_links Boolean This can be set to true in order to maintain links to documents in the Solr index. If the current document doesn't exist in the solr index (via its id property), but there is a match in the ids property of some other document(s), this setting will force the new document to use an existing id from the index. This makes sure any links to an existing document in the Solr index are kept (for example bookmarked documents). true  

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "databrowser",
        "url": "http://localhost:8893/solr/my_index"
    }
}

The JSON push sink

The JSON push sink implements a simple HTTP based protocol where individual entities or lists of entities are POSTed as JSON data to an HTTP endpoint.

The protocol is described in additional detail in the JSON Push Protocol document. The serialisation of entities as JSON is described in more detail here.

This sink is compatible with The HTTP endpoint source.

This sink supports batching.

Prototype

{
    "type": "json",
    "system": "url-system-id",
    "url": "url-to-http-endpoint",
    "batch_size": 100
}

Properties

Property Type Description Default Req
system String The id of the URL system component to use.   Yes
url String The full URL to HTTP service implementing the JSON push protocol described.   Yes
batch_size Integer The maximum number of entities to POST in each request. If there are more entities than this then they'll be split across multiple HTTP requests. 100  

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "json",
        "url": "http://localhost:9042/api/receivers/foo/entities"
    }
}

The SDShare push sink

The SDShare push sink is similar to the JSON push sink, but instead of posting JSON it translates the inbound entities to RDF and POSTs them in NTriples form to an HTTP endpoint implementing the SDShare push protocol.

Prototype

{
    "type": "sdshare",
    "system":"url-system-id",
    "url": "url-to-http-endpoint",
    "graph": "uri-of-graph-to-post-to"
}

Properties

Property Type Description Default Req
system String The id of the URL system component to use.   Yes
url String The full URL to HTTP service implementing the SDShare push protocol.   Yes
graph String A URI representing a graph to post the RDF ntriples to   Yes

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "sdshare",
        "url": "http://localhost:8001/sdshare_push_service"
    }
}

The SMS message sink (Experimental)

The SMS message sink is capable of sending SMS messages based on the entities it receives. The message to send can be constructed either by inline templates or from templates read from disk. These templates are assumed to be Jinja templates (http://jinja.pocoo.org/) with the entity's properties available to the templating context. The template file name can either be inlined in the configuration or embedded in the input entity. The SMS service to use must be configured separately as a system and its _id property given in the system property. Currently, only the Twilio provider is supported.

Prototype

{
    "type": "sms",
    "system": "sms-system-id",
    "body_template": "static jinja template as a string",
    "body_template_property": "id-of-property-for-body-template",
    "recipients": "static,comma,separated,list,of,international,phonenumbers",
    "recipients_property": "id-of-property-to-get-recipients-from",
    "from_number": "static-international-phone-number-to-use-as-from-number",
}

Properties

The configuration must contain at most one of body_template or body_template_property:

Property Type Description Default Req
system String The id of the Twilio provider component to use.   Yes
body_template String Should contain a Jinja template to use for constructing messages. The template will have access to all entity properties by name.   Yes
body_template_property String Should contain an id of a property of the incoming entity to use for looking up the Jinja template (i.e. for inlining the templates in the entities). It should not be used at the same time as body_template or body_template_file* "body_template"  
recipients String Should contain a comma-separated list of internationalised phone-numbers to send the message constructed to. If this is not inlined in the entities via recipients_property (see below), this property is required.   Yes
recipients_property String Should contain the id of the property to look up the recipients from the entity itself (i.e. for inlining the recipients). If recipients (see above) is not specified, this property is mandatory and the property referenced by it must exist and be valid for all entities. "recipients" Yes
from_number String An international phone number to use as the sender of all messages   Yes

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity. The examples assume a system component (i.e. a Twilio service) has been configured earlier:

{
    "sink": {
        "type": "sms",
        "system": "twilio_service",
        "body_template": "SMS message: {{ message_prop_id }}",
        "recipients": "+4799887766,+4788776655",
        "from_number": "+4766554433"
    }
}

In the above example the entities sent to the sink should contain at least the property message_prop_id, i.e.:

{
    "_id": "message_id",
    "message_prop_id": "This is the message to send",
    "some_other_property": "Some other value"
}

An example where the template to use is included in the entity written to the sink:

{
    "sink": {
        "type": "sms",
        "system": "twilio_service",
        "body_template_property": "body_template_property_id",
        "recipients": "+4799887766,+4788776655",
        "from_number": "+4766554433"
    }
}

For the example above the entities sent to the sink should contain at least the property body_template_property_id, and they also need to contain the properties referenced in the embedded template:

{
    "_id": "message_id",
    "body_template_property_id": "SMS message: {{ message_prop_id }}",
    "message_prop_id": "This is the message to send",
    "some_other_property": "Some other value"
}

You can also store the Jinja templates on disk and reference them in the same way via filenames instead of embedding the templates in config or the entities themselves.

The Solr sink

The Solr sink writes the entities it is given to a Solr index.

The _id property is used as the document id. All other properties, except the ones at the root level matching _* or $* are added to the document. Notice the limitations described in the next section.

This sink supports batching.

Limitations

Due to the limited document structure allowed by Solr, there are some restrictions on the form of the entities accepted by the sink:

  • Only "flat" entities are allowed - any child entities must be removed or merged into the root entity before being sent to the sink.
  • List properties are supported, but they can only contain a single type of value.
  • Lists cannot contain other lists or entities.

If the document does not adhere to these rules, then an error is raised.

Prototype

{
    "type": "solr",
    "system": "solr-system-id",
    "commit_within": null,
    "commit_at_end": true
}

Properties

Property Type Description Default Req
system String The id of the Solr system component to use.   Yes
commit_within Integer The number of seconds to wait until committing, i.e. invalidating the Solr caches. This is used to set up commit batching. The default is null (i.e. not set) which means that a commit will be issued at the end of the sync if commit_at_end is true. Do not set this too low as it will cause a lot of overhead on the Solr server. null  
commit_at_end Boolean If true, then the sink will issue a commit at the end of the sync. In general it is best to rely on commit_within instead or just let the Solr server itself decide the commit interval. true  
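
Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity. This is a sketch; the system id is a placeholder for a previously configured Solr system:

{
    "sink": {
        "type": "solr",
        "system": "my-solr-system"
    }
}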

The Elasticsearch sink

The Elasticsearch sink writes the entities it is given to an Elasticsearch server/cluster.

The _id property is used as the document id. All other properties, except the ones at the root level matching _* or $* are added to the document.

If the input entity has the property $index then this is the index into which the document is written. The $type property is used as the document type. Note that default values for $index and $type can be specified on the Elasticsearch system.

This sink supports batching.

Prototype

{
    "type": "elasticsearch",
    "system": "elasticsearch-system-id",
    "default_index": null,
    "default_type": null
}

Properties

Property Type Description Default Req
system String The id of the Elasticsearch system component to use.   Yes
default_index String The index to insert the documents into. This is the default value for the $index property on the indexable entities. Note that this is overridable on each entity. null  
default_type String The document type to use for the entities. This is the default value for the $type property on the indexable entities. Note that this is overridable on each entity. null  
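
Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity. This is a sketch; the system id and the index/type names are placeholders:

{
    "sink": {
        "type": "elasticsearch",
        "system": "my-elasticsearch-system",
        "default_index": "customers",
        "default_type": "customer"
    }
}

An input entity can override these defaults per document by carrying its own $index and $type properties, for example:

{
    "_id": "42",
    "$index": "orders",
    "$type": "order",
    "status": "shipped"
}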

The SPARQL sink

The SPARQL sink converts entities to RDF statements and writes them to a graph in a triplestore via a SPARQL compatible endpoint.

Prototype

{
    "type": "sparql",
    "system": "id-of-url-system"
    "graph": "http://uri.of/graph",
    "do_diff": false,
    "write_sdshare_updated": true
}

Properties

Property Type Description Default Req
url String The URL of the SPARQL endpoint to use.   Yes
system String The id of a URL system component to use.   Yes
graph String A full URI for the graph to write the entities into.   Yes
do_diff Boolean Tell the sink to compute the difference between the target graph RDF statements and the RDF statements generated by converting the input entity to RDF. This ensures the minimum number of write operations to the endpoint. This does however come with the cost of (many) more read operations. Use this option if your entities are large and/or there are large amounts of changes flowing through the sink on average. false  
write_sdshare_updated Boolean Tell the sink to automatically insert SDShare updated predicates with the generated RDF statements written to the endpoint. Note that the local UTC time is currently used for this timestamp. true  

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "sparql",
        "url": "http://virtuoso.example.com:8890/sparql",
        "graph": "http://example.com/fylketest",
        "do_diff": true,
        "write_sdshare_updated": true
    }
}

The SQL sink

The SQL sink writes entities to a relational database table. You will have to configure and provide a SQL system id in the system property.

The expected form of an entity to be written to the sink is:

{
    "columnname1": value,
    "columnname2": another_value,
}

This sink supports batching.

Prototype

{
    "type": "sql",
    "system": "id-of-sql-system"
    "primary_key": ["list","of","key","names"],
    "table": "name-of-table",
    "schema": "default-schema-name-if-included",
    "whitelist": ["properties/columns","to","include"],
    "blacklist": ["properties/columns","to","exclude"],
    "batch_size": 100,
    "use_bulk_operations": false,
    "schema_definition": [],
    "create_table_if_missing": false,
    "timestamp": "name-of-collumn-to-add-timestamp-into",
    "truncate_table_on_first_run": false
}

Properties

Property Type Description Default Req
system String The id of the SQL system component to use.   Yes
table String Refers to the name of a table in the database system, not including the schema, which if needed must be set separately in the schema property.   Yes
primary_key List<String> or String The value of this property can be a single string with the name of the column that contains the primary key (PK) of the table, or a list of strings if it is a compound primary key. If the property is not set the component will attempt to use table metadata reflection to deduce the PK to use.    
schema String If a specific schema within a database is needed, you must provide its name in this property. Do not use schema names in the table property.    
timestamp String Defines a name of a property (column) that is added to each entity, containing a timestamp in UTC. If the corresponding column exists in the target table, the value will be written to that column. sesam-timestamp  
use_bulk_operations Boolean Some database types support bulk upload of data. Bulk uploading is typically much faster than doing updates with INSERT and UPDATE SQL statements, but may not be feasible in all cases (some bulk operations require Sesam to have extra permissions in the database, for instance). Only some SQL systems support bulk operations; see the documentation of the SQL systems for details. false for now; will be changed to true at some future date.  
batch_size Integer The maximum number of rows to insert into the database table in one operation. 1000 if use_bulk_operations is true, 100 otherwise.  
truncate_table_on_first_run Boolean A flag that indicates that the target table should be truncated/emptied the first time a pump runs (for example on the first run, or when its offset has been set to zero manually). Please note that the truncating operation is executed in a separate transaction, so if any subsequent inserts should fail the truncating operation will not be rolled back. Note that if combined with create_table_if_missing this property will make the sink drop and then recreate the table on the first run. false  
create_table_if_missing Boolean A flag that indicates that the target table should be created if it is missing the first time a pump runs (for example on the first run, or when its offset has been set to zero manually). If this property is true then a proper schema definition must be supplied in the schema_definition property. Note that this property requires that the user defined in the SQL system have the necessary privileges to create and drop tables in the target database/schema. false  
schema_definition List<Object> A list of column definitions that guides the sink when creating a new table when the create_table_if_missing is set to true. See SQL sink schema definition format for more details on how this element works.    
whitelist List<String> The names of the properties (columns) to include when inserting rows into the target table. If there is a blacklist also specified, the whitelist will be filtered against the contents of the blacklist.    
blacklist List<String> The names of the properties (columns) to exclude from inserts into the target table.    

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "sql",
        "system": "my-sql-system",
        "table": "customers"
    }
}

SQL sink schema definition format

The schema definition format consists of a list of objects for each property that exists in the input entities. These objects are in essence column definitions and correspond directly to columns in the target table. The initial schema definition can be generated, using the API or the management studio, by analysing the entities produced by the pipe the sink belongs to. After being generated it can then be manually edited or augmented.

If the entities in the source dataset change shape, or you change the shape of the entities produced by the pipe by adding (or editing existing) DTL transforms attached to it, you may need to regenerate (or manually update) the schema definition accordingly.

If the schema definition does not match the shape or value ranges of the entities that the sink is attempting to insert (or update) in the resulting table, the sink will generate run time errors in the pump execution log.

Each object is of the form:

{
    "name": "name_of_column",
    "type": "string|integer|decimal|float|bytes|datetime|date|time|uuid|boolean",
    "max_size|max_value": 1234,
    "min_size|min_value": 1234,
    "precision": 10,
    "scale": 2,
    "allow_null": true|false,
    "primary_key": true|false,
    "index": true|false
}

The name property must correspond to a property in the entities fed to the sink. In the case of the primary_key set to true and/or allow_null set to false, the property must exist in all entities. Note that at least one column definition in the schema definition list must have primary_key set to true.

The type property is automatically mapped to the appropriate target RDBMS column type, based on the information in the max_size/max_value and min_size/min_value properties. For example, an integer type may translate to a bigint if the value range is outside +-2^31 (i.e. larger than 32 bits) or a tinyint if it fits within an unsigned byte range (0..255). The translation table for the currently supported systems is listed below.

If the index property for a column definition is set to true, the table creation will add a default type of index to the column for the particular target RDBMS system.

The precision and scale properties are valid only for decimal type columns and define the total number of digits and the fractional digits respectively. I.e. the decimal number "10.3" would have a precision of "3" and a scale of "1".
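
As an illustration, a schema_definition for the customers table used in the example configuration above could look like the sketch below (the column names, sizes and value ranges are hypothetical; normally this list would be generated via the API or the management studio and then edited as needed):

"schema_definition": [
    {
        "name": "id",
        "type": "string",
        "max_size": 50,
        "allow_null": false,
        "primary_key": true
    },
    {
        "name": "name",
        "type": "string",
        "max_size": 255,
        "allow_null": true,
        "index": true
    },
    {
        "name": "balance",
        "type": "decimal",
        "precision": 10,
        "scale": 2,
        "allow_null": true
    }
]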

Translation table for the Microsoft SQL server and Microsoft Azure SQL Data Warehouse server:

Type Range/size Column type Comment
string <= 8000 nvarchar(size) Unicode
string > 8000 nvarchar(MAX) Unicode
bytes <= 8000 varbinary(size)  
bytes > 8000 varbinary(MAX)  
integer -9223372036854775808 - 9223372036854775807 bigint 64 bit/8 bytes
integer -2147483648 - 2147483647 int 32 bit/4 bytes
integer -32768 - 32767 smallint 16 bit/1 word/2 bytes
integer 0 - 255 tinyint 8 bit/1 byte
decimal Any decimal(precision,scale)  
float 53 bit precision float Double precision IEEE-754 number
datetime 0001-01-01T00:00:00.000000Z - 9999-12-31T23:59:59.999999Z datetime2  
date 0001-01-01 - 9999-12-31 date Coerced from datetime values or pre-formatted strings
time 00:00:00.000000 - 23:59:59.999999 time Coerced from datetime values or pre-formatted strings
boolean true | false bit Coerced to 0 or 1
uuid Any valid UUID uniqueidentifier Preformatted strings on the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx can also be used

The Email Message sink (Experimental)

The mail message sink is capable of sending mail messages based on the entities it receives. The message to send can be constructed either by inline templates or from templates read from disk. These templates are assumed to be Jinja templates (http://jinja.pocoo.org/) with the entity's properties available to the templating context. The template file name can either be embedded in the configuration or in the input entity. The mail server settings have to be registered in an SMTP system component in advance and its _id put in the system property of the sink.

Prototype

{
    "type": "mail",
    "system": "smtp-system-id",
    "body_template": "static jinja template as a string",
    "body_template_property": "id-of-property-to-get-as-a-body-template",
    "text_body_template": "static text only jinja template as a string",
    "text_body_template_property": "id-of-property-to-get-as-a-text-body-template",
    "subject_template": "static jinja template as a string",
    "subject_template_property": "id-of-property-to-get-as-a-subject-template",
    "recipients": "static,comma,separated,list,of,email,addresses",
    "recipients_property": "id-of-property-to-get-recipients-from",
    "mail_from": "static@email.address"
}

Properties

The configuration must contain at most one of body_template or body_template_property. The same applies to subject_template and subject_template_property.

Property Type Description Default Req
system String The id of the SMTP system to use.   Yes
body_template String Should contain a Jinja template to use for constructing messages. The template will have access to all entity properties by name.   Yes
body_template_property String Should contain an id of a property of the incoming entity to use for looking up the Jinja template (i.e. for inlining the templates in the entities). It should not be used at the same time as body_template. "body_template"  
subject_template String Should contain a Jinja template to use for constructing subjects for the email messages. The template will have access to all entity properties by name.   Yes
subject_template_property String Should contain an id of a property of the incoming entity to use for looking up the Jinja template (i.e. for inlining the templates in the entities). It should not be used at the same time as subject_template. "subject_template"  
text_body_template String Should contain a Jinja template to use for constructing plain-text messages. The template will have access to all entity properties by name.    
text_body_template_property String Should contain an id of a property of the incoming entity to use for looking up the Jinja template (i.e. for inlining the templates in the entities) used to construct plain text messages. It should not be used at the same time as text_body_template. "text_body_template"  
recipients String Should contain a comma-separated list of email addresses to send the message constructed to. If this is not inlined in the entities via recipients_property (see below), this property is mandatory.   Yes
recipients_property String Should contain the id of the property to look up the recipients from the entity itself (i.e. for inlining the recipients). If recipients (see above) is not specified, this property is mandatory and the property referenced by it must exist and be valid for all entities. "recipients"  
mail_from String An email address to use as the sender of all messages   Yes
unhandled_template_variable_replacement String Specifies how unhandled variables in the templates are handled. debug: the '{{variable_name}}'-string is kept. empty_string: {{variable_name}} is replaced with an empty string. strict: an error is raised. The default is 'debug'. "debug"  

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "mail",
        "system": "our-smtp-server",
        "body_template": "Mail message body: {{ message_prop_id }}",
        "subject_template": "Subject: {{ subject_prop_id }}",
        "recipients": "foo@bar.com,info@example.com",
        "mail_from": "all@of.us"
    }
}

In the above example the entities sent to the sink should have at least the properties subject_prop_id and message_prop_id, i.e.:

{
    "_id": "message_id",
    "message_prop_id": "This is the message to send",
    "subject_prop_id": "This is the subject of the message to send",
    "some_other_property": "Some other value"
}

A note on multi-part messages

To send multi-part email messages containing both an HTML and a plain-text version, you should provide templates for both body_template (or body_template_property) and text_body_template (or text_body_template_property). The former should then contain your HTML message and the latter your plain-text version. If you omit the text_* properties and the body template contains HTML, the sink will attempt to extract a text-only version of the HTML on a best-effort basis; i.e. this might not preserve the information contained in the HTML in the desired way.
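
A sketch of the earlier example extended for multi-part delivery; the HTML and plain-text template contents are placeholders:

{
    "sink": {
        "type": "mail",
        "system": "our-smtp-server",
        "body_template": "<p>Mail message body: {{ message_prop_id }}</p>",
        "text_body_template": "Mail message body: {{ message_prop_id }}",
        "subject_template": "Subject: {{ subject_prop_id }}",
        "recipients": "foo@bar.com,info@example.com",
        "mail_from": "all@of.us"
    }
}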

The null sink

The null sink is the equivalent of the empty data source; it will discard any entities written to it and do nothing (it never raises an error):

Prototype

{
    "type": "null"
}

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

{
    "sink": {
        "type": "null"
    }
}

The HTTP endpoint sink

This is a special data sink that registers an HTTP publisher endpoint that one can get entities from.

A pipe that references the HTTP endpoint sink will not pump any entities. In practice this means that a pump is not configured for the pipe; the only way for entities to flow through the pipe is by retrieving them from the HTTP endpoint.

It exposes three URLs:

URL Description
http://localhost:9042/api/publishers/mypipe/entities JSON entities endpoint
http://localhost:9042/api/publishers/mypipe/sdshare-collection SDShare collections feed
http://localhost:9042/api/publishers/mypipe/sdshare-fragments SDShare fragments feed

The serialisation of entities as JSON is described in more detail here. This endpoint is compatible with The JSON source.

The SDShare protocol is described here.

The exposed URLs may support additional parameters such as since and limit - see the API reference for the full details.

Prototype

{
    "type": "http_endpoint"
}

Properties

Property Type Description Default Req
filename String This property provides a hint to HTTP clients on what filename to use when downloading data (via the Content-Disposition header property). Note that this property is not entirely standardized yet, so to be compatible with most HTTP clients, the filename should be ASCII characters only. For the same reason, quotes or backward or forward slashes should be avoided. If this property is not set, the contents will be served inline.    

Example configuration

The pipe configuration given below will expose the my-entities publisher endpoint and read the entities from the my-entities dataset:

{
    "_id": "my-entities",
    "name": "My published entities endpoint",
    "type": "pipe",
    "sink": {
        "type": "http_endpoint"
    }
}

The CSV endpoint sink

This is a data sink that registers an HTTP publisher endpoint that one can get entities in CSV format from.

A pipe that references the CSV endpoint sink will not pump any entities. In practice this means that a pump is not configured for the pipe; the only way for entities to flow through the pipe is by retrieving them from the CSV endpoint using a client that supports the HTTP protocol.

It exposes the URL:

URL
http://localhost:9042/api/publishers/mypipe/csv

The exposed URL may support additional parameters such as since and limit - see the API reference for the full details.

Prototype

{
    "type": "csv_endpoint",
    "columns": ["properties","to","use","as","columns"],
    "quoting": "all|minimal|non-numeric|"none",
    "delimiter": ","
    "doublequote": true
    "include_header": true,
    "escapechar": null,
    "lineterminator": "\r\n",
    "quotechar": "\"",
    "encoding": "utf-8",
    "skip-deleted-entities": true,
    "filename": "my_data.csv"
}

Properties

Property Type Description Default Req
columns List<String> A list of string keys to look up in the entity to construct the CSV columns. If include_header is set to true (which is the default), this list will also be included as the first line of the CSV file.   Yes
quoting Enum<String> A string from the set of "all", "minimal", "non-numeric" and "none" that describes how the fields of the CSV file will be quoted. A value of "all" means all fields will be quoted, even if they don't contain the quotechar or delimiter characters. A value of "non-numeric" means all non-numeric values will be quoted. The "minimal" setting (the default) means only fields with contents that need to be quoted will be quoted. Finally, the none value means do not quote (note this can produce broken CSV files if there are values that have to be quoted). "minimal"  
delimiter String The character to use as field separator. It will also affect which fields will be quoted if the quoting setting is set to "minimal" (which is the default). The default value is to use the comma (",") character. ","  
doublequote Boolean Controls how instances of quotechar appearing inside a field should themselves be quoted. When set to true (the default), the character is doubled (repeated). When set to false, the escapechar property setting is used as a prefix to the quotechar. If doublequote is set to true but escapechar is not set, the backward slash character (\) is used as prefix. true  
include_header Boolean Controls if the columns property should be included as the header of the CSV file produced. true  
escapechar String A one-character string used by the sink to escape delimiter characters in fields if quoting is set to none and the quotechar if doublequote is set to false. The default is null which disables escaping (except if doublequote is set to true, in which case the default is \). null  
lineterminator String A character sequence to use as the EOL marker in the CSV output. The default is carriage return plus linefeed ("\r\n"). "\r\n"  
quotechar String A one-character string that controls how to quote field values. The default is the double quote character. See doublequote and escapechar for related settings. "\""  
encoding String Which encoding to use when converting the output to string values. The default is utf-8. See section 7.2.3 on this page for a list of valid values. "utf-8"
skip-deleted-entities Boolean This can be set to false to make deleted entities appear in the CSV output. The default is that deleted entities do not appear. If you set this to false you will also most likely want to include the "_deleted" attribute in the columns list, so that rows that represent deleted entities can be recognized. (If you need to rename or reformat the "_deleted" attribute you can do that by adding a DTL transform to the pipe.) true  
filename String This property provides a hint to HTTP clients on what filename to use when downloading data (via the Content-Disposition header property). Note that this property is not entirely standardized yet, so to be compatible with most HTTP clients, the filename should be ASCII characters only. For the same reason, quotes or backward or forward slashes should be avoided. If this property is not set, the contents will be served inline.    

Example configuration

The pipe configuration given below will expose the my-entities publisher endpoint and read the entities from the my-entities dataset, picking the _id, foo, bar and zoo properties as columns in the CSV file:

{
    "_id": "my-entities",
    "name": "My published csv endpoint",
    "type": "pipe",
    "sink": {
        "type": "csv_endpoint"
        "columns": ["_id", "foo", "bar", "zoo"],
        "filename": "my_data.csv"
    }
}

The data will be available at http://localhost:9042/api/publishers/my-entities/csv
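
For illustration, with the default quoting, delimiter and header settings, a dataset entity such as {"_id": "1", "foo": "a", "bar": "b", "zoo": "c"} (hypothetical values) would be rendered roughly as:

_id,foo,bar,zoo
1,a,b,c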

The XML endpoint sink

This is a data sink that registers an HTTP publisher endpoint that one can get the entities in XML format from.

A pipe that references the XML endpoint sink will not pump any entities; the only way for entities to flow through the pipe is by retrieving them from the endpoint using the HTTP protocol.

It exposes the URL:

URL
http://localhost:9042/api/publishers/mypipe/xml

Note that the shape of the entities must conform to certain criteria, see the notes later in the section.

The exposed URL may support additional parameters such as since and limit - see the API reference for the full details.

Prototype

{
    "type": "xml_endpoint",
    "wrapper": "wrapper-tag",
    "root-attributes": {
       "xmlns": "http://www.example.org/ns1",
       "xmlsn:foo": "http://www.example.org/ns2",
       "xmlns:bar": "http://www.example.org/ns3"
    },
    "include-xml-decl": false,
    "skip-deleted-entities": true,
    "filename": "my_data.xml"
}

Properties

Property Type Description Default Req
wrapper String If included, the XML produced from all entities will be wrapped in a single top level tag with the value of this property (<wrapper-value>..entity-tags..</wrapper-value>)    
root-attributes Object An object containing the attributes to include on the root element (i.e. on the wrapper tag if it is defined, or on the tag defined on the first entity level). This is where you typically declare your namespaces, schema and so on.    
include-xml-decl Boolean If set to true includes a default XML header: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> false  
skip-deleted-entities Boolean This can be set to false to make deleted entities appear in the XML output. The default is that deleted entities do not appear. true  
filename String This property provides a hint to HTTP clients on what filename to use when downloading data (via the Content-Disposition header property). Note that this property is not entirely standardized yet, so to be compatible with most HTTP clients, the filename should be ASCII characters only. For the same reason, quotes or backward or forward slashes should be avoided. If this property is not set, the contents will be served inline.    

Expected entity shape

The entities must be transformed into a particular form before being piped to the XML endpoint sink. The general form expected is:

{
  "_id": "1",
  "name": "Entity 1",
  "id": "entity-1",
  "<foo:tag>": [{
      "id": "child",
      "name": "Child entity",
      "<section>": [
        {"<from>": "0"},
        {"<to>": "999"}
      ]
  }]
}

There must be exactly one property starting with '<' and ending with '>' in an entity, although it can contain child entities in as many levels as you want (also in lists).

All non-tag properties, except those beginning with an underscore (_), will be included as attribute values on the tag specified. A "tag"-property value can either be a single literal, a single object or a list of objects. Note that any non-object items in lists are ignored (i.e. lists, literals and null values).

The property names must be valid XML attribute or tag names (QNames). All literal values in tags or attributes will be XML escaped.

Example configuration

The pipe configuration given below will expose the my-entities publisher endpoint and read the entities from the my-entities dataset:

{
   "sink": {
       "type": "xml_endpoint",
       "wrapper": "baz",
       "root-attributes": {
          "xmlns": "http://www.example.org/ns1",
          "xmlsn:foo": "http://www.example.org/foo",
          "xmlns:xsi": "http://www.w3.org/2000/10/XMLSchema-instance",
          "xsi:schemaLocation": "http://example.com/myschema.dtd",
          "zoo": "bar"
       },
       "filename": "example.xml"
   }
}

The following output will be produced (here reformatted/pretty-printed):

<baz xmlns="http://www.example.org/ns1"
     xmlns:foo="http://www.example.org/foo"
     xmlns:xsi="http://www.w3.org/2000/10/XMLSchema-instance"
     xsi:schemaLocation="http://example.com/myschema.dtd"
     zoo="bar">
  <foo:tag name="Entity 1"
           id="entity-1">
     <section id="child"
              name="Child entity">
        <from>0</from>
        <to>999</to>
     </section>
  </foo:tag>
</baz>

The XML document will be available at http://localhost:9042/api/publishers/my-entities/xml

The REST sink (Experimental)

This is a data sink that can communicate with a REST service using HTTP requests.

Note that the shape of the entities piped to this sink must conform to certain criteria, see the notes later in the section.

Prototype

{
    "type": "rest",
    "system" : "rest-system",
}

Properties

Property Type Description Default Req
system String The id of the REST system to use.   Yes

Expected entity shape

The entities must be transformed into a particular form before being piped to the REST sink. The general form expected is:

{
  "_id": "1",
  "properties": {
      "foo": "bar",
      "zoo": 1,
      "baz": [1,2,3]
  },
  "operation": "some-named-operation",
  "payload": "<some>string-value</some>"
}
Property Type Description Default Req
properties Object Any non-payload properties you need should go into the top-level child entity properties. You can then address these properties in the Jinja templates for operation url properties using the "{{ properties.key_name }}" syntax.    
operation String The contents of this property must refer to one of the named operations registered with the sink's REST system.   Yes
payload String or Object The payload for the operation specified. It can be a string or an object. You can also omit it, in which case the empty string will be used instead (for example for "DELETE" methods). All string payloads will be encoded as UTF-8.    

Example entities:

String as payload:

{
  "_id": "1",
  "properties": {
      "foo": "bar",
      "zoo": 1,
      "baz": [1,2,3]
  },
  "operation": "some-named-operation",
  "payload": "<some>string-value</some>"
}

Object as payload (set operation payload-type to "json", "json-transit" or "form" in the REST system the sink uses):

{
  "_id": "2",
  "properties": {
      "foo": "bar",
      "zoo": 1,
      "baz": [1,2,3]
  },
  "operation": "some-other-operation",
  "payload": {
      "payload": "property",
      "child": {
        "foo": "bar"
      }
  }
}

Multi-part form request if payload-type is "form", otherwise use "json" or "json-transit" for this type of entity:

{
  "_id": "3",
  "operation": "some-third-operation",
  "payload": [
    {
      "foo": "bar"
    },
    {
      "zoo": "foo"
    }
  ]
}

Example configuration

See the REST system example section for how to configure the operations we refer to in these examples:

{
    "type" : "pipe",
    "sink" : {
        "type" : "rest",
        "system" : "our-rest-service",
    }
}

Example input entities:

[
    {
        "_id": "john",
        "operation": "update-man",
        "properties": {
            "id": "john",
            "age": 21,
            "sex": "M",
            "collection_name": "study-group-1"
        },
        "payload": "<man id=\"john\">john</man>"
    },
    {
        "_id": "mary",
        "operation": "update-woman",
        "properties": {
            "id": "mary",
            "age": 23,
            "sex": "F",
            "collection_name": "study-group-2"
        },
        "payload": {
          "id": "mary",
          "age": 23
        }
    },
    {
        "_id": "bob",
        "operation": "delete-man",
        "properties": {
            "collection_name": "study-group-1"
        }
    }
]

Systems

A system component represents a computer system that can provide data entities. Its task is to provide common properties and services that can be used by several data sources, such as connection pooling, authentication settings, communication protocol settings and so on.

You can manage any secret property values you do not want to be exposed in the API (or in log files) by using the Secrets manager API.

Note: as with pipe components, you are not allowed to use the forward slash character ("/") in system ids.

All systems share a number of common properties:

Prototype

{
    "_id": "a_system_id",
    "type": "system:some-type-of-system",
    "name": "The Foo System",
    "worker_threads": 10,
    "metadata": {
       "some_key": "some_value"
    }
}

Properties

Property Type Description Default Req
_id String A unique ID identifying the system. Any pipe sink or source that uses the system must have a corresponding system property matching this value.   Yes
name String A human readable name for this system    
metadata Object<string, Object> An object providing metadata for the system. The keys are strings while the values can be any valid JSON object (literals, lists or other objects).    
worker_threads Integer The maximum number of concurrent pipes running using this system 10  

The SQL systems

An SQL system component represents an RDBMS and contains the necessary information to establish a connection to it and manage these connections among the sources that read from it. It can also provide source configurations for reading from all tables it can introspect from the RDBMS schema.

The common properties for all SQL systems are:

Prototype

{
    "_id": "sql_system_id",
    "type": "system:oracle|oracle_tns|mssql|mysql",
    "name": "The Foo Database",
    "db-type-specific-property":"some-value",
    "timezone": "UTC",
    "pool_size": 10,
    "pool_timeout": 30,
    "pool_max_overflow": 10,
    "pool_recycle": 1800
}

Column type mapping

See the supported column types section for an overview of which column types are supported for each RDBMS system and how they are mapped to Sesam types.

Properties

Property Type Description Default Req
timezone String The local timezone for the database server. It is used for any date(time) objects returned that don't have any timezone information. The default is the UTC timezone. All the official timezone names are supported, i.e. "UTC", "GMT", "EST" etc. You can also use the indirect "Continent/City" format, for example "Europe/Oslo" (see the complete list for which cities are supported). "UTC"  
pool_size Integer The target maximum number of concurrent connections to the database 10  
pool_timeout Integer The number of seconds to wait before giving up on getting a connection from the connection pool. 30  
pool_max_overflow Integer How many connections over the pool_size are allowed before refusing to establish an incoming connection. This means that the absolute hard limit of connections in a connection pool is pool_size + pool_max_overflow. 10  
pool_recycle Integer This configuration option prevents the pool from using a particular connection that has passed a certain age, and is appropriate for database backends such as MySQL that automatically close connections that have been stale after a particular period of time. Note that this doesn't affect any open/active connections. 1800  

The specific SQL systems available are:

The Oracle system

The Oracle SQL system represents an Oracle RDBMS available on the network. See the supported column types list for an overview of which Oracle column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:oracle",
    "name": "The Oracle Database",
    "username":"username-here",
    "password":"secret",
    "host":"fqdn-or-ip-address-here",
    "port": 1521,
    "database": "database-name"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
host String Host name or IP address to the database server. Must be DNS resolvable if non-numeric.   Yes
port Integer Database IP port. 1521  
database String Name/id of database to connect to.   Yes

Example configuration

Example Oracle configuration:

{
    "_id": "oracle_db",
    "name": "Oracle test database",
    "type": "system:oracle",
    "username": "system",
    "password": "oracle",
    "host": "oracle",
    "database": "XE"
}

The Oracle TNS system

The Oracle TNS system represents an Oracle RDBMS configured using a TNS name. See the supported column types list for an overview of which Oracle column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:oracle_tns",
    "name": "The Oracle Database",
    "username":"username-here",
    "password":"secret",
    "tns_name": "tns-name-here"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
tns_name String A fully qualified Oracle TNS name   Yes

Example configuration

Example Oracle TNS configuration:

{
    "_id": "oracle_db",
    "name": "Oracle test database",
    "type": "system:oracle_tns",
    "username": "system",
    "password": "oracle",
    "tns_name": "(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = foo)(PORT = 1521)) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = BAR)))""
}

The MSSQL system

The MSSQL system represents a Microsoft SQL Server available over the network. See the supported column types list for an overview of which SQL Server column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:mssql",
    "name": "The Microsoft SQL Server Database",
    "username":"username-here",
    "password":"secret",
    "host":"fqdn-or-ip-address-here",
    "tds_version":"7.4",
    "port": 1433,
    "database": "database-name"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
host String Host name or IP address to the database server. Must be DNS resolvable if non-numeric.   Yes
port Integer Database IP port. 1433  
database String Name/id of database to connect to.   Yes
tds_version String Version of the TDS protocol to use for the driver. Note that the default is null, which means it's not set. This tells the database driver to attempt to auto-detect the protocol version, which should work in most cases. However, if you experience unknown or general connection errors, you can try to specify the TDS protocol version string manually (typically of the form "7.0", "7.4" etc).

Example configuration

Example MS SQL Server configuration:

{
    "_id": "sqlserver_db",
    "name": "MS SQL Server test database",
    "type": "system:mssql",
    "username": "user",
    "password": "password",
    "host": "localhost",
    "port": 1433,
    "database": "testdb"
}
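
If auto-detection of the TDS protocol version causes connection problems, the version can be pinned explicitly via the tds_version property. A sketch based on the example above (the version string shown is illustrative and must match what your SQL Server supports):

{
    "_id": "sqlserver_db",
    "name": "MS SQL Server test database",
    "type": "system:mssql",
    "username": "user",
    "password": "password",
    "host": "localhost",
    "tds_version": "7.4",
    "port": 1433,
    "database": "testdb"
}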

The Microsoft Azure SQL Data Warehouse system

This system type represents a Microsoft Azure SQL Data Warehouse server running in Azure.

See the supported column types list for an overview of which SQL Server column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:mssql-azure-dw",
    "name": "A Microsoft Azure SQL Data Warehouse server",
    "username":"username-here",
    "password":"secret",
    "host":"fqdn-or-ip-address-here",
    "port": 1433,
    "database": "database-name"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
host String Host name or IP address to the database server. Must be DNS resolvable if non-numeric.   Yes
port Integer Database IP port. 1433  
database String Name/id of database to connect to.   Yes

Example configuration

Example MS SQL Server configuration:

{
    "_id": "sqlserver_db",
    "name": "MS Azure DW SQL Server test database",
    "type": "system:mssql-azure-dw",
    "username": "user",
    "password": "password",
    "host": "myserver.database.windows.net",
    "port": 1433,
    "database": "testdb"
}

Bulk operations in Microsoft SQL server and Azure SQL Data Warehouse systems

Both Microsoft SQL Server and Azure SQL Data Warehouse support bulk operations for uploading data. Sesam uses the bcp utility for bulk uploading.

When a pipe has been configured with a SQL sink that has the use_bulk_operations parameter set to true, the following happens when the pipe runs:

  1. Sesam creates a temporary database table named "SESAM_BULK_TMP_<table>" (where <table> is the name of the table the sink writes to).
  2. Sesam writes a temporary file to the local disk that is formatted in a way that the bcp utility understands.
  3. Sesam runs the bcp utility, which will upload the content of the file to the temporary table.
  4. Sesam runs a MERGE sql statement that updates the target table with the contents of the temporary table (inserting and updating rows as required).
  5. Sesam drops the temporary database table.

For this method to work, Sesam must have permissions to create and drop tables in the database. If for some reason that is not possible, the use_bulk_operations parameter in the sql sink can be set to false to make the sink use the (slower) INSERT and UPDATE sql statements to upload data.
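
As a sketch of how bulk operations are typically disabled, the flag is set on the pipe's SQL sink. The sink's system and table properties below are assumed to mirror the sql source shown later in this document; see the SQL sink documentation for the authoritative property list:

{
    "_id": "customers-to-sqlserver",
    "type": "pipe",
    "source": {
        "type": "dataset",
        "dataset": "customers"
    },
    "sink": {
        "type": "sql",
        "system": "sqlserver_db",
        "table": "customers",
        "use_bulk_operations": false
    }
}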

The MySQL system

The MySQL system represents a MySQL database available over the network. See the supported column types list for an overview of which MySQL column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:mysql",
    "name": "The MySQL Database",
    "username":"username-here",
    "password":"secret",
    "host":"fqdn-or-ip-address-here",
    "port": 3306,
    "database": "database-name"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
host String Host name or IP address to the database server. Must be DNS resolvable if non-numeric.   Yes
port Integer Database IP port. 3306  
database String Name/id of database to connect to.   Yes

Example configuration

Example MySQL configuration:

{
    "_id": "sqlserver_db",
    "name": "MySQL test database",
    "type": "system:mysql",
    "username": "user",
    "password": "password",
    "host": "localhost",
    "port": 3306,
    "database": "testdb"
}

The PostgreSQL system

The PostgreSQL system represents a PostgreSQL RDBMS available on the network. See the supported column types list for an overview of which PostgreSQL column types are supported and how they are mapped to Sesam types.

Prototype

{
    "_id": "sql_system_id",
    "type": "system:postgresql,
    "name": "The PostgreSQL Database",
    "username":"username-here",
    "password":"secret",
    "host":"fqdn-or-ip-address-here",
    "port": 5432,
    "database": "database-name"
}

Properties

Property Type Description Default Req
username String Username to use when connecting to the database.   Yes
password String Password to use when connecting to the database.   Yes
host String Host name or IP address to the database server. Must be DNS resolvable if non-numeric.   Yes
port Integer Database IP port. 5432  
database String Name/id of database to connect to.   Yes

Example configuration

Example PostgreSQL configuration:

{
    "_id": "postgresql_db",
    "name": "PostgreSQL test database",
    "type": "system:postgresql",
    "username": "user",
    "password": "pw",
    "host": "test.postgresql.mydomain.com",
    "database": "test"
}

The LDAP system

The LDAP system contains the configuration needed to communicate with an LDAP system. It is used by LDAP sources to stream entities from LDAP catalogs. Note that Microsoft ActiveDirectory is also supported through its LDAP-compatible interface/API.

It supports the following properties:

Prototype

{
    "host": "FQDN of LDAP host",
    "port": 389,
    "use_ssl": false,
    "username": "authentication-username-here",
    "password": "authentication-password-here",
    "charset": "latin-1"
}

Properties

Property Type Description Default Req
host String The fully qualified domain name (FQDN) of the LDAP host server "localhost"  
port Integer The TCP port of the LDAP service. 389  
use_ssl Boolean Indicates to the client whether to use a secure socket layer (SSL) or not when communicating with the LDAP service false  
username String The user to authenticate as against the LDAP service. If not set, no authentication will be attempted.    
password String The password to use for authenticating with the LDAP service. Required if username is set.   Yes
charset String The charset used to encode strings in the LDAP database. Defaults to "latin-1" aka "ISO-8859-1", as "UTF-8" is usually not the default encoding in LDAP catalogs at the time of writing. "latin-1"  

Example configuration

{
    "_id": "bouvet_ldap",
    "name": "Bouvet LDAP server",
    "type": "system:ldap",
    "host": "dc1.bouvet.no",
    "port": 389,
    "username": "bouvet\\some-user",
    "password": "********"
}
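
To connect over SSL instead, set use_ssl to true and point port at the directory server's LDAPS port (commonly 636). A sketch based on the example above:

{
    "_id": "bouvet_ldap_ssl",
    "name": "Bouvet LDAP server (SSL)",
    "type": "system:ldap",
    "host": "dc1.bouvet.no",
    "port": 636,
    "use_ssl": true,
    "username": "bouvet\\some-user",
    "password": "********"
}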

The SMTP system (Experimental)

The SMTP system represents the information needed to connect to a SMTP server for sending emails. It is used in conjunction with the mail message sink to construct and send emails based on the entities it receives.

Prototype

{
    "_id": "id-of-system",
    "name": "Name of system",
    "type": "system:smtp",
    "smtp_server": "localhost",
    "smtp_port": 25,
    "smtp_username": None,
    "smtp_password": None,
    "use_tls": false,
    "max_per_hour": 1000
}

Properties

Property Type Description Default Req
smtp_server String Contains a FQDN of the SMTP service to use "localhost"  
smtp_port Integer The TCP port to use when talking to the SMTP service 25  
smtp_username String The username to use when authenticating with the SMTP service. If not set, no authentication is attempted.    
smtp_password String The password to use if smtp_username is set. It is mandatory if the smtp_username is provided.   Yes
use_tls Boolean Indicating to the client to use TLS encryption when communicating with the SMTP service. false  
max_per_hour Integer The maximum number of messages to send in any given hour. It is used to stop run-away message sending in development or testing. Note that any message not sent will be logged but discarded. 1000

Example configuration

{
    "_id": "our-smtp-server",
    "name": "Our SMTP Server",
    "type": "system:smtp",
    "smtp_server": "localhost",
    "smtp_port": 25,
    "smtp_username": "some-user",
    "smtp_password": "*********",
    "max_per_hour": 100000
}

The Solr system

The Solr system represents the information needed to connect to an Apache Solr server for indexing JSON documents. It is used in conjunction with the Solr sink or the Sesam Databrowser sink.

Prototype

{
    "_id": "id-of-system",
    "name": "Name of system",
    "type": "system:solr",
    "url": "http://localhost:8983/solr/",
    "timeout": 30,
}

Properties

Property Type Description Default Req
url String Contains a full URL to the Solr dataset to read/write documents from "http://localhost:8983/solr/"  
timeout Integer The number of seconds to wait for a response from the Solr server. 30  

Example configuration

{
    "_id": "our-solr-server",
    "name": "Our Solr Server",
    "type": "system:solr",
    "url": "http://localhost:8983/solr/"
}

The Elasticsearch system

The Elasticsearch system represents the information needed to connect to an Elasticsearch server/cluster for indexing JSON documents. It is used in conjunction with the Elasticsearch sink.

Prototype

{
    "_id": "id-of-system",
    "name": "Name of system",
    "type": "system:elasticsearch",
    "hosts": ["localhost:9200"]
}

Properties

Property Type Description Default Req
hosts List<String> Contains a list of host+port pairs, or full URL to the Elasticsearch server(s) ["localhost:9200"]  

Example configuration

{
    "_id": "our-elasticsearch-server",
    "name": "Our Elasticsearch Server",
    "type": "system:elasticsearch",
    "hosts": ["localhost:9200"]
}
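
Since hosts accepts either host:port pairs or full URLs, a multi-node cluster can be listed explicitly. A sketch with illustrative host names:

{
    "_id": "our-elasticsearch-cluster",
    "name": "Our Elasticsearch Cluster",
    "type": "system:elasticsearch",
    "hosts": ["https://es1.example.com:9200", "https://es2.example.com:9200"]
}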

The Twilio system (Experimental)

The Twilio system is an SMS system used with SMS message sinks to construct and send SMS messages from entities.

It has the following properties:

Prototype

{
    "_id": "system-id",
    "name": "Service name",
    "type": "system:twilio",
    "account": "twilio-account-number",
    "token": "twilio-api-token",
    "max_per_hour": 1000
}

Properties

Property Type Description Default Req
account String The Twilio account number   Yes
token String The Twilio API token   Yes
max_per_hour Integer The maximum number of messages to send in any given hour. It is used to stop run-away message sending in development or testing. Note that any message not sent will be logged but discarded. 1000

Example configuration

{
     "_id": "twilio_service",
     "name": "Twilio Service",
     "type": "system:twilio",
     "account": "12334567890",
     "token": "ABCD-ADEF-FAA1-1234",
     "max_per_hour": 100000
}

The URL system

The URL system represents an HTTP server (i.e. a web server) serving HTTP requests from a base url. It supports the HTTP and HTTPS protocols. It provides session handling, connection pooling and authentication services to sources and sinks which need to communicate with an HTTP server.

Prototype

{
    "_id": "id-of-system",
    "name": "Name of system",
    "type": "system:url",
    "url_pattern": "http://host:port/path/%s",
    "verify_ssl": false,
    "username": None,
    "password": None,
    "jwt_token": None,
    "headers": {
        "MY_HEADER": "some-value",
        "MY_OTHER_HEADER": "$ENV(key-for-other-value)",
        "MY_SECRET_HEADER": "$SECRET(secret-key)"
    },
    "authentication": "basic",
    "connect_timeout": 60,
    "read_timeout": 7200
}

Properties

Property Type Description Default Req
url_pattern String Similar to base_url except one can use the %s token to tell where relative URLs are to be inserted into the url_pattern to produce absolute URLs. If %s is omitted then the relative URL is appended to the url_pattern.   Yes
base_url String Deprecated. Use the url_pattern property instead. The full URL of the base url of the HTTP server. Relative URLs are URL joined against this base URL to produce absolute URLs. Note that you may want a / at the end of the value.    
verify_ssl Boolean Indicate to the client if it should attempt to verify the SSL certificate when communicating with the HTTP server over SSL/TLS. false  
username String The username to use when authenticating with the HTTP server. Note that you also have to specify authentication protocol in authentication and password for this property to have any effect.    
password String The password to use if username and authentication is set. It is mandatory if username is provided.   Yes*
jwt_token String If authentication is set to jwt, this property must hold the JWT token to use towards the remote server.    
headers Dict<String,String> An optional set of header values to set as defaults in requests made using the URL system. Both keys and values must evaluate to strings. Note that any "Authorization" header provided in this object is automatically overwritten when using the jwt_token property.
authentication String What kind of authentication protocol to use. Note that authentication is opt-in only and the default is no authentication. Allowed values are "basic", "digest", "ntlm" or "jwt". Note that username, password or jwt_token might also be required depending on the authentication scheme selected.
connect_timeout Integer Number of seconds to wait for connecting to the HTTP server before timing out. A value of null means wait indefinitely. 60  
read_timeout Integer Number of seconds to wait for the HTTP server to respond to a request before timing out. A value of null means wait indefinitely. 7200  

[1] Exactly one of base_url and url_pattern must be specified.

Example configuration

{
    "_id": "our-http-server",
    "name": "Our HTTP Server",
    "type": "system:url",
    "base_url": "http://our.domain.com/files"
}
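
Since base_url is deprecated, new configurations should normally use url_pattern instead. In this sketch, relative URLs supplied by sources and sinks are inserted where the %s token appears:

{
    "_id": "our-http-server",
    "name": "Our HTTP Server",
    "type": "system:url",
    "url_pattern": "http://our.domain.com/files/%s"
}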

Example with ntlm configuration:

{
    "_id": "our-http-server",
    "name": "Our HTTP Server",
    "type": "system:url",
    "authentication": "ntlm",
    "username": "domain\\user",
    "password": "secret",
    "base_url": "http://our.domain.com/files"
}
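
Example with jwt configuration (a sketch assuming a secret named "jwt-token" has been uploaded via the Secrets manager API):

{
    "_id": "our-http-server",
    "name": "Our HTTP Server",
    "type": "system:url",
    "authentication": "jwt",
    "jwt_token": "$SECRET(jwt-token)",
    "url_pattern": "https://api.our.domain.com/%s"
}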

The REST system (Experimental)

The REST system represents a REST service (i.e. a web server) serving HTTP requests from a base url using the REST vocabulary of GET, PUT, POST and PATCH.

It is used by the REST sink.

It supports the HTTP and HTTPS protocols. It provides session handling, connection pooling and authentication services to sources and sinks which need to communicate with an HTTP server.

Prototype

{
    "_id": "id-of-system",
    "name": "Name of system",
    "type": "system:rest",
    "url_pattern": "http://host:port/path/%s",
    "verify_ssl": false,
    "username": None,
    "password": None,
    "authentication": "basic",
    "jwt_token": None,
    "connect_timeout": 60,
    "read_timeout": 7200,
    "operations": {
        "delete-operation": {
            "url" : "/a/service/that/supports/delete/{{ _id }}",
            "method": "DELETE"
        },
        "put-operation": {
            "url" : "/some/service/that/supports/put",
            "method": "PUT",
            "headers": {
                "Content-type": "application/json"
            },
            "payload-type": "json"
        },
        "post-operation": {
            "url" : "/some/service/that/supports/post",
            "method": "POST",
            "payload-type": "form"
        },
        "patch-operation": {
            "url" : "/some/service/that/supports/patch",
            "headers": {
                "Content-type": "application/xml"
            },
            "method": "PATCH"
        }
    }
}

Properties

Property Type Description Default Req
url_pattern String Similar to base_url except one can use the %s token to tell where relative URLs are to be inserted into the url_pattern to produce absolute URLs. If %s is omitted then the relative URL is appended to the url_pattern.   Yes
base_url String Deprecated. Use the url_pattern property instead. The full URL of the base url of the HTTP server. Relative URLs are URL joined against this base URL to produce absolute URLs. Note that you may want a / at the end of the value.    
verify_ssl Boolean Indicate to the client if it should attempt to verify the SSL certificate when communicating with the HTTP server over SSL/TLS. false  
username String The username to use when authenticating with the HTTP server. Note that you also have to specify authentication protocol in authentication and password for this property to have any effect.    
password String The password to use if username and authentication is set. It is mandatory if username is provided.   Yes*
authentication String What kind of authentication protocol to use. Note that authentication is opt-in only and the default is no authentication. Allowed values are "basic", "digest", "ntlm" or "jwt". Note that username, password or jwt_token might also be required depending on the authentication scheme selected.
jwt_token String If authentication is set to jwt, this property must hold the JWT token to use towards the remote server.    
headers Dict<String,String> An optional set of header values to set as defaults in requests made using the REST system. Both keys and values must evaluate to strings. Note that any "Authorization" header provided in this object is automatically overwritten when using the jwt_token property. The default headers can also be overridden in the operation properties on a per-method basis - see the next section for details.
connect_timeout Integer Number of seconds to wait for connecting to the HTTP server before timing out. A value of null means wait indefinitely. 60  
read_timeout Integer Number of seconds to wait for the HTTP server to respond to a request before timing out. A value of null means wait indefinitely. 7200  
operations Object An object containing the registered operations allowed for the REST service. See the next section for details. At least one operation needs to be registered for the system.   Yes

Operation properties

You can register as many named "operations" as you like with the system (even using the same type of "method"). An operation configuration looks like:

Property Type Description Default Req
url String A string containing an absolute URL or a relative path. The URL and/or path must match the base_url specified in the system. The property supports the Jinja template (http://jinja.pocoo.org/) syntax with the entity's properties available in the templating context.   Yes
method String An enumeration of "POST", "PUT", "DELETE" and "PATCH" (note: case sensitive) that represents the HTTP operation that the operation should execute on the url specified.   Yes
headers Dict<String,String> An optional object that contain key-value mappings for the HTTP request header. Entries in this dictionary will override any default headers property defined on the system (see previous section).    
params Object An optional object that contains key-value mappings for any HTTP parameters.
payload-type String An enumeration of "json", "json-transit" and "form" that denotes how to treat the payload property of the entity (see the expected entity shape section of the REST sink for details). If you specify "json", the payload contents will be serialized to JSON (without transit encoding). If you specify "json-transit" you will get a transit-encoded JSON document. If "form" is used, the contents will be used to construct an HTML form for the request. In this case, if the property contains a list, the request will use a multi-part form. If payload-type is omitted, the contents of the payload property will be assumed to be a string.

Example configuration

{
    "_id": "our-rest-service",
    "name": "Our REST service",
    "base_url": "http://our.domain.com/api"
    "type" : "system:rest",
    "operations": {
       "delete-man": {
           "url" : "/men/{{ properties.collection_name }}/{{ _id }}",
           "method": "DELETE",
       },
       "delete-woman": {
           "url" : "/women/{{ properties.collection_name }}/{{ _id }}",
           "method": "DELETE"
       },
       "update-man": {
           "url" : "/men/{{ properties.collection_name }}/",
           "method": "POST",
           "headers": {
               "Content-type": "application/xml"
           }
       },
       "update-woman": {
           "url" : "/women/{{ properties.collection_name }}/",
           "method": "POST",
           "headers": {
               "Content-type": "application/json"
           },
           "payload-type": "json"
       }
    }
}

The microservice system (Experimental)

The microservice system is similar to the URL system, except that it also spins up the microservice that it defines. This system can be used with the JSON source, the HTTP transform and the JSON push sink.

The docker property lets one specify a Docker container that should be spun up. Note that the microservice system does not have the base_url property. The reason is that it is able to figure this out itself.

The microservice system supports private repositories.

A microservice must communicate with the outside world using either the HTTP protocol (the default) or the HTTPS protocol. Set the use_https property to true to enable HTTPS.

The system provides session handling, connection pooling and authentication services to sources, transforms and sinks which need to communicate with the microservice.

Prototype

{
    "_id": "id-of-microservice",
    "name": "Name of microservice",
    "type": "system:microservice",
    "docker": {
        "image": "some-repo/some-image:some-tag",
        "port": 5000,
        "username": null,
        "password": null,
        "memory": 128,
        "cpu_quota": 25,
        "cpu_period": 100,
        "cpuset_cpus": null,
        "data_folder": true,
        "environment": {
            "SOME-VARIABLE": "SOME-VALUE",
            "OTHER-VARIABLE": {
                "key1": "value1",
                "key2": "value2"
            }
        }
    },
    "use_https": false,
    "verify_ssl": false,
    "username": null,
    "password": null,
    "authentication": "basic",
    "connect_timeout": 60,
    "read_timeout": 7200
}

Note that due to Docker naming conventions, the _id of the microservice must start with an ASCII letter or digit, and the only characters other than letters and digits allowed in the rest of the string are "_", "." and "-".

Properties

Property Type Description Default Req
docker.image String The fully qualified name of a Docker image, e.g. sesam/file-share-service:latest or quay.io/someuser/someimage:1.2.3.   Yes
docker.port Integer The port on which to talk to the microservice. This should be one of the ports that the Docker container exposes.   Yes
docker.environment Dict<String,String|Object> The environment variables that should be passed to the microservice's Docker container. Note that string literals are passed along to the docker container as-is, while other types of values are serialized to a string in JSON format.    
docker.username String If the Docker image is located in a private repository, then the username must be specified here.
docker.password String If the Docker image is located in a private repository, then the password must be specified here.
docker.memory Integer The number of MB of RAM to allocate for the microservice. 128  
docker.cpu_quota Integer The percentage of CPU resources the container is allowed to consume. Use with extreme care as you can easily starve other processes on the server for resources if you set this value incorrectly or suboptimally. See the Docker documentation for details. Note that the value is divided by 1000 with respect to the range in the Docker documentation. Also note that the value represents the sum of CPU resources used across all cores. If the container is allowed to use more than one CPU (by default it can use all of them) the number can exceed 100. E.g. for a 4-core CPU, 400 means use all resources on all CPU cores. 25
docker.cpu_period Integer The percentage of CPU time the OS scheduler is allowed to use (see the Docker documentation for details). Note that the value is divided by 1000 with respect to the range in the Docker documentation. You should not normally change the default value. 100
docker.cpuset_cpus String A string expression representing the CPU cores the container is allowed to use, see docker.cpu_quota. The default (null value) means the container can use all cores. A value of "0,4" means use core 0 and 4. A value of "0-4" means use cores 0 through 4. A value of "0,6-8" means use core 0 and 6 through 8. null  
docker.data_folder Boolean If set to true then a persistent folder will be created for the system and mounted as /data inside the container. Note that the folder will not be deleted unless the system is deleted. false  
use_https Boolean If set to true then the system will use the https protocol to communicate with the microservice. false
verify_ssl Boolean Indicate to the client if it should attempt to verify the SSL certificate when communicating with the microservice over SSL/TLS. false  
username String The username to use when authenticating with the microservice. Note that you also have to specify authentication protocol in authentication and password for this property to have any effect.    
password String The password to use if username and authentication is set. It is mandatory if username is provided.   Yes*
authentication String What kind of authentication protocol to use. Note that authentication is opt-in only and the default is no authentication. If no authentication is set, any username or password set will be ignored. Allowed values are "basic" and "digest".
connect_timeout Integer Number of seconds to wait for connecting to the microservice before timing out. A value of null means wait indefinitely. 60  
read_timeout Integer Number of seconds to wait for the microservice to respond to a request before timing out. A value of null means wait indefinitely. 7200  

Microservice APIs

The Microservice system exposes several API endpoints that can be used to access or gather information about the running service.

Example configuration

{
    "_id": "our-http-server",
    "name": "My microservice",
    "type": "system:microservice",
    "docker": {
        "image": "myrepo/myimage:1.0",
        "port": 4444,
        "environment": {
           "USE_PORT": "4444"
        }
    }
}
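
If the image is hosted in a private repository, docker.username and docker.password must be supplied as well. A sketch where the credentials are kept in secrets (the secret names are illustrative):

{
    "_id": "our-private-microservice",
    "name": "My private microservice",
    "type": "system:microservice",
    "docker": {
        "image": "myprivaterepo/myimage:1.0",
        "port": 4444,
        "username": "$SECRET(docker-username)",
        "password": "$SECRET(docker-password)"
    }
}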

Pumps

Pumps are responsible for "pumping" data through the pipe by reading entities from a source and writing them into a sink. The pump is also responsible for retrying failed writes of entities and logging its activity. It can also write ultimately failed entities to a "dead letter" dataset for manual inspection. Pumps log their execution history in an internal dataset with the id "system:pump_execution:<pipe_id>". See the chapter on the pump execution dataset for more details about the contents of this dataset.

Prototype

{
    "schedule_interval": 30,
    "cron_expression": "* * * * *",
    "rescan_run_count": 10,
    "rescan_cron_expression": "* * * * *",
    "run_at_startup": false,
    "max_read_retries": 0,
    "read_retry_delay": 0,
    "write_retry_delay": 0,
    "max_retries_per_entity": 5,
    "max_consecutive_write_errors": 1,
    "max_write_errors_in_retry_dataset": 0,
    "dead_letter_dataset": "dead-letter-dataset-id",
    "track_dead_letters": false,
    "mode": "scheduled"
}

Properties

Note: A pump configuration needs to have either a schedule_interval or a cron_expression property to govern when the pump should be run. They are mutually exclusive, with the cron_expression taking precedence if both are present. If neither property is set, the schedule_interval will be set to a default value. For pipes with a dataset source and a dataset sink the default will be 30 seconds +/- 1.5 seconds. For all other pipes, the default will be 900 seconds +/- 45 seconds. It is good practice to always set the cron_expression property on pipes that read from or write to external systems.

If you are unfamiliar with cron expressions, you can read more of how they are formatted in the Cron Expressions document.

Property Type Description Default Req
schedule_interval Number The number of seconds between runs. It is mutually exclusive with the cron_expression property. (see the note above) Yes
cron_expression String A cron expression that indicates when the pump should run. It is mutually exclusive with the schedule_interval property.   Yes
rescan_run_count Integer How many times the pump should run before scheduling a complete rescan of the source of the pipe that the pump is part of. It is mutually exclusive with the rescan_cron_expression property.    
rescan_cron_expression String A cron expression that indicates when the pump should schedule a full rescan of the source of the pipe the pump is part of. It is mutually exclusive with the rescan_run_count property.    
run_at_startup Boolean A flag that indicates if the pump should run when Sesam starts up, in addition to the normal schedule specified by the schedule_interval or cron_expression properties. false  
dead_letter_dataset String The id of the dataset to write entities that have ultimately failed their retries to. This can only happen if max_write_errors_in_retry_dataset is non-zero and max_retries_per_entity for the particular entity has been exceeded. Dead letter datasets can be shared by more than one pipe.
track_dead_letters Boolean A flag that indicates if the pump should delete any previously "dead letter" entities when a later version of them is successfully written to the sink. It is only active if the "dead_letter_dataset" property is set and retries are active. Note that enabling this option will incur a performance cost because all successfully written entities must be looked up in the execution log to determine if they have been previously marked as "dead". false
max_read_retries Integer A counter that indicates to the pump how many times it should retry when failing to read an entity from a source. The default (0) means that it should not retry but log an error immediately when encountering read errors. See also the read_retry_delay property. 0
read_retry_delay Number How many seconds to wait before retrying after a read error (i.e. only if max_read_retries is non-zero). The default value is 0 which will retry immediately. If the reason for the read error is non-transient, the number of retries set by max_read_retries will be exhausted quickly - in this case, set this property to match the expected interval. 0  
write_retry_delay Number How many seconds to wait before retrying after a write error (i.e. only if max_consecutive_write_errors is larger than 1). The default value is 0 which will retry immediately. If the reason for the write error is non-transient, the number of retries set by max_consecutive_write_errors will be exhausted quickly - in this case, set this property to match the expected interval. 0  
max_retries_per_entity Integer A counter that indicates to the pump how many times it should retry a failing entity when writing to a sink before giving up on it, in which case it can optionally write the entity to a dead_letter_dataset (if specified). 5
max_consecutive_write_errors Integer A counter that indicates to the pump how many consecutive write errors it tolerates before terminating the current run. The default (1) means it will terminate after the first write error it encounters. See also the write_retry_delay property. 1  
max_write_errors_in_retry_dataset Integer A counter that indicates to the pump how many write errors it accepts in its execution history dataset. If the number of retryable and not "dead" failed entities in the dataset exceeds this number, the pump will refuse to write any more failed entities to the execution dataset and terminate, even if the max_retries_per_entity limit has not been reached at that point. The purpose of this property is to limit the size of the pump execution dataset in the case where a target system is unreachable (or misconfigured). The default value (0) effectively disables retries for write errors. 0
mode String The mode of operation. Valid options are "scheduled" (the default), "manual" and "off". Pumps in scheduled mode will follow the configured schedule and the scheduler can be enabled and disabled at runtime. Pumps in manual mode will not be scheduled and can only be operated manually through the Service API. Pumps in off mode cannot be started at all. "scheduled"

Example configuration

The outermost object would be your pipe configuration, which is omitted here for brevity:

A scheduled pump running every 30 seconds, no retries or dead letter dataset:

{
    "pump": {
       "schedule_interval": 30
   }
}

A cron-scheduled pump running every day at midnight, with a maximum of 5 retries per entity, at most 100 failed entities in the execution log and a dead letter dataset. Also, a maximum of ten consecutive write failures is allowed:

{
    "pump": {
       "cron_expression": "0 0 * * *",
       "max_retries_per_entity": 5,
       "max_consecutive_write_errors": 10,
       "max_write_errors_in_retry_dataset": 100,
       "dead_letter_dataset": "pump-dead-letters"
   }
}

A scheduled pump running every 30 seconds but doing a full rescan of the source every full hour. No retries or dead letter datasets:

{
    "pump": {
       "schedule_interval": 30,
       "rescan_cron_expression": "0 * * * *"
   }
}
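
Alternatively, a full rescan can be triggered by run count instead of by schedule. A sketch of a pump that runs every 5 minutes and rescans the source on every tenth run:

{
    "pump": {
       "schedule_interval": 300,
       "rescan_run_count": 10
   }
}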

A cron-scheduled pump running every 5 minutes from 14:00 to 14:55, and again every 5 minutes from 18:00 to 18:55, every day. No retries or dead letter datasets:

{
    "pump": {
       "cron_expression": "0/5 14,18 * * ?"
   }
}
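
A pump in manual mode, which is never scheduled and can only be operated through the Service API:

{
    "pump": {
       "mode": "manual"
   }
}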

Pipes revisited

Short-hand configuration

As mentioned earlier, in the pipe section, there is a special "short hand" configuration for one of the most common pipe patterns: pipes pumping entities from RDBMS tables into an internal dataset. Since this is a frequently encountered use case, the information needed has been condensed into a single URL-style form:

[
    {
       "_id": "Northwind",
       "type": "system:mysql",
       "name": "Northwind database",
       "username": "northwind",
       "password": "secret",
       "host": "mydb.example.org",
       "database": "Northwind"
    },
    {
       "_id": "Northwind:Orders",
       "type": "pipe",
       "name": "Orders from northwind",
       "short_config": "sql://Northwind/Orders"
    }
]

Currently, only the sql system and source are supported, though other short forms may be added at a later time. The above example using the short_config form is equivalent to this fully expanded pipe configuration:

[
    {
       "_id": "Northwind",
       "type": "system:mysql",
       "name": "Northwind database",
       "username": "northwind",
       "password": "secret",
       "host": "mydb.example.org",
       "database": "Northwind"
    },
    {
       "_id": "Northwind:Orders",
       "type": "pipe",
       "source": {
           "type": "sql",
           "system": "Northwind",
           "table": "Orders"
       },
       "sink": {
           "type": "dataset",
           "dataset": "Northwind:Orders"
       },
       "pump": {
           "schedule_interval": 30
       }
    }
]

You can combine the short form with properties from the dataset sink, the sql source and the pump, as long as the _id and type properties aren't overridden. For example, changing the pump schedule and startup flag:

[
    {
       "_id": "Northwind",
       "type": "system:mysql",
       "name": "Northwind database",
       "username": "northwind",
       "password": "secret",
       "host": "mydb.example.org",
       "database": "Northwind"
    },
    {
       "_id": "Northwind:Orders",
       "type": "pipe",
       "name": "Orders from northwind",
       "short_config": "sql://Northwind/Orders",
       "pump": {
           "schedule_interval": 60,
           "run_at_startup": true
       }
    }
]

Changing configuration on an existing pipe

When the configuration of a pipe is modified in such a way that the entities the pipe produces changes (for instance by changing the DTL transform of the pipe), the pipe's "last-seen" value must be cleared in order to reprocess already seen entities with the new pipe configuration.

This can be done by setting the "last-seen" value to an empty string with the update-last-seen operation in the Service API.