Golden property based on last updated
Make sure you have a reliable timestamp from the source, and that you propagate it. Think about feedback loops if the data is synced back. It can be good to standardise on a single property name for this timestamp, e.g. $last_updated.
Example enrich pipe:
{
  "_id": "test-enrich",
  "type": "pipe",
  "source": {
    "type": "dataset",
    "dataset": "test",
    "include_previous_versions": true
  },
  "sink": {
    "set_initial_offset": "onload"
  },
  "transform": {
    "type": "dtl",
    "rules": {
      "default": [
        ["copy", "*"],
        ["add", "$updated",
          ["dict",
            ["flatten",
              ["list",
                ["items",
                  ["first",
                    ["hops", {
                      "datasets": ["test-enrich t"],
                      "where": [["eq", "_S._id", "t._id"]],
                      "return": "t.$updated",
                      "track-dependencies": false
                    }]
                  ]
                ],
                ["items",
                  ["apply", "is-changed",
                    ["filter",
                      ["not", ["matches", "_*", "_.key"]],
                      ["key-values", "_S."]
                    ]
                  ]
                ]
              ]
            ]
          ]
        ]
      ],
      "is-changed": [
        ["if",
          ["is-changed", ["path", "_P._S.key", "_."]],
          ["discard"],
          ["add", "_S.", "_P._S.$last-update"]
        ]
      ]
    }
  },
  "batch_size": 1,
  "namespaces": {
    "identity": "test",
    "property": "test"
  }
}
Example connect pipe:
{
  "_id": "global-test",
  "type": "pipe",
  "source": {
    "type": "merge",
    "datasets": ["test-update t", "test-update2 t2"],
    "equality_sets": [
      [
        ["ni-id", ["ni", "t._id"]],
        ["ni-id", ["ni", "t2._id"]]
      ]
    ],
    "identity": "first",
    "strategy": "compact",
    "version": 2
  },
  "transform": {
    "type": "dtl",
    "rules": {
      "default": [
        ["add", "$updated",
          ["dict", ["items", "_S.$updated"]]
        ],
        ["add", "x",
          ["first",
            ["values",
              ["apply", "newest",
                ["list", ["list", "test:x", "test2:x"]]
              ]
            ]
          ]
        ],
        ["add", "y",
          ["first",
            ["values",
              ["apply", "newest",
                ["list", ["list", "test:y", "test2:y"]]
              ]
            ]
          ]
        ]
      ],
      "newest": [
        ["add", "_x",
          ["first",
            ["sorted-descending", "_.value",
              ["key-values",
                ["map-dict",
                  ["if", ["in", "_.", "_S."], "_."],
                  "_.",
                  "_R._T.$updated"
                ]
              ]
            ]
          ]
        ],
        ["add", "_x",
          ["first",
            ["values",
              ["map-dict",
                ["if", ["eq", "_.", "_T._x.key"], "_."],
                "_.",
                "_R._S"
              ]
            ]
          ]
        ]
      ]
    }
  },
  "metadata": {
    "global": true
  }
}