
Add moshicam post commit to kafka streamer #1530

Closed
jarrel-b wants to merge 2 commits

Conversation

jarrel-b (Collaborator) commented Jun 26, 2024

This PR dispatches messages to a separate queue for processing Moshicam owners after the offsets are committed. There isn't a convenient way to filter only Moshicam-related messages from an event alone, so the function forwards every message to the queue and lets the Moshicam task handler decide whether it should handle the event.

I'll merge once the Moshicam backend side is ready to accept the messages.
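
For illustration, a minimal Go sketch of that forward-everything approach; `OwnerMessage` and `submitMoshicamTask` are hypothetical stand-ins for whatever message type and queue client the streamer actually uses.

```go
package streamer

import (
	"context"
	"fmt"
)

// OwnerMessage is a hypothetical stand-in for a decoded message from the
// owners topic.
type OwnerMessage struct {
	ContractAddress string
	OwnerAddress    string
	TokenID         string
}

// submitMoshicamTask is a placeholder for whatever enqueues the task
// (e.g. a Cloud Tasks or Pub/Sub client in the real service).
func submitMoshicamTask(ctx context.Context, m OwnerMessage) error {
	// ... enqueue m for the Moshicam task handler ...
	return nil
}

// afterCommit forwards every committed message to the Moshicam queue; the
// task handler decides downstream whether the event is relevant.
func afterCommit(ctx context.Context, msgs []OwnerMessage) error {
	for _, m := range msgs {
		if err := submitMoshicamTask(ctx, m); err != nil {
			return fmt.Errorf("submitting moshicam task: %w", err)
		}
	}
	return nil
}
```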

jarrel-b requested a review from radazen on June 26, 2024 at 22:01
radazen (Contributor) commented Jun 26, 2024

> There isn't a convenient way to filter only Moshicam-related messages from an event alone, so the function forwards every message to the queue and lets the Moshicam task handler decide whether it should handle the event.

Doh, right, the deployed_via_contract is available for tokens but not for owners. Hmm. I'm a little worried that it could be an absolute flood of tasks, but I'm all for turning it on and seeing how it goes. Worst case scenario, we could do something very similar to what we do with the Opensea streamer: use a bloom filter to store the addresses of contracts that are relevant to us, and update it every so often (could be as often as every few minutes, since I'd expect the query to be pretty fast!). And then we'd be able to filter relevant events at the streamer level, though it'd be additional moshi-specific logic in the streamer.
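
A rough Go sketch of what that bloom-filter option could look like, assuming the widely used bits-and-blooms/bloom library and a hypothetical `fetch` query that returns the relevant contract addresses:

```go
package streamer

import (
	"context"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/bits-and-blooms/bloom/v3"
)

// contractFilter holds a periodically rebuilt bloom filter of contract
// addresses relevant to Moshicam.
type contractFilter struct {
	mu     sync.RWMutex
	filter *bloom.BloomFilter
}

// rebuild replaces the filter from a fresh list of relevant contracts;
// fetch is a hypothetical query against our own tables.
func (c *contractFilter) rebuild(ctx context.Context, fetch func(context.Context) ([]string, error)) error {
	addrs, err := fetch(ctx)
	if err != nil {
		return err
	}
	// Size for the expected number of contracts at a ~0.1% false-positive rate.
	f := bloom.NewWithEstimates(uint(len(addrs))+1, 0.001)
	for _, a := range addrs {
		f.Add([]byte(strings.ToLower(a)))
	}
	c.mu.Lock()
	c.filter = f
	c.mu.Unlock()
	return nil
}

// relevant reports whether an event's contract might be a Moshicam contract.
// False positives are possible; false negatives are not.
func (c *contractFilter) relevant(contract string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.filter == nil {
		return true // not built yet: let everything through
	}
	return c.filter.Test([]byte(strings.ToLower(contract)))
}

// refreshLoop rebuilds the filter every few minutes, as suggested above.
func (c *contractFilter) refreshLoop(ctx context.Context, fetch func(context.Context) ([]string, error)) {
	t := time.NewTicker(5 * time.Minute)
	defer t.Stop()
	for {
		if err := c.rebuild(ctx, fetch); err != nil {
			log.Printf("rebuilding moshicam contract filter: %v", err)
		}
		select {
		case <-ctx.Done():
			return
		case <-t.C:
		}
	}
}
```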

jarrel-b (Collaborator, Author) commented Jun 27, 2024

> There isn't a convenient way to filter only Moshicam-related messages from an event alone, so the function forwards every message to the queue and lets the Moshicam task handler decide whether it should handle the event.

> Doh, right, the deployed_via_contract is available for tokens but not for owners. Hmm. I'm a little worried that it could be an absolute flood of tasks, but I'm all for turning it on and seeing how it goes. Worst case scenario, we could do something very similar to what we do with the Opensea streamer: use a bloom filter to store the addresses of contracts that are relevant to us, and update it every so often (could be as often as every few minutes, since I'd expect the query to be pretty fast!). And then we'd be able to filter relevant events at the streamer level, though it'd be additional moshi-specific logic in the streamer.

I was thinking the same thing! I'd like to avoid using bloom filters since they need updating. I noticed the token schema includes token_count and owner_count. So I'm assuming that Simplehash sends an update message whenever more tokens of an NFT are minted? Would you know offhand if that's the case?

I think the challenge is finding the new owners from the token message and ensuring we don't send duplicate notifications. My idea is that we query the owners table for owners added from a little before the token's last_updated date (in case processing of the token topic is lagging behind the owners topic) up to some time after it, to pick up the new owners.
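
A sketch of that lookup, assuming `owners` has `contract_address`, `token_id`, `owner_address`, and `created_at` columns, with an arbitrary five-minute look-back window:

```go
package streamer

import (
	"context"
	"database/sql"
	"time"
)

// findRecentOwners looks up owners created in a window around the token's
// last_updated timestamp. The table and column names are assumptions based
// on this discussion, and the five-minute look-back is arbitrary.
func findRecentOwners(ctx context.Context, db *sql.DB, contract, tokenID string, lastUpdated time.Time) ([]string, error) {
	const q = `
		SELECT owner_address
		FROM owners
		WHERE contract_address = $1
		  AND token_id = $2
		  AND created_at BETWEEN $3 AND now()`
	// Look back a bit before last_updated in case the token topic lags the
	// owners topic.
	windowStart := lastUpdated.Add(-5 * time.Minute)
	rows, err := db.QueryContext(ctx, q, contract, tokenID, windowStart)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var owners []string
	for rows.Next() {
		var addr string
		if err := rows.Scan(&addr); err != nil {
			return nil, err
		}
		owners = append(owners, addr)
	}
	return owners, rows.Err()
}
```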

radazen (Contributor) commented Jun 27, 2024 via email

jarrel-b force-pushed the jarrel/add-moshicam-queue-to-streamer branch from f3fd563 to 65e593e on June 27, 2024 02:41
jarrel-b force-pushed the jarrel/add-moshicam-queue-to-streamer branch from 65e593e to bd94751 on June 27, 2024 02:50
radazen (Contributor) commented Jun 27, 2024

Thinking about this a bit more, it kind of feels like we should skip the task queue and just poll the database. On one hand, I don't love the idea of polling, but on the other hand...if we can do it efficiently, it might be a lot more straightforward and actually put less strain on the database.

I was thinking through the logistics of the task-based model, and in my mind, it goes something like this:

  • token gets updated, we receive a task
  • we look for new owners of that token by querying the owners table for entries with a created_at timestamp somewhere from now()-1 to now()
  • if we find any new entries, we send notifications for them
  • we have a database table where we store the most recent owner.created_at timestamp for each token, and we make sure not to send notifications for any owner entry with a created_at <= that timestamp, and that's how we avoid duplicates

Complication: owners and tokens come through different topics and different partitions, so I don't think it's guaranteed that we'll catch the owner updates for a particular token during task processing. We had discussed a retry mechanism here, and that's probably what we'd need to do, but it feels a little ugly.
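
For concreteness, a sketch of the per-token watermark dedup from the list above; the `moshicam_owner_watermarks` table is hypothetical, and the row lock is one way to keep two tasks for the same token from double-notifying:

```go
package streamer

import (
	"context"
	"database/sql"
	"time"
)

// ownersToNotify implements the per-token watermark dedup sketched above.
// The moshicam_owner_watermarks table (one row per token, holding the most
// recent owner created_at we have notified for) is hypothetical.
func ownersToNotify(ctx context.Context, db *sql.DB, contract, tokenID string) ([]string, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	// Ensure a watermark row exists and lock it, so two tasks for the same
	// token can't notify the same owners twice.
	var watermark time.Time
	if err := tx.QueryRowContext(ctx, `
		INSERT INTO moshicam_owner_watermarks (contract_address, token_id, last_notified_at)
		VALUES ($1, $2, 'epoch')
		ON CONFLICT (contract_address, token_id) DO UPDATE
			SET last_notified_at = moshicam_owner_watermarks.last_notified_at
		RETURNING last_notified_at`,
		contract, tokenID).Scan(&watermark); err != nil {
		return nil, err
	}

	// Owners created after the watermark are the ones we haven't notified yet.
	rows, err := tx.QueryContext(ctx, `
		SELECT owner_address, created_at
		FROM owners
		WHERE contract_address = $1 AND token_id = $2 AND created_at > $3
		ORDER BY created_at`,
		contract, tokenID, watermark)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var owners []string
	newest := watermark
	for rows.Next() {
		var addr string
		var createdAt time.Time
		if err := rows.Scan(&addr, &createdAt); err != nil {
			return nil, err
		}
		owners = append(owners, addr)
		if createdAt.After(newest) {
			newest = createdAt
		}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Advance the watermark so retries of this task won't re-notify.
	if _, err := tx.ExecContext(ctx, `
		UPDATE moshicam_owner_watermarks
		SET last_notified_at = $3
		WHERE contract_address = $1 AND token_id = $2`,
		contract, tokenID, newest); err != nil {
		return nil, err
	}
	return owners, tx.Commit()
}
```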

Consider the alternative, where we poll the database every so often (every 15 seconds? every 30? every minute?)

  • we save a single timestamp: the now() value from the last run of the polling loop
  • each loop, we run a single query: find all owner entries for Moshi tokens with a created_at timestamp between the last run of the polling loop and now()
  • we send notifications for those entries
  • no need to store timestamps on a per-token basis, since we have a single timestamp that covers all owner updates
  • no need to worry about retries, since any weirdness with tokens and owners being out of sync will get picked up in a future loop when both entries are present in the database

Of course, polling could be tricky to scale in a future where we have a ton of activity, but if the query is efficient (and you'd have to imagine that searching a short window in the owners table would be), I bet it would work well in the short and medium term!
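
A sketch of one tick of that polling loop, assuming a hypothetical single-row `moshicam_poll_state` table for the watermark and a `deployed_via_contract` filter on `tokens`, as discussed above:

```go
package streamer

import (
	"context"
	"database/sql"
	"time"
)

// moshicamFactoryAddress is a placeholder for however Moshi tokens are
// identified via deployed_via_contract.
const moshicamFactoryAddress = "0x..."

// pollNewMoshiOwners runs one tick of the polling loop: read the single
// watermark, run a single query, notify, advance the watermark.
func pollNewMoshiOwners(ctx context.Context, db *sql.DB, notify func(owner, contract, tokenID string) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Read and lock the single watermark: the now() from the previous run.
	var since time.Time
	if err := tx.QueryRowContext(ctx,
		`SELECT last_run FROM moshicam_poll_state FOR UPDATE`).Scan(&since); err != nil {
		return err
	}
	now := time.Now()

	// One query: owners of Moshi tokens created since the last run. Anything
	// out of sync this tick gets picked up on a later tick.
	rows, err := tx.QueryContext(ctx, `
		SELECT o.owner_address, o.contract_address, o.token_id
		FROM owners o
		JOIN tokens t ON t.contract_address = o.contract_address AND t.token_id = o.token_id
		WHERE t.deployed_via_contract = $1
		  AND o.created_at > $2 AND o.created_at <= $3`,
		moshicamFactoryAddress, since, now)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var owner, contract, tokenID string
		if err := rows.Scan(&owner, &contract, &tokenID); err != nil {
			return err
		}
		if err := notify(owner, contract, tokenID); err != nil {
			return err
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// Advance the watermark for the next run.
	if _, err := tx.ExecContext(ctx,
		`UPDATE moshicam_poll_state SET last_run = $1`, now); err != nil {
		return err
	}
	return tx.Commit()
}
```

A caller would invoke this from a ticker every 15 to 60 seconds, matching the intervals floated above.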

FWIW, this is how the leaderboard update process also works, though it only runs every 5 minutes. It looks for any owner updates (last_updated in this case) since its last run, and then recalculates scores for only the users who could be affected by those owner updates.

radazen (Contributor) commented Jun 27, 2024

Though I guess there is one issue with my suggested polling approach, too: making sure only one instance of the loop is running. The leaderboard uses a Cloud Scheduler job that hits an admin endpoint every 5 minutes to trigger the update. That's safe and easy because the leaderboard update takes <1 second and there's little risk of a race condition where the job runs twice at the same time (and even if that does happen, the scoreboard updates will still be correct).

But with a notification loop, we'd really only want a single polling loop running, and if we're running it every X seconds it's hard to guarantee it'll finish before the next Cloud Scheduler job starts.

radazen (Contributor) commented Jun 27, 2024

Actually, I suppose there would be similar concurrency issues with the created_at method for avoiding duplicates in the task-based version, and I can think of an easy way to handle it in the polling case, so I'm back to thinking polling is the more straightforward approach.
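
One possible version of that handling, assuming Postgres: take a transaction-scoped advisory lock at the start of each tick and skip the tick if another instance already holds it (locking the watermark row with `FOR UPDATE`, as in the polling sketch above, would also serialize runs, but would block rather than skip).

```go
package streamer

import (
	"context"
	"database/sql"
)

// moshicamPollLockKey is an arbitrary application-chosen advisory lock key
// for the Moshicam owner polling loop.
const moshicamPollLockKey = 784213001

// tryAcquireTick takes a transaction-scoped advisory lock; if another
// instance is mid-tick, it returns false and the caller simply skips this
// tick. The lock is released automatically when the transaction ends.
func tryAcquireTick(ctx context.Context, tx *sql.Tx) (bool, error) {
	var got bool
	err := tx.QueryRowContext(ctx,
		`SELECT pg_try_advisory_xact_lock($1)`, moshicamPollLockKey).Scan(&got)
	return got, err
}
```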

jarrel-b closed this Jul 1, 2024