
Comment by galeaspablo

7 months ago

Disclosure (given this is from Confluent): I'm ex MSK (Managed Streaming for Kafka at AWS) and my current company was competing with Confluent before we pivoted.

Yup, this is one more example, just like Pulsar. There are definitely great optimizations to be made on the average case. In the case of parallel consumer, if you'd like to keep ordering guarantees, you retain O(n^2) processing time in the worst case.

The issues arise when you try to traverse arbitrary dependency topologies in your messages. So you're left with two options:

1. Make damn sure that causal dependencies don't exhibit O(n^2) behavior, which requires formal models to be 100% sure.

2. Give up ordering or make some other nasty tradeoff.

At a high level the problem boils down to traversing a DAG in topological order. From computer science theory, we know that this requires a sorted index. And if you're implementing an index on top of Kafka, you might as well embed your data into and consume directly from the index. Of course, this is easier said than done, and that's why no one has cracked this problem yet. We were going to try, but alas we pivoted :)

Edit: Topological sort does not require a sorted index (or similar) if you don't care about concurrency. But then you've lost the advantages of your queue.
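To illustrate the edit: a plain sequential topological sort (Kahn's algorithm) needs only an in-degree count per node and a FIFO queue, with no sorted index. A minimal Python sketch; the dict-of-children input format and the message IDs are assumptions for illustration, not how any broker stores dependencies.

```python
from collections import deque


def topological_order(edges: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm: sequential topological sort using only in-degree counts.

    `edges` maps each message to the messages that depend on it (parent -> children).
    """
    # Collect every node, including ones that only appear as children.
    nodes = set(edges)
    for children in edges.values():
        nodes.update(children)

    # Count unmet dependencies (incoming edges) for each node.
    indegree = {node: 0 for node in nodes}
    for children in edges.values():
        for child in children:
            indegree[child] += 1

    # Start from nodes with no dependencies; sorted only to make the output deterministic.
    ready = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in edges.get(node, []):
            indegree[child] -= 1
            if indegree[child] == 0:  # all parents done: child becomes ready
                ready.append(child)

    if len(order) != len(nodes):
        raise ValueError("graph has a cycle")
    return order


print(topological_order({"A1": ["A2", "B1"], "A2": ["A3"], "B1": ["B2"]}))
# prints ['A1', 'A2', 'B1', 'A3', 'B2']
```

This processes one message at a time, which is exactly the "don't care about concurrency" case.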

> traverse arbitrary dependency topologies

Is there another way to state this? It’s very difficult for me to grok.

> DAG

Directed acyclic graph right?

  • Apologies, we've been so deep into this problem that we take our slang for granted :)

    A graphical representation might be worth a thousand words, keeping in mind it's just one example. Imagine you're traversing the following.

    A1 -> A2 -> A3...
    |
    v
    B1 -> B2 -> B3...
    |
    v
    C1 -> C2 -> C3...
    |
    v
    D1 -> D2 -> D3...
    |
    v
    E1 -> E2 -> E3...
    |
    v
    F1 -> F2 -> F3...
    |
    v
    ...

    Efficient concurrent consumption of these messages (while respecting causal dependencies) would take O(w + h), where w is the width (left to right) of the longest sequence and h is the height (top to bottom) of the first column.
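    To make the O(w + h) figure concrete: with enough workers, each round can process every message whose parent is already done, so the number of rounds equals the longest chain, h messages down the first column plus w - 1 more across the bottom row, i.e. w + h - 1. A rough Python sketch; the row-letter naming (at most 26 rows) and the sizes are assumptions for illustration, and it only counts rounds rather than doing real work.

```python
def grid_edges(width: int, height: int) -> dict[str, list[str]]:
    """Build the example DAG: rows A, B, C, ... each a chain of `width` messages,
    plus an edge from the first message of each row to the first of the next row."""
    rows = [chr(ord("A") + r) for r in range(height)]  # assumes height <= 26
    edges: dict[str, list[str]] = {}
    for r, row in enumerate(rows):
        for c in range(1, width + 1):
            children = []
            if c < width:
                children.append(f"{row}{c + 1}")  # horizontal edge, e.g. A1 -> A2
            if c == 1 and r + 1 < height:
                children.append(f"{rows[r + 1]}1")  # vertical edge, e.g. A1 -> B1
            edges[f"{row}{c}"] = children
    return edges


def count_waves(edges: dict[str, list[str]]) -> int:
    """Process every ready message in parallel per wave; return the number of waves."""
    indegree = {node: 0 for node in edges}
    for children in edges.values():
        for child in children:
            indegree[child] += 1
    frontier = [n for n, d in indegree.items() if d == 0]
    waves = 0
    while frontier:
        waves += 1
        next_frontier = []
        for node in frontier:  # all messages in this wave could run concurrently
            for child in edges[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_frontier.append(child)
        frontier = next_frontier
    return waves


print(count_waves(grid_edges(width=3, height=6)))  # prints 8, i.e. 3 + 6 - 1
```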

    But Pulsar, Kafka + Parallel Consumer, et al. would take O(n^2), either in processing time or in space. This is because, at a fundamental level, the underlying data storage looks like this:

    A1 -> A2 -> A3...
    B1 -> B2 -> B3...
    C1 -> C2 -> C3...
    D1 -> D2 -> D3...
    E1 -> E2 -> E3...
    F1 -> F2 -> F3...

    Notice that the underlying data storage loses information about nodes with multiple children (e.g., A1 previously parented both A2 and B1).
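    A toy model of that loss, assuming (hypothetically) one partition per row letter, where the only ordering a partition preserves is "the previous offset in the same partition". Any dependency that crosses partitions, such as A1 -> B1, is simply not recorded anywhere. The naming scheme (one letter plus a column number) is an illustration, not a real broker API.

```python
def flatten_to_partitions(edges: dict[str, list[str]]) -> dict[str, list[str]]:
    """Hypothetical log model: one partition per row letter, ordered by column number."""
    partitions: dict[str, list[str]] = {}
    for node in sorted(edges, key=lambda n: (n[0], int(n[1:]))):
        partitions.setdefault(node[0], []).append(node)
    return partitions


def lost_edges(edges: dict[str, list[str]],
               partitions: dict[str, list[str]]) -> list[tuple[str, str]]:
    """Return dependency edges that the partitions cannot express.

    Only consecutive offsets within one partition carry ordering, so every
    other edge is lost when the DAG is written to the log.
    """
    kept = set()
    for log in partitions.values():
        kept.update(zip(log, log[1:]))  # consecutive offsets, e.g. (A1, A2)
    return sorted(
        (parent, child)
        for parent, children in edges.items()
        for child in children
        if (parent, child) not in kept
    )


# First three rows of the example; A1 and B1 each have two children.
edges = {
    "A1": ["A2", "B1"], "A2": ["A3"], "A3": [],
    "B1": ["B2", "C1"], "B2": ["B3"], "B3": [],
    "C1": ["C2"], "C2": ["C3"], "C3": [],
}
print(lost_edges(edges, flatten_to_partitions(edges)))  # prints [('A1', 'B1'), ('B1', 'C1')]
```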

    If we want to respect order, the consumer is responsible for declining to process messages that would violate causal order, e.g., attempting to process F1 before E1. Thus we could get into a situation where we try to process F1, then E1, then D1, then C1, then B1, then A1. Now that A1 is processed, Kafka tries again, but it tries F1, then E1, then D1, then C1, then B1... and so on and so forth. With h rows that is h + (h - 1) + ... + 1 = h(h + 1)/2 attempts just for the first column. This is O(n^2) behavior.
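    Counting the attempts in that retry loop makes the quadratic growth visible. A hypothetical sketch: only the first column matters, messages are numbered 0 (A1), 1 (B1), and so on, each depends on the previous one, and the worst-case bottom-up delivery order is assumed on every pass.

```python
def naive_retry_attempts(height: int) -> int:
    """Simulate a consumer that declines out-of-order messages and retries them.

    Message r stands for the head of row r (0 = A1, 1 = B1, ...) and depends on r - 1.
    """
    pending = list(range(height - 1, -1, -1))  # worst-case delivery: F1, E1, ..., A1
    done: set[int] = set()
    attempts = 0
    while pending:
        still_pending = []
        for msg in pending:  # one pass over everything not yet processed
            attempts += 1
            if msg == 0 or msg - 1 in done:
                done.add(msg)  # causal order respected: process it
            else:
                still_pending.append(msg)  # declined; retried on the next pass
        pending = still_pending
    return attempts


for h in (6, 60, 600):
    print(h, naive_retry_attempts(h))  # prints 6 21, then 60 1830, then 600 180300
```

    Ten times more rows costs roughly a hundred times more attempts.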

    Without changing the underlying data storage architecture, you will either:

    1. Incur O(n^2) space or time complexity

    2. Reimplement the queuing mechanism at the consumer level, but then you might as well not use Kafka (or others) at all. In practice this doesn't work out (my evidence being that no one has pulled it off).

    3. Face other nasty issues (e.g., in Kafka parallel consumer you can run out of memory or your processing time can become O(n^2)).
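    A rough idea of what option 2 looks like, as a hypothetical sketch assuming each message carries the ID of the single message it depends on: instead of declining and retrying, the consumer parks each early message under its unmet parent and releases the chain once the parent completes. Every message is handled once, but everything that arrives early sits in consumer memory, which is the kind of memory pressure point 3 refers to. This is an illustration, not how Parallel Consumer is implemented.

```python
from collections import defaultdict


def buffered_consumer(deliveries: list[str], parents: dict[str, str]) -> tuple[list[str], int]:
    """Process each message exactly once by parking it until its parent is done.

    `deliveries` is the order the log hands messages over; `parents` maps a message
    to the one message it causally depends on (hypothetical per-message metadata).
    Returns (processing order, peak number of parked messages).
    """
    done: set[str] = set()
    waiting: dict[str, list[str]] = defaultdict(list)  # unmet parent -> parked messages
    order: list[str] = []
    parked = peak = 0

    def process(msg: str) -> None:
        nonlocal parked
        stack = [msg]
        while stack:  # processing one message may release a chain of parked ones
            m = stack.pop()
            done.add(m)
            order.append(m)
            released = waiting.pop(m, [])
            parked -= len(released)
            stack.extend(released)

    for msg in deliveries:
        parent = parents.get(msg)
        if parent is None or parent in done:
            process(msg)
        else:
            waiting[parent].append(msg)  # park it in memory until the parent is done
            parked += 1
            peak = max(peak, parked)
    return order, peak


deps = {"B1": "A1", "C1": "B1", "D1": "C1", "E1": "D1", "F1": "E1"}
print(buffered_consumer(["F1", "E1", "D1", "C1", "B1", "A1"], deps))
# prints (['A1', 'B1', 'C1', 'D1', 'E1', 'F1'], 5)
```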

    • Wanted to say thanks so much for writing this all out! I've always thought of ordering as being sort of inherently against the point of parallel streams, so it's interesting to hear about the state of the art and the benefits people are trying to glean. I don't think about stream processors terribly often, so I wasn't aware of how dependencies are mapped.

      If you don't mind another follow-up (and your patience with my ignorance hasn't run out :P), wouldn't efficient concurrent consumption imply knowing the dependency graph before the events are processed? I.e., is it possible in any instance to get to O(w + h) in a stream?


    • Do you have an example use case for this? This does seem like something unsuited to Kafka, but I'm having a hard time imagining why you would structure something like this.
