Show HN: PgQueuer – Transform PostgreSQL into a Job Queue

1 year ago (github.com)

PgQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PgQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.

Does the Celery SQLAlchemy broker support PostgreSQL's LISTEN/NOTIFY features?

Similar support in SQLite would simplify testing applications built with Celery.

How do you add table event messages to SQLite so that the SQLite broker has the same features as AMQP? Could a vtable facade send messages on table events?

Are there SQLite triggers?

Celery > Backends and Brokers: https://docs.celeryq.dev/en/stable/getting-started/backends-...

/? sqlalchemy listen notify: https://www.google.com/search?q=sqlalchemy+listen+notify :

asyncpg.Connection.add_listener

sqlalchemy.event.listen, @listens_for

psycopg2 conn.poll(), while connection.notifies

psycopg2 > docs > advanced > Advanced notifications: https://www.psycopg.org/docs/advanced.html#asynchronous-noti...

PgQueuer.db, PgQueuer.listeners.add_listener; asyncpg add_listener: https://github.com/janbjorge/PgQueuer/blob/main/src/PgQueuer...

asyncpg/tests/test_listeners.py: https://github.com/MagicStack/asyncpg/blob/master/tests/test...

/? sqlite LISTEN NOTIFY: https://www.google.com/search?q=sqlite+listen+notify

sqlite3 update_hook: https://www.sqlite.org/c3ref/update_hook.html
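
For reference, the psycopg2 notification loop from those docs looks roughly like this (DSN and channel name are placeholders):

    import select

    import psycopg2
    import psycopg2.extensions

    conn = psycopg2.connect("dbname=app")
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    cur = conn.cursor()
    cur.execute("LISTEN jobs_channel;")

    while True:
        # Wait until the connection's socket is readable, or time out.
        if select.select([conn], [], [], 5) == ([], [], []):
            continue  # timeout, poll again
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print(notify.channel, notify.payload)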

We use LISTEN/NOTIFY extensively and it is great. The thing it lacks most for us is a guaranteed single recipient: all subscribers get all notifications, which leads to problems determining who should act on the message in our case.

  • Could you use the notify to wake up, and then have the worker lock the job row? Whichever worker gets the lock acts on the message.

    • That's exactly what we do, but taking a lock costs 1 RTT to the database, which means about 100ms. It limits the number of events receivers can handle. If you have too many events, receivers will spend most of their time just trying to take a lock.

  • Wouldn't it be possible for you to lose jobs if no subscriber is listening?

    • We use a garbage collector that errors and restarts a job if it is not served within a specified amount of time.

How does LISTEN/NOTIFY compare to using select for update skip locked? I thought listen/notify can lose queue items when the process crashes? Is that true? Do you need to code for those cases in some manner?

  • LISTEN/NOTIFY and SELECT FOR UPDATE SKIP LOCKED serve different purposes in PgQueuer. LISTEN/NOTIFY notifies consumers about changes in the queue table, prompting them to check for new jobs. This method doesn’t inherently lose messages if a process crashes, because it simply triggers a check rather than transmitting data. The actual job handling and locking are managed by SELECT FOR UPDATE SKIP LOCKED, which safely processes each job even when multiple workers are involved.

  • There are two things:

    1. Signaling

    2. Messaging

    In some systems, those are, effectively, the same. A consumer listens, and the signal is the message. If the consumer process crashes, the message returns to the queue and gets processed when the consumer comes back online.

    If the signal and messaging are separated, as in Postgres, where LISTEN/NOTIFY is the signal, and the skip locked query is the message pull, the consumer process would need to do some combination of polling and listening.

    In the consumer, that could essentially be a loop that’s just doing the skip locked query on startup, then dropping into a LISTEN query only once there are no messages present in the queue. Then the LISTEN/NOTIFY is just signaling to tell the consumer to check for new messages.
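
    A minimal sketch of that loop with asyncpg (channel, table, and column names are illustrative, not PgQueuer's actual schema):

      import asyncio

      import asyncpg

      async def worker(dsn: str) -> None:
          conn = await asyncpg.connect(dsn)
          wakeup = asyncio.Event()
          # The NOTIFY is only the signal; it carries no job data.
          await conn.add_listener("jobs_channel", lambda *_: wakeup.set())

          while True:
              wakeup.clear()
              # The SKIP LOCKED query is the message pull: each row is
              # claimed by exactly one worker.
              job = await conn.fetchrow(
                  """
                  UPDATE jobs SET status = 'running'
                  WHERE id = (
                      SELECT id FROM jobs WHERE status = 'queued'
                      ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED
                  )
                  RETURNING id, payload
                  """
              )
              if job is None:
                  await wakeup.wait()  # queue drained; sleep until a NOTIFY
                  continue
              print("processing", job["id"])

    Clearing the event before the query, rather than after an empty result, avoids missing a NOTIFY that arrives while the query is running.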

  • I think the usage of listen/notify is just a mechanism to save you from querying the database every X seconds looking for new tasks (polling). Polling has drawbacks: if the interval is too small, you make too many queries that usually don't return any new tasks, and if it's too big, you may start processing a task long after it was submitted. This way, it just notifies you that new tasks are ready so you can query the database.

I am going to go the other direction on this... to anyone reading this, please consider using a backend-generic queueing system for your Python project.

Why? Mainly because those systems offer good affordances for testing and running locally in an operationally simple way. They also tend to have decent default answers for various futzy questions around disconnects at various parts of the workflow.

We all know Celery is a buggy pain in the butt, but rolling your own job queue likely ends with you writing a similarly buggy pain in the butt. We've already done "Celery but simpler"; it's stuff like Dramatiq!

If you have backend-specific needs, you won't listen to this advice. But think deeply how important your needs are. Computers are fast, and you can deal with a lot of events with most systems.

Meanwhile if you use a backend-generic system... well you could write a backend using PgQueuer!

  • In my experience, it's easy to test locally with PG: we have unit tests which re-create the DB for each test... It works.

    Also, DB transactions are absolutely the best way to provide ACID guarantees.

  • > those systems offer good affordances for testing and running locally in an operationally simple way

    Define "operationally simple", most if not all of them need persistent anyway, on top of the queue itself. This eliminates the queue and uses a persistent you likely already have.

    • Well for example, lots of queueing libraries have an "eager task" runtime option. What does that do? Instead of putting work into a backend queue, it just immediately runs the task in-process. You don't need any processing queue!

      How many times have you shipped some background-task change, only to realize half your test suite doesn't do anything with background tasks, and you're not testing your business logic to its logical conclusion? Eager task execution catches bugs earlier, and is close enough to reality for the things that matter, while removing the need for, say, multi-process coordination in most tests (sketch below).

      And you can still test things the "real way" if you need to!

      And to your other point: you can use Dramatiq with Postgres, for example[0]. I've written custom backends that just use pg for these libs, it's usually straightforward because the broker classes tend to abstract the gnarly things.

      [0]: https://pypi.org/project/dramatiq-pg/
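
      As a concrete sketch using Celery's task_always_eager setting (the task itself is made up):

        from celery import Celery

        app = Celery("example")
        # Run tasks in-process instead of sending them to a broker.
        app.conf.task_always_eager = True

        @app.task
        def send_welcome_email(user_id):
            return f"welcome email sent to user {user_id}"

        # In a test, .delay() now executes the task synchronously,
        # so assertions see the task's real effects.
        assert send_welcome_email.delay(42).get() == "welcome email sent to user 42"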

    • Some message queue brokers that traditionally implement their own backends can also use PostgreSQL (and other RDBMSs) for persistence. This is a reasonable option if you a) want to consolidate persistence backends or b) want a mature, battle-proven broker and client stack.

  • Some names

    - Celery (massive and heavy)

    - Dramatiq

    - APScheduler

    - Huey

    Today, Redis queues, unless strictly single-process, seem to be the most pain-free for small-scale use.

I've been referring to this post about issues with Celery: https://docs.hatchet.run/blog/problems-with-celery

Does PgQueuer address any of them?

You could even layer in PostgREST for a nice HTTP API that is available from any language!

  • Already done. See: PostgREST. Want to use PostgreSQL (or most other RDBMSs) as the backend for an actively developed, multiprotocol, multiplatform, open source, battle-proven message broker that also provides a messaging REST API of its own? Use ActiveMQ (either flavor) and configure a JDBC backend. Done.

Any suggestions for something like this for dotnet?

  • Hangfire with PostgreSQL driver.

    • Hangfire with a few plugins can be an absolute godsend in 99% of the situations I've encountered. The one downside is that the documentation is very, very lacking, and you have to google a lot until you get to a good place. Despite that, I've used, use, and will continue to use Hangfire, as it's a great tool!

You can make anything that stores data into a job queue.

  • But can you make a decent job queue with anything that stores data? Not easily. E.g. you need atomicity if multiple consumers can take jobs, and I think you need CAS for that, so not just any storage will do, right?

    You probably need ACI and also D if you want your jobs to persist.

It looks like PgQueuer integrates well with Postgres RPC calls, triggers, and cronjobs (via pg_cron). Interesting, will check it out.

Cool, congrats on releasing. Have you seen graphile worker? Wondering how this compares or if you're building for different use-cases.

Although I am more of a MySQL guy, I have been exploring PostgreSQL for some time. It seems to have a lot of features out of the box.

This is a very interesting tool.

The name is perhaps slightly fraught with risk... I missed the second 'u', at first, so I misread it.

Why so much code for a very simple concept?

One table.

Producer writes, consumer reads.

A very good idea

I’ve been thinking about the potential for PostgreSQL-backed job queue libraries to share a common schema. For instance, I’m a big fan of Oban in Elixir: https://github.com/sorentwo/oban

Given that there are many Sidekiq-compatible libraries across various languages, it might be beneficial to have a similar approach for PostgreSQL-based job queues. This could allow for job processing in different languages while maintaining compatibility.

Alternatively, we could consider developing a core job queue library in Rust, with language-specific bindings. This would provide a robust, cross-language solution while leveraging the performance and safety benefits of Rust.

  • If you want a generic queue that can be consumed in any runtime, you can just build it directly into postgres via extensions like https://github.com/tembo-io/pgmq.

    • Also, pgmq can run as a TLE (trusted language extension), so you can install it into cloud hosted Postgres solutions like Supabase. We're using pgmq, and it's solid so far.

  • I am building an SQS compatible queue for exactly that reason. Use with any language or framework. https://github.com/poundifdef/smoothmq

    It is based on SQLite, but it’s written in a modular way. It would be easy to add Postgres as a backend (in fact, it might “just work” if I switch the ORM connection string.)

  • This would be so immensely useful. There are so many cases where the producer is Node or Rails and the consumer is Python.

  • Qless "solves" this problem (in redis) by having all core logic written as lua and executed in redis.

    You could take a similar approach for pg: define a series of procedures that provide all the required functionality, and then language bindings are all just thin wrappers (to handle language native stuff) around calls to execute a given procedure with the correct arguments.
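
    A sketch of what that could look like for pg: a dequeue procedure plus a thin Python binding (all names here are invented):

      CREATE_DEQUEUE = """
      CREATE OR REPLACE FUNCTION dequeue_job()
      RETURNS TABLE (job_id bigint, payload jsonb) AS $$
          UPDATE jobs SET status = 'running'
          WHERE id = (
              SELECT id FROM jobs WHERE status = 'queued'
              ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED
          )
          RETURNING id, payload;
      $$ LANGUAGE sql;
      """

      # Each language binding is then just a thin wrapper around the call:
      def dequeue(conn):
          with conn.cursor() as cur:
              cur.execute("SELECT * FROM dequeue_job()")
              return cur.fetchone()  # (job_id, payload) or None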

  • River is my go-to in Golang; it's really handy to have transactional queuing with a nice little UI.

  • A common schema is one nice thing, but IMHO the win of these DB-backed queues is being able to do things, including enqueueing background jobs, in a single transaction. E.g. create user, enqueue welcome email: both get done, or neither. With Redis-based queues this is usually not a thing; if you fail to do one, it's left half done, leading to more code, etc.

    P.S. I maintain a Ruby equivalent called QueueClassic

This looks like a great task queue. I'm a massive proponent of "Postgres is all you need" [0] and am doubling down on it with my project, which takes it to the extreme.

What I would love is a Postgres task queue that does multi-step pipelines, with fan out and accumulation. In my view a structured relational database is a particularly good backend for that as it inherently can model the structure. Is that something you have considered exploring?

The one thing with listen/notify that I find lacking is the max payload size of 8k; it somewhat limits what you can do without having to start saving stuff to tables. What I would really like is a streaming table, with a schema and all the rich type support... maybe one day.

0: https://www.amazingcto.com/postgres-for-everything/

  • Putting low throughput queues in the same DB is great both for simplicity and for getting exactly-once-processing.

    Putting high throughput queues in Postgres sucks because...

    No O(1) guarantee to get latest job. Query planner can go haywire.

    High-update tables bloat like crazy; fixing that needs a whole new storage engine, a.k.a. zheap.

    Write amplification as every update has to update every index

    LISTEN/NOTIFY doesn't work through connection pooling

    • Update-related throughput and index problems only arise if you update tables. You can use an append-only structure to mitigate some of that: insert new entries with the updated statuses instead. You gain the benefit of history as well. You can even coax the index into holding non-key values for speed with INCLUDE in CREATE INDEX (sketch below).

      You can then delete the older rows when needed or as required.

      Query planner issues are a general problem in Postgres and are not unique to this use case. Not sure what O(1) means in this context; I am not sure PG has ever promised constant-time access to anything. Indeed, with an index, access would never be asymptotically upper-bounded as constant time at all.
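
      A sketch of that append-only shape (schema is illustrative):

        APPEND_ONLY_DDL = """
        -- One row per status change instead of updates in place:
        -- you get history for free and avoid update-driven bloat.
        CREATE TABLE job_events (
            id      bigserial PRIMARY KEY,
            job_id  bigint NOT NULL,
            status  text   NOT NULL,
            payload jsonb
        );

        -- INCLUDE stores payload as a non-key column in the index,
        -- enabling index-only scans for the dequeue query.
        CREATE INDEX job_events_dequeue_idx
            ON job_events (status, id) INCLUDE (payload);
        """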

    • Indeed, that's my experience too. We used partitions like others mentioned below, but Postgres had issues with moving rows across tables atomically, and we had to implement custom, complex queries to overcome it. Plus, job expiration was dynamic, and we had to use background cleaning. The bigger problem was the planner not being able to pick up sudden changes in volume, so we had to use a cron job to run ANALYZE on the table. Managing retries with backoffs, etc. At some point we stopped fighting it and just moved to SQS; we have had zero problems since, and no maintenance is needed. And it's still free, so we saved storage cost, time, and developer effort for ongoing maintenance.

      We still use Postgres for simple queues, but those don't really require a library, as they're usually quite simple; with some advisory locks we can handle unlocking crashed jobs fairly well too (see the sketch below).
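
      The advisory-lock part could look like this sketch (schema invented; process() is a hypothetical handler). A session-level advisory lock is released automatically if the worker's connection dies, which takes care of unlocking crashed jobs:

        def try_run(conn, job_id):
            with conn.cursor() as cur:
                # Session-level advisory lock keyed by job id.
                cur.execute("SELECT pg_try_advisory_lock(%s)", (job_id,))
                if not cur.fetchone()[0]:
                    return False  # another worker holds this job
                try:
                    process(job_id)  # hypothetical job handler
                    cur.execute("UPDATE jobs SET status = 'done' WHERE id = %s", (job_id,))
                finally:
                    cur.execute("SELECT pg_advisory_unlock(%s)", (job_id,))
                return True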

  • Thanks for your insights!

    Regarding multi-step pipelines and fan-out capabilities: It's a great suggestion, and while PgQueuer doesn't currently support this, it's something I'm considering for future updates.

    As for the LISTEN/NOTIFY payload limit, PgQueuer uses these signals just to indicate changes in the queue table, avoiding the size constraint by not transmitting substantial data through this channel.

  • I've come to hate "Postgres is all you need." Or at least, "a single Postgres database is all you need."

    • I won't call it "hate", but I've run into quite a few situations where the Postgres version caused a lot of pain.

      - When it wasn't as easy as a dedicated solution: installing and managing a focused service is sometimes easier overall than shoehorning the feature into PG.

      - When it didn't perform anywhere close to a dedicated solution: there is overhead from the guarantees that PG makes (ACID and all that) when you don't need them, or the relational architecture isn't suited to the type of data, e.g. hierarchical, time-series, etc.

      - When it's not as feature-complete as a dedicated service: for example, I am quite sure one could build (parts of) an Active Directory or a Kafka bus entirely in PG. But it will lack features that you'll likely need in the future; they are built into these dedicated solutions because they are often needed, after all.

  • Is multi-step (fan out, etc) typically something a queue or message bus would handle?

    I've always handled this with an orchestrator solution (think Airflow and similar).

    Or is this a matter of use case? Like for a real-time scenario where you need a series of things to happen (user registration, etc) maybe a queue handling this makes sense? Whereas with longer running tasks (ETL pipelines, etc) the orchestrator is beneficial?

The Symfony framework (PHP) provides a similar feature, which also relies on LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED: https://symfony.com/doc/current/messenger.html#doctrine-tran...

It also supports many other backends including AMQP, Beanstalkd, Redis and various cloud services.

This component, called Messenger, can be installed as a standalone library in any PHP project.

(Disclaimer: I’m the author of the PostgreSQL transport for Symfony Messenger).

You might also want to look at River (https://github.com/riverqueue/river) for inspiration as they support scheduled jobs, etc.

From an end-user perspective, they also have a UI which is nice to have for debugging.

  • Glancing at it briefly, I like the Workflows feature. I'm a long time Sidekiq user (Ruby), and while you can construct workflows pretty easily (especially using nested batches and callbacks in the Pro version), there really isn't a dedicated UI for visualizing them.

  • Also wanted to say I thought this problem had already been solved by River.

    Although it seems like OP is offering a Python library rather than a standalone server, so it would probably be useful to Python devs.

  • I've been using River for some low-volume stuff. I love that I can add a job to the queue in the same DB transaction that handles the synchronous changes.

There is also Procrastinate: https://procrastinate.readthedocs.io/en/stable/index.html

Procrastinate also uses PostgreSQL's LISTEN/NOTIFY (though it can optionally be turned off in favor of polling). It also supports many features (and more are planned), like sync and async jobs (it uses asyncio under the hood), periodic tasks, retries, task locks, priorities, job cancellation/aborting, and optional Django integration.

DISCLAIMER: I am a co-maintainer of Procrastinate.

  • I’m using Procrastinate in several projects. Would definitely like to see a comparison.

    What I personally love about Procrastinate is async, locks, delayed and scheduled jobs, and queue-specific workers (allowing the backend to be factored in various ways). All this with a simple codebase and schema.

Good Job does the same for Rails

https://github.com/bensheldon/good_job

I really like the emergence of simple queuing tools for robust database management systems. Keep things simple and remove infrastructure complexity. Definitely a +1 from me!

For handling straightforward asynchronous tasks like sending opt-in emails, we've developed a similar library at All Quiet for C# and MongoDB: https://allquiet.app/open-source/mongo-queueing

In this context:

    LISTEN/NOTIFY in PostgreSQL is comparable to MongoDB's change streams.
    SELECT FOR UPDATE SKIP LOCKED in PostgreSQL can be likened to MongoDB's atomic read/update operations.

There seems to be big hype around adapting PG into any kind of infra. I love PG, but this doesn't seem like the right thing.

  • I think for me the problem with every single new PG queue is that it seems like everyone and their mother thinks they need to reinvent this specific wheel for some reason and the flavor of the day doesn’t often bring much new to the space. Probably because it's

    1. Pretty easy to understand and grok the problem space

    2. Scratching the programmer itch of wanting something super generic that you can reuse all over the place

    3. Doable with a modest effort over a reasonable scope of time

    4. Built on rock solid internals (Postgres) with specific guarantees that you can lean on

    Here's 7 of them just right quick:

    - https://github.com/timgit/pg-boss

    - https://github.com/queueclassic/queue_classic

    - https://github.com/florentx/pgqueue

    - https://github.com/mbreit/pg_jobs

    - https://github.com/graphile/worker

    - https://github.com/pgq/pgq

    - https://github.com/que-rb/que

    Probably could easily find more by searching, I only spent about 5 minutes looking and grabbing the first ones I found.

    I'm all for doing this kind of thing as an academic exercise, because it's a great way to learn about this problem space. But at this point if you're reinventing the Postgres job queue wheel and sharing it to this technical audience you need to probably also include why your wheel is particularly interesting if you want to grab my attention.

  • At low-medium scale, this will be fine. Even at higher scale, so long as you monitor autovacuum performance on the queue table.

    At some point it may become practical to bring a dedicated queue system into the stack, sure, but this can massively simplify things when you don’t need or want the additional complexity.

    • Aside from that, the main advantage of this is transactions. I can do:

        begin;
          insert_row();
          schedule_job_for_elasticsearch();
        commit;
      

      And it's guaranteed that both the row and job for Elasticsearch update are inserted.

      If you use a dedicated queue system then this becomes a lot more tricky:

        begin;
          insert_row();
          schedule_job_for_elasticsearch();
        commit; // Can fail, and then we have a ES job but no SQL row.
      
        begin;
          insert_row();
        commit;
        schedule_job_for_elasticsearch(); // Can fail, and then we have a SQL row and no job.
      

      There are of course also situations where this doesn't apply, but this "insert row(s) in SQL and then queue job to do more with that" is a fairly common use case for queues, and in those cases this is a great choice.
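
      In runnable form with psycopg (table and job names invented), the happy path is just:

        import psycopg

        with psycopg.connect("dbname=app") as conn:
            with conn.transaction():
                conn.execute("INSERT INTO documents (body) VALUES (%s)", ("hello",))
                conn.execute(
                    "INSERT INTO jobs (kind, payload) VALUES (%s, %s)",
                    ("index_to_elasticsearch", '{"doc": "hello"}'),
                )
            # Both inserts commit atomically: the row and its job exist,
            # or neither does.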

    • I agree, there is no need for FANG-level infrastructure. IMO, in most cases the simplicity/performance tradeoff for small/medium scale is worth it. There is also statistics tooling that helps you monitor throughput and failure rates (aggregated on a per-second basis).

  • Instead of SQS, I recently created a basic abstraction on PG that mimics the SQS APIs. The intention was to use it during development and simply switch to SQS later.

    We never did. The production code still uses the PG-based queue (which has been improved since), and PG just works perfectly fine. We might still need to go to a dedicated queue service at some point, but it has been perfectly fine so far.

  • I use it as a job queue. Yes, it has its cons, but not dealing with another moving piece in the big picture is totally worth it.

  • I mean, I love Postgres like the next guy, and I like simple solutions as long as they work. I just wonder if this is truly simpler than using a Redis or RabbitMQ queue if you need queues. If you're already using a cloud provider, SQS is quite trivial as well.

    I guess if you already have Postgres and don't want to use the cloud provider's solution, you can use this to avoid hosting another piece of infra.

    • DB-based gives you the ability to query against your queues, if your use case needs it. Other options tend to dispose of the state once the job is finished.