
Autosocial Service #1204

Merged: 17 commits into main, Oct 3, 2023

Conversation

@benny-conn (Collaborator) commented on Sep 26, 2023:

Changes:

  • Added 2 new cloud services, autosocial and autosocial-orchestrator
  • Added 1 new cloud task queue, autosocial, for processing the individual requests of batched users. This queue can be enqueued to by the orchestrator as well as by the backend when a new user is created or a wallet is added
  • Added 1 new cron job, calling the orchestrator daily to process all users without socials (currently just Lens and Farcaster)

How does it work:

  1. The cron job runs once a day and calls the orchestrator service
  2. The orchestrator service iterates through every user without Lens or Farcaster socials, chunking them into batches of 200
  3. A cloud task is created for every 200 users
  4. Each cloud task calls the autosocial service, which calls the Lens and Farcaster APIs and adds the socials to the user

There is a lot of concurrency within the services, and the autosocial service itself has a low concurrent-request limit, so it spins up more instances rather than handling more requests on a single instance. This will help with Lens rate limiting because the requests will be coming from different origins 😜
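
A rough sketch of what that per-batch fan-out could look like (illustrative only: errgroup, fetchLens, fetchFarcaster, and the persist.DBID key type are assumptions, not the actual implementation):

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// processBatch fans out Lens/Farcaster lookups for one batch of users while
// capping in-flight work per instance; extra load scales out to more instances.
// fetchLens and fetchFarcaster are hypothetical helpers.
func processBatch(ctx context.Context, users map[persist.DBID]map[persist.SocialProvider]persist.ChainAddress) error {
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(8) // keep per-instance concurrency low

	for userID, socials := range users {
		userID, socials := userID, socials
		if addr, ok := socials[persist.SocialProviderLens]; ok && addr.Address() != "" {
			g.Go(func() error { return fetchLens(gctx, userID, addr) })
		}
		if addr, ok := socials[persist.SocialProviderFarcaster]; ok && addr.Address() != "" {
			g.Go(func() error { return fetchFarcaster(gctx, userID, addr) })
		}
	}
	return g.Wait()
}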

@radazen (Contributor) left a comment:

Looks good! I like the batching approach. I left some questions/comments!

One other question: is the split between autosocial and autosocial-orchestrator solely so we can limit the number of concurrent requests to the autosocial service without affecting the batching functionality? Would tweaking the task queue dispatch parameters be a better way to handle Lens rate-limiting without requiring a separate service and a bunch of new instances?
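
For reference, a hedged sketch of the alternative being asked about: Cloud Tasks exposes queue-level dispatch controls through its admin API. The types below follow cloud.google.com/go/cloudtasks/apiv2, and the queue name and numbers are placeholders.

import (
	"context"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

// tuneAutosocialQueue throttles how quickly tasks are dispatched to the autosocial
// service, which is another way to stay under the Lens rate limit.
func tuneAutosocialQueue(ctx context.Context) error {
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()

	_, err = client.UpdateQueue(ctx, &taskspb.UpdateQueueRequest{
		Queue: &taskspb.Queue{
			Name: "projects/my-project/locations/us-central1/queues/autosocial", // placeholder
			RateLimits: &taskspb.RateLimits{
				MaxDispatchesPerSecond:  5,  // smooths bursts of batch tasks
				MaxConcurrentDispatches: 10, // caps in-flight batches
			},
		},
		UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"rate_limits"}},
	})
	return err
}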

@@ -10,162 +10,206 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
-      - name: "Install build deps"
+      - name: 'Install build deps'
@radazen (Contributor):

Any idea why the formatting of all these strings changed from " to '? Seems fine either way, just curious if there's a meaningful difference!

@benny-conn (Collaborator, Author):

Something in VS Code does this when I save, I have no idea why lol, but I imagine it won't make a difference.


if address, ok := socials[persist.SocialProviderLens]; ok && address.Address() != "" {

	lp.Go(func(ctx context.Context) error {
@radazen (Contributor):

IMHO it could help readability if the functions for handling Lens/Farcaster here were extracted into their own separate methods that get called from this loop, instead of being defined inline. E.g.

for u, s := range in.Users {
    userID := u
    socials := s

    if address, ok := socials[persist.SocialProviderLens]; ok && address.Address() != "" {
        lp.Go(GetLensProfile(ctx))
    }
    // ...and the same pattern for Farcaster...
}

func processAllUsers(pg *pgxpool.Pool, ctc *cloudtasks.Client) gin.HandlerFunc {
	return func(c *gin.Context) {

		noSocials, err := pg.Query(c, `select u.id, w.address, u.pii_socials->>'Lens' is null, u.pii_socials->>'Farcaster' is null from pii.user_view u join wallets w on w.id = any(u.wallets) where u.deleted = false and w.chain = 0 and w.deleted = false and u.universal = false and (u.pii_socials->>'Lens' is null or u.pii_socials->>'Farcaster' is null) order by u.created_at desc;`)
@radazen (Contributor):

Is this something we could use sqlc for? It's a little more effort up front, but compiling against our schema will help detect breaking changes in the future!
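
A hedged sketch of what the sqlc approach might look like; the query name, generated method, and field names below are illustrative, not existing code:

// In the queries file, the raw SQL would be annotated for sqlc, e.g.:
//
//   -- name: GetUsersWithoutSocials :many
//   select u.id, w.address,
//          u.pii_socials->>'Lens' is null as missing_lens,
//          u.pii_socials->>'Farcaster' is null as missing_farcaster
//   from pii.user_view u
//   join wallets w on w.id = any(u.wallets)
//   where u.deleted = false and w.chain = 0 and w.deleted = false and u.universal = false
//     and (u.pii_socials->>'Lens' is null or u.pii_socials->>'Farcaster' is null)
//   order by u.created_at desc;
//
// The handler then calls the generated, schema-checked method instead of pg.Query:
rows, err := queries.GetUsersWithoutSocials(c)
if err != nil {
	// handle/report the error as the handler does today
	return
}
for _, row := range rows {
	_ = row // row.ID, row.Address, row.MissingLens, row.MissingFarcaster would be generated fields
}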

if _, ok := send[t.ID]; !ok {
	send[t.ID] = make(map[persist.SocialProvider]persist.ChainAddress)
}
send[t.ID][t.Social] = t.Address
@radazen (Contributor):

Is it possible for a user to have multiple addresses we'd want to check for a particular social service? If I'm reading this correctly, the map would only allow a single address to be checked per user/social combo (i.e. last tuple in wins).

@benny-conn (Collaborator, Author):

You are correct, I should make this an array of addresses instead. Thank you!
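
A minimal sketch of that change, reusing the variables from the snippet above:

// Accumulate every address per user/social pair instead of keeping only the last one.
if _, ok := send[t.ID]; !ok {
	send[t.ID] = make(map[persist.SocialProvider][]persist.ChainAddress)
}
send[t.ID][t.Social] = append(send[t.ID][t.Social], t.Address)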

rec := make(chan []idAddressTuple)
errs := make(chan error)

go func() {
@radazen (Contributor):

What's the benefit of concurrency here? Is it to allow multiple Task creation calls in parallel, since those are IO bound? If so, it might be clearer and more concise to do the majority of this work synchronously and without channels, and then to kick off the Task requests with some sort of pool. I.e. just loop, group into chunks of 200, fire off an async task request, keep looping.
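
A rough sketch of that shape, assuming an errgroup-based pool and a hypothetical createAutosocialTask helper; row scanning is elided:

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(4) // a handful of task-creation requests in flight at once

batch := make([]idAddressTuple, 0, 200)
for rows.Next() {
	var t idAddressTuple
	// ...scan the row into t...
	batch = append(batch, t)

	if len(batch) == 200 {
		chunk := batch
		batch = make([]idAddressTuple, 0, 200)
		g.Go(func() error { return createAutosocialTask(gctx, chunk) })
	}
}
if len(batch) > 0 {
	chunk := batch
	g.Go(func() error { return createAutosocialTask(gctx, chunk) })
}
return g.Wait()

Only the current chunk (plus any chunks whose task requests are still in flight) is held in memory, so it still streams.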

@benny-conn (Collaborator, Author):

Maybe I'm not understanding the internals of row iteration, but as far as scaling goes, I thought that if we had 1 million users and wanted to iterate through all of them, it would be better to fire off tasks as we batch so that nothing is ever really held in memory. That way we don't have to generate all the batches, store them, and then send them all concurrently.

tuples = append(tuples, idAddressTuple{ID: userID, Address: persist.NewChainAddress(walletAddress, persist.ChainETH), Social: persist.SocialProviderFarcaster})
}

if i%100 == 0 {
@radazen (Contributor):

Should this be i%200, or do we want to check for half a batch size?

@benny-conn (Collaborator, Author):

It could be either, but you're right, for consistency we might as well use 200.

@@ -430,7 +431,14 @@ func (api UserAPI) AddWalletToUser(ctx context.Context, chainAddress persist.Cha
 		return err
 	}
 
-	return nil
+	return task.CreateTaskForAutosocialProcessUsers(ctx, task.AutosocialProcessUsersMessage{
@radazen (Contributor):

I don't think we want to return errors from task creation here. Assuming we get an error for some reason, we'll return that error to the user, even though we did add the wallet to their account. IMHO this is more of an add-on thing that we should log or report to Sentry if it fails, but not return an error as if the whole wallet creation process failed (especially since the cron job will try again in the near future!).
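
A hedged sketch of the suggested handling; the logger call is illustrative rather than the project's exact reporting helper:

// Best-effort: the wallet has already been added, so a failed enqueue should be
// reported, not surfaced to the user. The daily cron will retry this user anyway.
if err := task.CreateTaskForAutosocialProcessUsers(ctx, task.AutosocialProcessUsersMessage{
	// ...same message fields as in the diff...
}); err != nil {
	logger.For(ctx).Errorf("failed to create autosocial task: %s", err)
}
return nil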

return userID, galleryID, err
}

err = task.CreateTaskForAutosocialProcessUsers(ctx, task.AutosocialProcessUsersMessage{
@radazen (Contributor):

Same as above (though I recognize we were already returning an error here if the dispatch failed, too!). At this point we've successfully committed a transaction and the user has been created, so returning an error when a follow-up item fails seems incorrect to me. If it happens, the user ends up in a weird state: we tell them account creation failed, so they go and try to create their account again, but there's already an account with their wallet registered, and so on. The user wouldn't come back through this flow again, since their account does already exist now, so there's not much point in treating this as a failure.

@@ -205,6 +209,14 @@ func CreateTaskForPostPreflight(ctx context.Context, message PostPreflightMessag
	return submitTask(ctx, client, queue, url, withJSON(message), withTrace(span))
}

func CreateTaskForAutosocialProcessUsers(ctx context.Context, message AutosocialProcessUsersMessage, client *gcptasks.Client) error {
@radazen (Contributor):

Is there some sort of auth protecting this service?

@benny-conn (Collaborator, Author):

Yes, there is a handy dandy OIDC token that these services pass around to each other. It's verified automatically at the network level, so we don't have to do anything, and these services can only be accessed by the GCP services we permit via IAM.

@benny-conn merged commit 6a583fc into main on Oct 3, 2023
6 checks passed
@benny-conn deleted the benny/autosocial branch on October 3, 2023 at 19:57