PostgreSQL Queue Executor (Distributed executor without Amazon/k8s) #78

Open
9 tasks done
gabriel-v opened this issue Aug 18, 2023 · 1 comment · May be fixed by #84
Comments

gabriel-v commented Aug 18, 2023

This project is great - but there's no simple way to distribute work on self-hosted hardware, only Amazon this and Amazon that. There are executors that run Docker and K8s, but I'm working in an environment where those are managed elsewhere.

Then, I remembered this article / ad about postgres work queues.

The article above skips over a small detail about waiting for new work: we can use Postgres's LISTEN and NOTIFY to have the workers block in select.select(conn) instead of continuously polling for new rows.

Code:
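
(A minimal sketch of the waiting loop, assuming psycopg2; the work_queue channel name and the handle_new_work handler are placeholders:)

```python
import select
import psycopg2

conn = psycopg2.connect("dbname=redun_queue")  # placeholder DSN
conn.set_session(autocommit=True)  # LISTEN/NOTIFY needs no open transaction

with conn.cursor() as cur:
    cur.execute("LISTEN work_queue;")

while True:
    # Sleep on the socket until a NOTIFY arrives (or 5s pass),
    # instead of hammering the table with SELECTs.
    if select.select([conn], [], [], 5) == ([], [], []):
        continue  # timeout; loop around and wait again
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        handle_new_work(notify.payload)  # hypothetical: pull jobs from the table
```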

But the scheduler doesn't just want to execute code - it also wants results back! That's why I used a second queue table to return the results.
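
Roughly, the worker pops a job with the usual FOR UPDATE SKIP LOCKED pattern, runs it, and pushes the outcome into the second table. A sketch, again assuming psycopg2, with made-up table and column names:

```python
DEQUEUE_SQL = """
DELETE FROM job_queue
WHERE id = (
    SELECT id FROM job_queue
    ORDER BY id
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING id, payload;
"""

def run_one_job(conn, execute):
    """Pop one job, run it, enqueue the result. Returns False if queue is empty."""
    with conn, conn.cursor() as cur:  # one transaction per job
        cur.execute(DEQUEUE_SQL)
        row = cur.fetchone()
        if row is None:
            return False
        job_id, payload = row
        # If the worker dies here, the transaction rolls back, the DELETE
        # is undone, and the job goes back on the queue for a retry.
        result = execute(payload)
        cur.execute(
            "INSERT INTO result_queue (job_id, payload) VALUES (%s, %s)",
            (job_id, result),
        )
        cur.execute("NOTIFY result_queue")  # wake the scheduler's monitor
    return True
```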

Then I found that the redun.executor.* classes are very easy to work with - thanks for that! I did a Frankenstein mash-up of the Local Executor and the Docker Executor: the monitor thread logic is similar to the Docker executor, and everything else is cloned from the Local executor.

Some things of interest you might want me to change:

  • all tasks are run in a subprocess to guarantee a clean Python context: processify(func)(...) (see the sketch after this list)
  • the worker loop itself runs in yet another subprocess, so it can be restarted forever in case of OOM: processify(_run_worker_forever)
  • there are no long-running DB connections because I want this to survive database crashes and restarts.
    • I am worried about "Database connection error after completing very long task" (#56) and the fact that the scheduler expects a single PG connection to survive the duration of the run
    • The workers would really benefit from a long-running connection to monitor LISTEN/NOTIFY - otherwise we miss messages and still have to poll every few seconds
  • The executor needs to know how to unpack the message format created by the worker
  • The executor will possibly run on a different database than the redun backend, so that needs to be configured separately. The code currently doesn't use your database schema and migration systems - let me know if you want me to use those instead of running CREATE TABLE IF NOT EXISTS
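
The processify helper mentioned above is roughly this (a sketch; it assumes the fork start method on Linux, and the real version would want proper traceback forwarding):

```python
import multiprocessing

def processify(func):
    """Run func in a fresh subprocess, so every call gets a clean Python
    context and an OOM kill takes down only that one call."""
    def wrapper(*args, **kwargs):
        recv_end, send_end = multiprocessing.Pipe(duplex=False)

        def target():
            try:
                send_end.send((None, func(*args, **kwargs)))
            except BaseException as exc:
                send_end.send((repr(exc), None))

        proc = multiprocessing.Process(target=target)
        proc.start()
        send_end.close()  # parent keeps only the read end
        try:
            error, result = recv_end.recv()
        except EOFError:  # child died before sending, e.g. OOM-killed
            proc.join()
            raise RuntimeError(f"subprocess died with exit code {proc.exitcode}")
        proc.join()
        if error is not None:
            raise RuntimeError(error)
        return result
    return wrapper
```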

I want to clean up these experiments and submit a PR with the new executor. Before that happens, I think I need to:

  • queue tables: remove the timestamp columns - copy-pasted from the article but never used
  • queue tables: use a binary (BYTEA) column instead of a TEXT column that we fill with base64 - the TEXT column came from the article and I was too lazy to change it
  • queue table: add a new column with the redun run ID, so concurrent runs on the same queue don't interfere with each other
  • fix SQL injection opportunities - all queries currently use f-strings instead of bound parameters like c.execute("bla %s bla", (arg,)) (see the example after this list)
  • refactor all the queue management stuff into methods of the PgExecutor class
  • add correct type annotations and docstrings
  • use longer-lived DB connections in the workers and the monitor for LISTEN, while still reconnecting whenever a connection dies
  • put the executor code in redun/redun/executors/PgExecutor.py and write some extra tests
  • expose all the options (DB connection info, table names, timeouts, limits) through the Executor Options logic, so they can be configured from the ini file / config dict
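
For the SQL injection item, the fix is mechanical everywhere (with a psycopg2 cursor cur):

```python
# Before: user-controlled values are spliced into the SQL text itself.
cur.execute(f"INSERT INTO job_queue (payload) VALUES ('{payload}')")

# After: the driver sends values separately, so they can never be parsed as SQL.
cur.execute("INSERT INTO job_queue (payload) VALUES (%s)", (payload,))
```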

After this is cleaned up and merged, I think there will be some more discussion points on this:

  • Could the Promise class that the scheduler uses be implemented with PG queues too? That would let us put the redun job object on the payload, so we would not need to keep all the pending tasks in memory on the scheduler.
  • If the promise info is stored in SQL, does the scheduler still have to be single-threaded? We could distribute it over all the workers, removing the _monitor thread singleton altogether - the workers could run the scheduler on a completed task immediately instead of queuing it back.

Let me know what you think, and whether you'd want to merge this work.

gabriel-v linked a pull request (#84, 6 tasks) on Sep 7, 2023 that will close this issue
gabriel-v commented Sep 7, 2023

Got a CI run here: https://github.com/gabriel-v/redun/actions/runs/6113659282/job/16593587764?pr=3

no new tests so far

PR #84

gabriel-v changed the title from "PostgreSQL Queue Executor (Distributed executor without Amazon)" to "PostgreSQL Queue Executor (Distributed executor without Amazon/k8s)" on Sep 7, 2023