Comment by woile
10 months ago
Looks interesting! Great work! When using the cache, if you have 2 topics, each with multiple partitions, how does the join operation work? What if the partitions don't have the same id?
10 months ago
Thanks! This example uses nearby-joins (based on event time): https://slipstream.readthedocs.io/en/1.0.1/cookbook.html#joi....
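To make the event-time idea concrete, here is a minimal plain-Python sketch of a nearby join. It assumes weather updates carry a numeric `timestamp` and that "nearby" means the latest weather update at or before the activity's timestamp. `NearbyCache` and `join` are hypothetical names; this is not slipstream's Cache API, and the cookbook's actual matching rule may differ.

```python
import bisect
from dataclasses import dataclass, field


@dataclass
class NearbyCache:
    """Hypothetical stand-in for a cache keyed by event time (not slipstream's Cache)."""
    timestamps: list = field(default_factory=list)
    values: list = field(default_factory=list)

    def put(self, ts: int, value: dict) -> None:
        # Keep timestamps sorted so lookups can use binary search
        i = bisect.bisect_right(self.timestamps, ts)
        self.timestamps.insert(i, ts)
        self.values.insert(i, value)

    def nearest_before(self, ts: int):
        # Latest entry whose timestamp is <= ts, or None if nothing precedes it
        i = bisect.bisect_right(self.timestamps, ts)
        return self.values[i - 1] if i else None


def join(a: dict, cache: NearbyCache) -> str:
    if w := cache.nearest_before(a['timestamp']):
        return f'The weather during {a["value"]} was {w["value"]}'
    return f'The weather during {a["value"]} is unknown'


weather = NearbyCache()
weather.put(100, {'value': 'sunny'})
weather.put(200, {'value': 'rainy'})

print(join({'timestamp': 150, 'value': 'swimming'}, weather))  # The weather during swimming was sunny
print(join({'timestamp': 50, 'value': 'running'}, weather))    # The weather during running is unknown
```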
I hope I understood your question correctly, but if you're joining on a fixed key, the solution should be simpler. Even if the data is partitioned by id (partition 0 holds ids 0-9 and partition 1 holds ids 10-19), every message still passes through the handler that stores it in the cache, so the cached data is no longer partitioned. Let's say each weather update has a fixed `id` we could join on; then we'd simply use the following logic instead:
```
if w := weather_cache[a['id']]:  # assumes the activity carries the same `id` as the weather update
    return f'The weather during {a["value"]} was {w["value"]}'
return a['value'], '?'
```
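A minimal plain-Python sketch of why partitioning stops mattering once messages reach the cache. The two dicts standing in for partitioned topics, the `store_weather` handler, and the plain dict used as `weather_cache` are all hypothetical; the sketch assumes each activity carries the same `id` field as the weather updates.

```python
from itertools import chain

# Hypothetical partitioned topics: partition number -> list of messages.
# Weather is partitioned by id (partition 0: ids 0-9, partition 1: ids 10-19);
# activities happen to be partitioned differently.
weather_topic = {
    0: [{'id': 3, 'value': 'sunny'}],
    1: [{'id': 12, 'value': 'rainy'}],
}
activity_topic = {
    0: [{'id': 12, 'value': 'cycling'}],
    1: [{'id': 3, 'value': 'hiking'}, {'id': 7, 'value': 'reading'}],
}

weather_cache: dict[int, dict] = {}  # plain dict standing in for the cache


def store_weather(w: dict) -> None:
    # Every weather message passes through this handler, whatever its partition
    weather_cache[w['id']] = w


def join(a: dict) -> str:
    if w := weather_cache.get(a['id']):
        return f'The weather during {a["value"]} was {w["value"]}'
    return f'The weather during {a["value"]} is unknown'


# Store weather from all partitions, then join activities from any partition
for w in chain.from_iterable(weather_topic.values()):
    store_weather(w)

results = [join(a) for a in chain.from_iterable(activity_topic.values())]
for line in results:
    print(line)
```

Both branches return a string here so the result type stays consistent. The activity with id 12 finds its weather even though the two topics place id 12 in different partitions.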
Weather updates may arrive late, in which case we could end up joining with stale data. For that, we can use Synchronization: https://slipstream.readthedocs.io/en/1.0.1/cookbook.html#syn...
Here's a full example that sends out corrections: https://gist.github.com/Menziess/22d8a511f61c04a8142d81510a0...
Alternatively, you could wait by pausing the activity stream, setting the `downtime_threshold` of the Checkpoint's Dependency to 0. Negative values might also work, although I haven't tried that yet.
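The waiting idea can be sketched in plain Python: hold each activity until the weather stream's event time has passed the activity's timestamp, then join. This only illustrates the concept; it is not slipstream's Synchronization, Checkpoint, or Dependency API. `WaitingJoin` is a hypothetical name, and the sketch assumes activities arrive in event-time order.

```python
import bisect
from collections import deque


class WaitingJoin:
    """Hypothetical sketch: hold activities until weather has caught up in event time."""

    def __init__(self) -> None:
        self.weather_ts: list[int] = []
        self.weather_values: list[str] = []
        self.pending: deque = deque()  # activities waiting for weather to catch up
        self.output: list[str] = []

    def on_weather(self, ts: int, value: str) -> None:
        # Keep weather sorted by event time, even if updates arrive out of order
        i = bisect.bisect_right(self.weather_ts, ts)
        self.weather_ts.insert(i, ts)
        self.weather_values.insert(i, value)
        self._release()

    def on_activity(self, ts: int, value: str) -> None:
        self.pending.append((ts, value))
        self._release()

    def _release(self) -> None:
        # Latest event time seen on the weather stream (list is sorted)
        watermark = self.weather_ts[-1] if self.weather_ts else None
        while self.pending and watermark is not None and self.pending[0][0] <= watermark:
            ts, value = self.pending.popleft()
            # Join with the latest weather update at or before the activity
            i = bisect.bisect_right(self.weather_ts, ts)
            w = self.weather_values[i - 1] if i else '?'
            self.output.append(f'The weather during {value} was {w}')


j = WaitingJoin()
j.on_weather(100, 'sunny')
j.on_activity(150, 'swimming')  # held: weather stream has only reached t=100
j.on_weather(140, 'rainy')      # still held: 140 < 150
j.on_weather(160, 'cloudy')     # weather has passed t=150, so the activity is released
print(j.output)  # ['The weather during swimming was rainy']
```

Joining immediately would have paired swimming with the stale sunny update; waiting until the weather stream passes t=150 picks up the late rainy update instead.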