Persistent, performant, reliable federation queue #3605

Merged: 72 commits, Sep 9, 2023

Collaborator

@phiresky phiresky commented Jul 12, 2023

This PR implements a new outgoing federation queue. The end goal is to create a queue that can scale to reddit scale (that is 100-1000 activities per second, sent to each federated instance).

The basic idea is to make the target instance the primary division of the federation queue, so that federation to each instance is handled mostly independently.

The queue works as follows:

  • The main lemmy_server process with its send_lemmy_activity function only stores the sent_activity in the db (like currently), with an addition of the send targets.

  • There is a new table federation_queue_state (domain, last_successful_id, retries) that tracks the state of outgoing federation per instance

  • One or more lemmy_federate processes pick up the activities from the db and send them out. lemmy_federate works as follows:

    1. All known allowed / non-blocked instances are read from the database every 60s

    2. A worker tokio task is started / stopped per federated instance. It does the following:

      1. It fetches a list of local communities that at least one person on the remote instance is subscribed to (refreshed every 10-60 seconds)
      2. It loops through the activities table starting from last_successful_id up to max(id). For every activity:
        1. It checks if this activity should be sent to this instance using (a) the list of communities from above and (b) a list of other inboxes stored in the activities table
        2. It sends out the activity to the correct shared or individual inbox. If there's a retryable failure, it waits with exponential backoff and tries again. I had to expose a raw non-retrying send_activity function from the activitypub-federation crate since the state of the retry has to be in the db. The activity_queue from there is not used at all.
      3. The updated federation_queue_state is stored to the database.
    3. A separate task logs the current progress of each domain once per minute. Example output:

      2023-07-12T21:43:15.642275Z  INFO lemmy_federate::worker: https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74 to https://toad.work/inbox failed: Queueing activity for retry after failure with status 502 Bad Gateway: <html><head><title>502 Bad Gateway</title></head>...
      2023-07-12T21:43:15.016121Z  INFO lemmy_federate: Federation state as of 2023-07-12T21:43:15+00:00:
      2023-07-12T21:43:15.024684Z  INFO lemmy_federate: lemmy.serverfail.party: Ok. 6 behind
      2023-07-12T21:43:15.025516Z  INFO lemmy_federate: toad.work: Warning. 3437 behind, 5 consecutive fails, current retry delay 320.00s
      2023-07-12T21:43:15.016220Z  INFO lemmy_federate: lemmy.byteunion.com: Ok. 0 behind
      2023-07-12T21:43:15.016228Z  INFO lemmy_federate: swiss-talk.net: Ok. 0 behind
      ...

    4. If a signal is received (ctrl+c, SIGINT, SIGTERM), all the workers are gracefully stopped, their most current state stored in the db.
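To make the per-instance loop concrete, here is a minimal std-only sketch, not the code from this PR. `SendOutcome`, `QueueState`, `retry_delay` and `process_pending` are hypothetical names. The backoff of 10 s doubled per consecutive failure is an assumption: it matches the "5 consecutive fails, current retry delay 320.00s" log line above, but nothing here confirms the exact formula. The recipient check and the db write are left out.

```rust
use std::time::Duration;

/// Result of one delivery attempt (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendOutcome {
    Delivered,
    RetryableFailure,
}

/// In-memory copy of one instance's queue state (hypothetical shape).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueState {
    pub last_successful_id: i64,
    pub fail_count: u32,
}

/// Assumed backoff: 10 s doubled per consecutive failure (5 fails -> 320 s).
pub fn retry_delay(fail_count: u32) -> Duration {
    // Clamp the exponent so the shift cannot overflow.
    let factor = 1u64 << fail_count.min(20);
    Duration::from_secs(10 * factor)
}

/// Sends every activity with an id in (last_successful_id, max_id], in order.
/// `send` and `wait` are injected so the loop can be exercised without a network.
pub fn process_pending<S, W>(state: &mut QueueState, max_id: i64, mut send: S, mut wait: W)
where
    S: FnMut(i64) -> SendOutcome,
    W: FnMut(Duration),
{
    while state.last_successful_id < max_id {
        let id = state.last_successful_id + 1;
        match send(id) {
            SendOutcome::Delivered => {
                state.last_successful_id = id;
                state.fail_count = 0;
            }
            SendOutcome::RetryableFailure => {
                // The same activity is retried; later ones wait behind it.
                state.fail_count += 1;
                wait(retry_delay(state.fail_count));
            }
        }
    }
}
```

Because the loop never advances past a failed activity, ordering per instance is preserved, at the cost that one failing inbox stalls everything queued for that instance.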

This implementation has the following advantages:

  1. The queue is persistent and reliable - if any or all lemmy processes are stopped, federation will continue afterwards without activity loss. If any process is killed or crashes, the worst case is that 100 activities or 10s worth of activities are resent. No losses.
  2. Low memory usage: The memory usage scales linearly with the number of instances but is constant for each instance and independent of the number of activities per second. It is also mostly uncoupled from the reliability of the other servers. No more unbounded memory use.
  3. The queue is performant - Each domain has a separate serialized queue, which means there is only ever a single request waiting for a server response per federated instance. Performance of each instance doesn't affect the others.
  4. Horizontally scalable - this outgoing federation can run on multiple separate processes (split by outgoing domain) or multiple servers - they just need access to the same PG database.
  5. No more DOSing other instances. Activities are sent as fast as the receiving end can take them or as fast as we can send them, whichever is lower.
  6. If a remote instance goes down, activities will be replayed reliably and in order from the time it goes down.
  7. It's easier to find out what the state of federation is and where bottlenecks are.
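One way the "100 activities or 10s" resend bound from point 1 could arise is if a worker persists its state whenever either limit is hit. The sketch below assumes exactly that. `CheckpointPolicy` and its methods are hypothetical names, and the PR text does not say the bound is implemented this way.

```rust
use std::time::{Duration, Instant};

/// Decides when a worker should write its progress to the database
/// (hypothetical helper; the limits are passed in by the caller).
pub struct CheckpointPolicy {
    max_unsaved: u32,
    max_age: Duration,
    unsaved: u32,
    last_save: Instant,
}

impl CheckpointPolicy {
    pub fn new(max_unsaved: u32, max_age: Duration, now: Instant) -> Self {
        Self { max_unsaved, max_age, unsaved: 0, last_save: now }
    }

    /// Records one processed activity. Returns true when the caller should
    /// persist state now, i.e. when either limit has been reached.
    pub fn record(&mut self, now: Instant) -> bool {
        self.unsaved += 1;
        let too_many = self.unsaved >= self.max_unsaved;
        let too_old = now.duration_since(self.last_save) >= self.max_age;
        if too_many || too_old {
            self.unsaved = 0;
            self.last_save = now;
            true
        } else {
            false
        }
    }
}
```

With limits of 100 and 10 seconds, a crash can only lose progress made since the last checkpoint, so at most that much is sent again after a restart.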

It has the following disadvantages:

  1. It assumes that every inbox in the same instance/domain has the same reliability. If the instance fails to respond to one activity, it will not receive any other activities either. (Failure is decided the same way as before: HTTP 2xx and 4xx count as success, everything else as retryable.) This is not relevant to lemmy instances since they use the shared inbox for everything.
  2. It's optimized for large instances and has more overhead if there's very little activity.
    • Also, I inverted the logic for figuring out which remote instances care about which local community. Instead of getting the remote inboxes for one activity, every remote inbox has a set of communities it cares about. The reason is that this exact case is the most expensive federation activity type, so optimizing it is important. The inversion optimizes for most instances caring about many communities. This is not intrinsic to the general approach though and could be changed.
  3. There's a time delay of up to 10 seconds for outgoing activities and follower changes and up to 60 seconds for instance blacklist changes.
  4. Right now, every HTTP request waits for the response to arrive before the next one is sent (per instance). This could be changed by adding a FuturesOrdered or similar, limited to e.g. N=10 concurrent in-flight requests, and adding more complicated retry / backoff logic, but right now I don't think it's necessary.
  5. There's overhead in the way I load every activity for every remote instance. Caching is used to make sure this doesn't really cause redundant DB queries, just CPU work. This is a tradeoff to allow for different remote instances to be in different positions in time.
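The success/failure rule from point 1 (2xx and 4xx count as success, everything else is retryable) can be written down in a few lines. This is a sketch with hypothetical names (`Delivery`, `classify_status`). Treating a missing status, e.g. a timeout or connection error, as retryable is an assumption that follows from "everything else".

```rust
/// What the worker should do after one delivery attempt (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Delivery {
    /// Move on to the next activity.
    Done,
    /// Back off, then try the same activity again.
    Retry,
}

/// `status` is `None` when no HTTP response arrived at all.
pub fn classify_status(status: Option<u16>) -> Delivery {
    match status {
        // 4xx counts as success: the remote answered, and resending the
        // same activity would not change its answer.
        Some(200..=299) | Some(400..=499) => Delivery::Done,
        // 1xx, 3xx, 5xx and transport errors are all retryable.
        _ => Delivery::Retry,
    }
}
```

Under this rule the 502 Bad Gateway from the example log is retried, while a 404 or 410 from a remote inbox is not.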

The approach of one worker per remote instance should scale to reddit scale imo (~100 - 1000 activities per second). The details will of course need tweaking in the future when bottlenecks become clearer.

I've tested this so far only with my own very low activity instance and the basics work as expected.


Here's an example of how the federation_queue_state table looks:

domain                     | last_successful_id | fail_count | last_retry
---------------------------+--------------------+------------+------------------------------
toad.work                  | 6832351            | 14         | 2023-07-12 21:42:22.642379+00
lemmy.deltaa.xyz           | 6837196            | 0          | 1970-01-01 00:00:00+00
battleangels.net           | 6837196            | 0          | 1970-01-01 00:00:00+00
social.fbxl.net            | 6837196            | 0          | 1970-01-01 00:00:00+00
mastodon.coloradocrest.net | 6837196            | 0          | 1970-01-01 00:00:00+00
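As a rough illustration of how rows like these could be turned into the per-domain progress lines, here is a sketch with hypothetical names (`InstanceState`, `status_line`). It assumes 6837196 is the newest activity id and produces a shortened form of the log format, without the retry delay.

```rust
/// One row of the state table, reduced to the fields used here
/// (hypothetical struct for this sketch).
pub struct InstanceState {
    pub domain: String,
    pub last_successful_id: i64,
    pub fail_count: u32,
}

/// Formats a shortened progress line for one instance.
/// `max_id` is the id of the newest activity in the queue.
pub fn status_line(state: &InstanceState, max_id: i64) -> String {
    // Never report a negative backlog if the state is ahead of `max_id`.
    let behind = (max_id - state.last_successful_id).max(0);
    if state.fail_count == 0 {
        format!("{}: Ok. {} behind", state.domain, behind)
    } else {
        format!(
            "{}: Warning. {} behind, {} consecutive fails",
            state.domain, behind, state.fail_count
        )
    }
}
```

For the rows above this yields a backlog of 4845 activities for toad.work and 0 for the other four instances.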

And here's an example of how the activity table looks (for sendable activities):

id            | 6817007
data          | {"cc": ["https://lemmy.phiresky.xyz/c/localtest/followers"], "id": "https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74", "to": ["https://www.w3.org/ns/activitystreams#Public"], "type": "Announce", ...
local         | t
published     | 2023-07-12 21:36:31.749541
updated       |
ap_id         | https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74
sensitive     | f
send_targets  | {"all_instances": false, "inboxes": [], "community_followers_of": [54]}
actor_type    | community
actor_apub_id | https://lemmy.phiresky.xyz/c/localtest
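The send_targets column is what a worker checks to decide whether an activity concerns its instance. A minimal sketch of that check follows, using the three fields visible in the row above. The struct layout, `host_of` and `should_send` are hypothetical, and the URL handling is deliberately simplified (no IPv6 literals) to stay std-only.

```rust
use std::collections::HashSet;

/// Mirrors the JSON in `send_targets` (field names from the example row;
/// the Rust types are assumptions).
pub struct SendTargets {
    pub all_instances: bool,
    pub inboxes: Vec<String>,
    pub community_followers_of: Vec<i32>,
}

/// Extracts the host of an http(s) URL. Simplified: ignores IPv6 literals.
fn host_of(url: &str) -> Option<&str> {
    let rest = url.split_once("://")?.1;
    let authority = rest.split(|c: char| matches!(c, '/' | '?' | '#')).next()?;
    // Drop optional userinfo and port.
    let host = authority.rsplit('@').next()?.split(':').next()?;
    if host.is_empty() { None } else { Some(host) }
}

/// `followed` holds the ids of local communities that at least one user on
/// the remote instance follows, as cached by that instance's worker.
pub fn should_send(targets: &SendTargets, instance_domain: &str, followed: &HashSet<i32>) -> bool {
    targets.all_instances
        || targets.community_followers_of.iter().any(|id| followed.contains(id))
        || targets.inboxes.iter().any(|url| host_of(url) == Some(instance_domain))
}
```

For the example row, only instances with at least one follower of community 54 would receive the Announce.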

@Nutomic
Member

Nutomic commented Jul 14, 2023

The general approach looks very good. Among other things it also gives us the per-instance failure limit which I tried to implement (LemmyNet/activitypub-federation-rust#60). The main question for me is why you decided to implement this in Lemmy, and not in the federation library. It seems to make more sense to keep it encapsulated there, and only add a trait so that Lemmy can handle db storage.

The federate crate looks like it needs to run as an entirely separate process. That's too complicated, especially for small instances. Better run this logic from the lemmy_server crate, and provide a command line option to enable/disable activity sending.

cc @cetra3

@phiresky
Collaborator Author

The main question for me is why you decided to implement this in Lemmy, and not in the federation library

Mainly for simplicity. I don't think the activitypub-federation crate should depend on postgresql specifically, and the way I fetch updates for instances and for communities is kind of tightly coupled with how lemmy stores that in the database. I can try to see how the trait would look if most of this code was in the ap crate, but it might be pretty hard to make it generic enough to actually work for other use cases (like @colatkinson wants to build here)

The federate crate looks like it needs to run as an entirely separate process. That's too complicated, especially for small instances. Better run this logic from the lemmy_server crate, and provide a command line option to enable/disable activity sending.

It would of course be possible to allow this to be run from the main process but I kind of disagree: Most admins, especially small instance admins, do the absolute minimum effort to set up an instance and then are confused if it runs badly. They don't understand postgresql tuning, configuration changes etc. Also I've seen multiple people use a script like lemmony to subscribe to every single community in existence and then are confused why their 2GB RAM server can't handle the federation traffic (I know that's just incoming traffic but still).

Also, the way it's implemented is "optimized" for running in a separate tokio pool. Similar to the issues we have with the existing in-memory queue, this code spawns one tokio task per instance (e.g. 1000), which I think will dominate scheduling against the at most 1-100 other tasks (e.g. API queries) if the process is under load. tokio doesn't have any task prioritization, so I don't know how this could be prevented.

So IMO the default setup should be a very good and performant one - because admins don't understand or want to bother with optional tweaks. Lemmy already needs two processes (ui and server), I don't think adding a third one would increase the effort much? It can be in the same container as the other one by default.

If you're adamant about this I can make it integratable into the main process but it should definitely be an option to have it separate, one that all instance admins that federate with 100+ other instances should take.

@cetra3
Contributor

cetra3 commented Jul 15, 2023

@phiresky I think it would be good if we had the option here. It wouldn't be too hard to spawn some sort of background tokio task in the embedded case and then have a thin wrapper in the "separate" process case.

@phiresky
Collaborator Author

Ah, another reason is that I add signal handlers for clean shutdown, since that can take multiple seconds (up to the http timeout). So if it was in the same process that would conflict with the actix shutdown handlers (no idea if it's possible to merge those) and also cause more downtime when updating / restarting processes.

@cetra3
Contributor

cetra3 commented Jul 15, 2023

@phiresky You can't really inject your own listener for the shutdown in actix web, however you can do the reverse: signal the actix http server to shut down in your own custom signal handler.

The way you do this is roughly:

@Nutomic
Member

Nutomic commented Jul 17, 2023

One option might be to have the Lemmy process spawn the activity sender as a child process by default. Then it's a separate process with a separate tokio runtime, but doesn't require any changes from instance admins. And both binaries can be included in the same dockerfile.

I suppose having the queue in Lemmy is fine, but I don't want to maintain two different queues. So if we use this approach, then I would get rid of the queue in the federation library, and only provide a simple method for sign + send on the current thread (no workers nor retry). Then that logic doesn't have to be reimplemented here. Later the queue could still be upstreamed.

@AppleSheeple

Real efficiency/performance would require federation to be done using an efficient binary format like speedy.

But I understand why diverging from apub, even if only optionally, and only with Lemmy-Lemmy instance communication, is something the project may not want to support.

@cetra3
Contributor

cetra3 commented Jul 22, 2023

@AppleSheeple honestly JSON serialization is a tiny sliver of the perf issues that relate to apub comms.

The biggest contributing factor when I've benchmarked this before is HTTP signatures. It takes up about 70% of the processing time

@AppleSheeple

AppleSheeple commented Jul 22, 2023

@cetra3

A small difference becomes bigger at scale. And the small difference here covers all three of size, memory, and processing power needed.

And more relevantly, if the project were to be open to non-apub Lemmy-to-Lemmy federation, then the sky would be the limit. You can do batching however you want. You can even turn the architecture into a fully pull-based one. You can...

Creating a queue that can truly scale to reddit scale while sticking to apub is an unobtainable goal, was my point. The message format was just the easiest clear source of inefficiency to point out.


Lemmy can obviously still do much better than the status quo. And @phiresky's efforts are greatly appreciated, not that they need to hear it from a random githubber.

@phiresky
Collaborator Author

phiresky commented Jul 23, 2023

Updated:

  • rebased on the latest main branch (specifically the sent_activity split from #3583, "Split activity table into sent and received parts (fixes #3103)")

  • added dead instance detection from #3427, "Check for dead federated instances (fixes #2221)", to the federation worker start / stop logic

  • added logs to show how many instances it's federating to, e.g.:

    INFO lemmy_federate: Federating to 451/490 instances (34 dead, 5 disallowed)

  • Expose a start_stop_federation_workers_cancellable() function from the lemmy_federate crate that could be called from lemmy_server to spawn the federation workers in the same process and tokio runtime as the lemmy server.

  • Added cmd-args parsing to allow starting multiple processes (e.g. on different servers altogether)
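To illustrate how several federation processes could divide the instances between them, here is a sketch that assigns each instance to exactly one process by its numeric id. The function and parameter names (`handled_by`, `process_index`, `process_count`) are hypothetical, and the comment above does not spell out the actual command-line arguments or the splitting rule.

```rust
/// Returns true if the process with the given 0-based index should run the
/// worker for this instance. Every instance id maps to exactly one process.
pub fn handled_by(instance_id: u32, process_index: u32, process_count: u32) -> bool {
    assert!(process_count > 0, "need at least one process");
    assert!(process_index < process_count, "index must be below count");
    instance_id % process_count == process_index
}

/// Filters the full instance list down to the ones this process owns.
pub fn my_instances(all_ids: &[u32], process_index: u32, process_count: u32) -> Vec<u32> {
    all_ids
        .iter()
        .copied()
        .filter(|id| handled_by(*id, process_index, process_count))
        .collect()
}
```

Since all state lives in the database, the processes would not need to coordinate beyond agreeing on the count and their own index.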

about = "A link aggregator for the fediverse",
long_about = "A link aggregator for the fediverse.\n\nThis is the Lemmy backend API server. This will connect to a PostgreSQL database, run any pending migrations and start accepting API requests."
)]
pub struct CmdArgs {
Contributor

You should also make all the fields pub to ensure that CmdArgs can be constructed outside Lemmy.

Collaborator Author

that seems like kind of a separate decision whether to actually make this part of the public interface

Contributor

Well it already is part of the public interface as it is given to the start_lemmy_server function. How should someone who wants to embed lemmy call start_lemmy_server if they can't construct the CmdArgs? (and they don't want to parse them from command line args obviously).

Contributor

maybe it should also be called LemmyArgs as well, since they may not actually come from the command line.

Collaborator Author

is starting it as a library actually supported/ documented anywhere?

Contributor

It was done in this one #2618

And of course it is documented on docs.rs, https://docs.rs/lemmy_server/latest/lemmy_server/

Member

Agree with this. Lemmybb can run Lemmy as an embedded library, although that project isn't maintained now because I don't have time. Anyway, I don't see any downside to making them pub.

Member

@dessalines dessalines left a comment

Just the one thing from me, domain -> instance_id.

crates/db_schema/src/diesel_ltree.patch (outdated)
fetcher: () => Promise<T>,
checker: (t: T) => boolean,
retries = 10,
delaySeconds = 2,
Member

This seems more stable anyway, thx.

federation_queue_state (id) {
id -> Int4,
#[max_length = 255]
domain -> Varchar,
Member

I agree that instance_id would be much better, for normalization purposes.

Member

@dessalines dessalines left a comment

Perfect, thanks.

src/lib.rs (outdated)
@phiresky phiresky enabled auto-merge (squash) September 9, 2023 00:20
@phiresky
Copy link
Collaborator Author

phiresky commented Sep 9, 2023

There's one more issue: if a user is deleted, it is no longer found and the federation worker can error out. I'll fix that both by skipping internal errors when sending activities, so the queue doesn't get stuck, and by restarting workers when they exit.

Then I'll merge (when tests pass).

8 participants