Comment by galeaspablo
8 months ago
Agreed. The head of line problem is worth solving for certain use cases.
But today, all streaming systems (or workarounds) with per-message-key acknowledgements incur O(n^2) costs in either computation, bandwidth, or storage for n messages (roughly: each acknowledgement can force the system to track or rewrite state for every message still outstanding). This applies to Pulsar, for example, which is often used for this feature.
Now, now, this degenerate time/space complexity might not show up every day, but when it does, you’re toast, and you have to wait it out.
My colleagues and I have studied this problem in depth for years, and our conclusion is that a fundamental architectural change is needed to support scalable per-message-key acknowledgements. Furthermore, the architecture will fundamentally require a sorted index, meaning that any such queuing/streaming system will process n messages in O(n log n).
We’ve wanted to blog about this for a while, but never found the time. I hope this comment helps if you’re thinking of relying on per-message-key acknowledgements; you should expect sporadic outages/delays.
Check out the parallel consumer: https://github.com/confluentinc/parallel-consumer
It processes unrelated keys in parallel within a partition. It has to track which offsets have been processed between the partition's last committed offset and the tip (i.e. only what's currently processed out of order). When it commits, it saves this state, highly compressed, in the commit metadata.
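Roughly, the bookkeeping boils down to something like the following (a simplified Java sketch of the idea, not the library's actual code or API):

    import java.util.TreeSet;

    // Track offsets that finished ahead of the committable watermark, advance the
    // watermark over any contiguous prefix, and serialize whatever is still blocked
    // into the commit metadata (the real library compresses this representation).
    class OutOfOrderTracker {
        private long committable;                                 // highest offset safe to commit
        private final TreeSet<Long> doneAhead = new TreeSet<>();  // finished, but blocked by earlier offsets

        OutOfOrderTracker(long lastCommittedOffset) {
            this.committable = lastCommittedOffset;
        }

        void markDone(long offset) {
            if (offset <= committable) return;                    // already covered by the watermark
            doneAhead.add(offset);
            while (doneAhead.remove(committable + 1)) {
                committable++;                                    // contiguous prefix becomes committable
            }
        }

        long committableOffset() { return committable; }

        // State to stash alongside the commit; a stuck key makes this set grow.
        String commitMetadata() { return doneAhead.toString(); }
    }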
Most of the time it was only processing a small number of records out of order, so this bookkeeping was insignificant, but if one key got stuck it could grow to at least 100,000 offsets ahead, at which point enough alarms would go off that we would do something. That's definitely a huge improvement over head-of-line blocking.
Disclosure (given this is from Confluent): I'm ex-MSK (Managed Streaming for Kafka at AWS), and my current company was competing with Confluent before we pivoted.
Yup, this is one more example, just like Pulsar. There are definitely great optimizations to be made for the average case. In the case of the parallel consumer, if you'd like to keep ordering guarantees, you retain O(n^2) processing time in the worst case.
The issues arise when you try to traverse arbitrary dependency topologies in your messages. So you're left with two options:
1. Make damn sure that causal dependencies don't exhibit O(n^2) behavior, which requires formal models to be 100% sure.
2. Give up ordering or make some other nasty tradeoff.
At a high level, the problem boils down to traversing a DAG in topological order. From computer science theory, we know that this requires a sorted index. And if you're implementing an index on top of Kafka, you might as well embed your data in the index and consume directly from it. Of course, this is easier said than done, and that's why no one has cracked this problem yet. We were going to try, but alas we pivoted :)
Edit: Topological sort does not require a sorted index (or similar) if you don't care about concurrency. But then you've lost the advantages of your queue.
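To make "traverse a DAG in topological order" concrete, here's a toy Java sketch (hypothetical message IDs and dependency maps, not any real system's API): Kahn's algorithm with the ready set kept in a sorted structure, which is where the O(n log n) comes from once you want a well-defined dispatch order.

    import java.util.*;

    class TopoDispatch {
        // dependents: message id -> ids that depend on it; inDegree: id -> number of unmet dependencies
        static List<Long> dispatchOrder(Map<Long, List<Long>> dependents, Map<Long, Integer> inDegree) {
            TreeSet<Long> ready = new TreeSet<>();              // sorted index of dispatchable messages
            inDegree.forEach((id, deg) -> { if (deg == 0) ready.add(id); });

            List<Long> order = new ArrayList<>();
            while (!ready.isEmpty()) {
                long next = ready.pollFirst();                  // O(log n) per dispatch
                order.add(next);
                for (long child : dependents.getOrDefault(next, List.of())) {
                    if (inDegree.merge(child, -1, Integer::sum) == 0) {
                        ready.add(child);                       // all dependencies met; now dispatchable
                    }
                }
            }
            return order;                                       // n messages => O(n log n) overall
        }
    }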
> traverse arbitrary dependency topologies
Is there another way to state this? It’s very difficult for me to grok.
> DAG
Directed acyclic graph, right?
I suppose it depends on your message volume. To me, processing 100k messages and then getting paged however long later as the broker (or whatever) falls apart sounds much worse than head-of-line blocking and seeing the problem directly in my consumer. If I need to avoid head-of-line blocking, I can build whatever failsafe mechanisms I need for the problematic data and defer to some other queueing system. Typically that's just adding an attempt counter, replaying the message to the same Kafka topic, and, if attempts > X, sending it off to wherever (roughly the sketch below).
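(For what it's worth, the replay mechanism is only a few lines; here's a rough Java sketch, where the dead-letter topic suffix and the max-attempts threshold are just placeholders.)

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;

    class RetryRouter {
        static final int MAX_ATTEMPTS = 5;                      // the "X" above; pick whatever fits

        static void retryOrPark(ConsumerRecord<String, String> failed, Producer<String, String> producer) {
            Header h = failed.headers().lastHeader("attempts");
            int attempts = (h == null) ? 0 : Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8));

            // Replay to the same topic until the attempt budget is spent, then park it elsewhere.
            String target = (attempts + 1 > MAX_ATTEMPTS) ? failed.topic() + ".dead-letter" : failed.topic();
            ProducerRecord<String, String> out = new ProducerRecord<>(target, failed.key(), failed.value());
            out.headers().add("attempts", Integer.toString(attempts + 1).getBytes(StandardCharsets.UTF_8));
            producer.send(out);
        }
    }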
I'd rather debug a worker problem than an infra scaling problem every day of the week and twice on Sundays.
It's interesting you say that, since this turned an infra scaling problem into a worker problem for us. Previously we would get terrible head-of-line throughput issues, so we would use an egregious number of partitions to try to alleviate that. Lots of partitions are hard to manage, since resizing topics is operationally tedious and it puts a lot of strain on brokers. But no matter how many partitions you have, the head of the line still blocks. Even cases where certain keys had slightly slower throughput would clog up the whole partition with normal consumers.
The parallel consumer nearly entirely solved this problem. Only the most egregious cases where keys were ~3000 times slower than other keys would cause an issue, and then you could solve it by disabling that key for a while.
Follow-on: if you're using Kafka to publish messages to multiple consumers, this is even worse, as now you're infecting every consumer with data-processing issues from every other consumer. Bad juju.
There's also a similar project from LINE: https://github.com/line/decaton
> Furthermore, the architecture will fundamentally require a sorted index, meaning that any such queuing/streaming system will process n messages in O(n log n).
Would using a sorted index have an impact on the measured servicing time of each message? (Not worst-case, something more like average case.) It's made extremely clear in the Kafka docs that Kafka relies heavily on the operating system's filesystem cache for performance, and that seeking through events on disk turns out to be very slow compared to just processing events in order.
Let’s separate two advantages Kafka has in the average case.
1. Sequential IO when reading from disk.
2. Use of disk cache (instead of reading from disk) when re-reading recently read events.
#2 helps when you have many consumer groups reading from the tail. And this advantage would extend to index-based streaming.
But #1 would not fully extend to index-based streaming.
When does this matter? When adding a new consumer group, you would lose the speed advantage of sequential IO, because it consumes from the beginning (which isn’t in the disk cache).
BUT this has become less important now that SSDs are so prevalent and affordable. Additionally, in practice the bottleneck isn’t disk IO: consumers tend to perform IO against other systems that incur O(log n) per insert, or network cards get saturated well before disk IO is the limiting factor.
I speculate that we got Kafka et al. because we didn’t have such an abundance of SSDs in the early 2010s.
So, returning to your question, you wouldn’t notice the difference in the average case, as long as there are SSDs under the hood.
> streaming system will process n messages in O(n log n)
I'm guessing this is mostly around how backed up the stream is. n isn't the total number of messages but rather the current number of unacked messages.
Would a radix structure work better here? If you throw something like a UUIDv7 on the messages and store them in a radix structure, you should be able to get O(n) performance here, correct? Or am I not understanding the problem well?
I think the problem is that if you want quick access to all messages with a particular key, then you have to maintain some kind of index over all persisted messages. So n would be the total number of persisted messages as I read it, which can be quite large. But even storing them in the first place is O(n), so O(n log n) might not be so bad.
That's correct. And keep in mind that new consumers starting from the beginning might come into play, so you have to retain the indexes permanently.
And yes, O(n log n) is not bad at all. Sorted database indexes (whether SQL, NoSQL, AcmeVendorSQL, etc.) already take O(n log n) to insert n elements into data storage or to read n elements back out.
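For intuition, a toy in-memory version of the kind of per-key index being discussed might look like this (Java, names made up; a real system would persist and replicate it):

    import java.util.*;

    class PerKeyIndex {
        // key -> (offset -> payload), kept sorted by offset
        private final Map<String, NavigableMap<Long, String>> index = new HashMap<>();

        void append(String key, long offset, String payload) {
            index.computeIfAbsent(key, k -> new TreeMap<>())
                 .put(offset, payload);                          // O(log n) per insert
        }

        // Replay one key's messages in order from a given offset, without scanning other keys.
        Collection<String> replay(String key, long fromOffset) {
            return index.getOrDefault(key, new TreeMap<>())
                        .tailMap(fromOffset, true)
                        .values();
        }
    }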