Comment by solidasparagus

3 days ago

Nice work! There's a real gap when it comes to writing single-machine, concurrent, CPU-bound Python code: Ray is too big, pykka is threads-only, and the builtins are poorly abstracted. The syntax is also very nice!

But I'm not sure I can use this, even though I have a specific use case that feels like it would work well (high-performance, pure-Python downloading from cloud object storage). The examples are a bit too simple, and I don't understand how to do more complicated things.

I chunk up my work, run the chunks in parallel, and then need a fan-in step to reduce them. How do you do that in Pyper?

Can the processes have state? Pure functions are nice, but if I'm reaching for multiprocessing, I need performance, and if I need performance, I'll often want a cache of some sort (I don't want to pickle and re-instantiate a cloud client every time I download some bytes, for instance).

How do exceptions work? Observability? Logs/prints?

Then there's stuff that is probably asking too much of this project, but which I get if I write my own Python pipeline, so it matters to me: rate limiting, WIP limits, cancellation, progress bars.

But if some of these problems are/were solved and it offers an easy way to use multiprocessing in Python, I would probably use it!

Great feedback, thank you. We'll certainly be working on adding more examples to illustrate more complex use cases.
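
To sketch the fan-in question above: a Pyper pipeline is consumed as an ordinary iterable, so a reduce step can simply live in the consumer. A rough sketch, assuming the workers parameter and pipe (|) composition shown in Pyper's README; the chunking and squaring steps here are made up:

  import functools

  from pyper import task


  @task(branch=True)
  def chunk(limit: int):
      # Hypothetical chunking step: split the job into units of work
      for i in range(limit):
          yield i


  @task(workers=4)
  def process(value: int) -> int:
      # Hypothetical per-chunk step, fanned out across 4 workers
      return value * value


  def main():
      pipeline = chunk | process
      # Fan-in: the pipeline's output is a plain iterable, so reduce it directly
      total = functools.reduce(lambda acc, x: acc + x, pipeline(limit=10), 0)
      print(total)


  if __name__ == "__main__":
      main()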

One thing I'd mention is that we don't really imagine Pyper as a whole observability and orchestration platform. It's really a package for writing Python functions and executing them concurrently, in a flexible pattern that can be integrated with other tools.

For example, I'm personally a fan of Prefect as an observability platform: you could define pipelines in Pyper, then wrap them in a Prefect flow for orchestration logic.
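
As a rough sketch of that idea, assuming only Prefect's flow decorator (the branching task here is made up):

  from prefect import flow
  from pyper import task


  @task(branch=True)
  def generate(limit: int):
      for i in range(limit):
          yield i


  @flow
  def run_pipeline(limit: int = 20):
      # Prefect supplies the orchestration and observability layer;
      # Pyper handles the concurrent execution inside the flow
      return list(generate(limit=limit))


  if __name__ == "__main__":
      run_pipeline()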

Exception handling and logging can also be delegated to orchestration tools (or done in the business logic where appropriate, literally using try... except...).
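
For instance, assuming errors raised inside a task propagate out to the consuming loop, a plain try... except around that loop is enough (a toy sketch; the failing task is made up):

  from pyper import task


  @task(branch=True)
  def fetch(limit: int):
      for i in range(limit):
          if i == 3:
              raise ValueError("simulated failure")
          yield i


  def main():
      try:
          for item in fetch(limit=5):
              print(item)
      except ValueError as exc:
          # The exception surfaces in the consumer, where it can be handled
          print(f"pipeline failed: {exc}")


  if __name__ == "__main__":
      main()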

For a simple progress bar, tqdm is probably the first thing to try. As it wraps anything iterable, applying it to a pipeline might look like:

  import time
  from pyper import task
  from tqdm import tqdm


  @task(branch=True)
  def func(limit: int):
      for i in range(limit):
          time.sleep(0.1)
          yield i


  def main():
      # tqdm wraps the pipeline's output iterable and renders a progress bar
      for _ in tqdm(func(limit=20), total=20):
          pass


  if __name__ == "__main__":
      main()
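
(Note: total=20 is passed explicitly because tqdm cannot infer the length of a generator.)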

> I don't want to pickle and re-instantiate a cloud client every time I download some bytes for instance

Have you tried multiprocessing.shared_memory to address this?
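
For reference, a minimal sketch of that API (note that it shares raw bytes between processes, not live objects like a client):

  from multiprocessing import shared_memory

  # Create a block and write into it
  shm = shared_memory.SharedMemory(create=True, size=16)
  shm.buf[:5] = b"hello"

  # Another process can attach to the same block by name
  other = shared_memory.SharedMemory(name=shm.name)
  print(bytes(other.buf[:5]))  # b'hello'

  other.close()
  shm.close()
  shm.unlink()  # free the block once every process is done with it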

  • I haven't played with that much! This isn't really a problem for my usual approach to this sort of code: when I use multiprocessing, I use a Process class or a worker task function with a setup step followed by a while loop that pulls from a work/control queue (roughly the pattern sketched below). But in the Pyper functional programming world, it would be a concern.

    IIRC multiprocessing.shared_memory is a much lower level of abstraction than most Python stuff, so I think I'd need to figure out how to make the client use the shared memory, and I'm not sure I could.
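
    Something like this pattern, roughly (a sketch using only the stdlib; FakeClient stands in for the real cloud client):

      import multiprocessing as mp


      class FakeClient:
          # Stand-in for an expensive-to-construct cloud client
          def download(self, key: str) -> None:
              print(f"downloading {key}")


      def worker(queue: mp.Queue) -> None:
          client = FakeClient()  # setup step: build the client once per process
          while True:
              item = queue.get()  # pull from the work/control queue
              if item is None:  # sentinel value tells the worker to exit
                  break
              client.download(item)


      def main():
          queue = mp.Queue()
          procs = [mp.Process(target=worker, args=(queue,)) for _ in range(4)]
          for p in procs:
              p.start()
          for key in ("a", "b", "c"):
              queue.put(key)
          for _ in procs:
              queue.put(None)  # one sentinel per worker
          for p in procs:
              p.join()


      if __name__ == "__main__":
          main()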

Do you really need to reinvent the wheel every time for parallel workloads? Just learn GNU parallel and write single-threaded code.

Concurrency in general isn't about parallelism; it's about managing multiple tasks at the same time, whether or not they actually execute in parallel.

  • GNU Parallel is really neat, software that's so good it's boring. Closing in on being a quarter century old by now, no? I remember first reading about it in 2003 maybe?

    I've also used 'fork in PicoLisp a lot for this kind of thing, and also Elixir, which arguably has much nicer pipes.

    But hey, it's good that Python, after thirty years or so, is trying to get decent concurrency. Eventually people who use it as a first language might learn about such things too.