From c1af902439db71945af712f0799700f9558b67b4 Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 29 Jan 2025 16:34:04 +0100 Subject: [PATCH] Add the telemetry/stats job (https://github.com/masa-finance/issues/issues/216) Add initial job docs to the README Refactoring and simplification Fix compilation errors Fix failing tests Add tests Fix some tests PR comments PR comments Test fixes --- Dockerfile | 3 +- Makefile | 2 +- README.md | 83 ++++++++- cmd/tee-worker/config.go | 33 ++++ docker-compose.yml | 7 +- internal/jobs/stats/stats.go | 107 +++++++++++ internal/jobs/telemetry.go | 37 ++++ internal/jobs/twitter.go | 280 ++++++++++++++--------------- internal/jobs/twitter/common.go | 2 +- internal/jobs/twitter/followers.go | 2 +- internal/jobs/twitter/profile.go | 2 +- internal/jobs/twitter/tweets.go | 50 ------ internal/jobs/twitter_test.go | 16 +- internal/jobs/webscraper.go | 13 +- internal/jobs/webscraper_test.go | 44 ++++- internal/jobserver/jobserver.go | 34 ++-- 16 files changed, 487 insertions(+), 228 deletions(-) create mode 100644 internal/jobs/stats/stats.go create mode 100644 internal/jobs/telemetry.go delete mode 100644 internal/jobs/twitter/tweets.go diff --git a/Dockerfile b/Dockerfile index 535ea3f..bd24675 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,9 +33,10 @@ RUN useradd -m -s /bin/bash masa && mkdir -p /home/masa && chown -R masa:masa /h USER masa WORKDIR /home/masa +ENV DATA_DIR=/home/masa # Expose necessary ports EXPOSE 8080 # Set default command to start the Go application -CMD ego run /usr/bin/masa-tee-worker \ No newline at end of file +CMD ego run /usr/bin/masa-tee-worker diff --git a/Makefile b/Makefile index 8a2a526..c477981 100644 --- a/Makefile +++ b/Makefile @@ -39,4 +39,4 @@ $(TEST_COOKIE_DIR): test: tee/private.pem $(TEST_COOKIE_DIR) @docker build --target=dependencies --build-arg baseimage=builder --secret id=private_key,src=./tee/private.pem -t $(IMAGE) -f Dockerfile . - @docker run --user root -e TWITTER_TEST_ACCOUNT -v $(TEST_COOKIE_DIR):/cookies -e TEST_COOKIE_DIR=/cookies -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v ./... + @docker run --user root -e TWITTER_TEST_ACCOUNT -e LOG_LEVEL=debug -v $(TEST_COOKIE_DIR):/cookies -e TEST_COOKIE_DIR=/cookies -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v ./... diff --git a/README.md b/README.md index bab6501..24aebc7 100644 --- a/README.md +++ b/README.md @@ -36,13 +36,13 @@ docker-compose up The tee-worker exposes a simple http API to submit jobs, retrieve results, and decrypt the results. 
```bash
-SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "webscraper", "arguments": { "url": "google" } }')
+SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "web-scraper", "arguments": { "url": "google" } }')

### Submitting jobs
-curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }'
+uuid=$(curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }' | jq -r .uid)

### Job results
-result=$(curl localhost:8080/job/status/b678ff77-118d-4a7a-a6ea-190eb850c28a)
+result=$(curl localhost:8080/job/status/$uuid)

### Decrypt job results
curl localhost:8080/job/result -H "Content-Type: application/json" -d '{ "encrypted_result": "'$result'", "encrypted_request": "'$SIG'" }'
@@ -51,7 +51,7 @@ curl localhost:8080/job/result -H "Content-Type: application/json" -d '{ "encryp

### Golang client

A simple Golang client is available for interacting with the API:
-
+
```golang
import(
	. "github.com/masa-finance/tee-worker/pkg/client"
@@ -90,3 +90,78 @@ func main() {
	decryptedResult, err := clientInstance.Decrypt(jobSignature, encryptedResult)
}
```
+
+### Job types
+
+The tee-worker currently supports 3 job types:
+
+**TODO:** Add descriptions of the return values.
+
+#### `web-scraper`
+
+Scrapes a URL down to some depth.
+
+**Arguments**
+
+* `url` (string): The URL to scrape.
+* `depth` (int): How deep to go (if unset or less than 0, it will be set to 1).
+
+#### `twitter-scraper`
+
+Performs different types of Twitter searches.
+
+**Arguments**
+
+* `type` (string): Type of query (see below).
+* `query` (string): The query to execute. Its meaning depends on the type of query (see below).
+* `count` (int): How many results to return.
+* `next_cursor` (string): Cursor returned from the previous query, for pagination (for those job types that support it).
+
+**Job types**
+
+Some job types have both `get` and `fetch` variants. The `get` variants ignore the `next_cursor` parameter and are meant for quick retrieval of the first `count` records. If you need more records (pagination), use the `fetch` variants, which give you access to a cursor.
+
+**Jobs that return tweets or lists of tweets**
+
+* `searchbyquery` - Executes a query and returns the matching tweets. The `query` parameter is a query using the [Twitter API query syntax](https://developer.x.com/en/docs/x-api/v1/tweets/search/guides/standard-operators).
+* `getbyid` - Returns a tweet given its ID. The `query` parameter is the tweet ID.
+* `getreplies` - Returns a list of all the replies to a given tweet. The `query` parameter is the tweet ID.
+* `gettweets` / `fetchusertweets` - Returns all the tweets for a given profile. The `query` parameter is the profile to search.
+* `gethometweets` / `fetchhometweets` - Returns all the tweets from a profile's home timeline. The `query` parameter is the profile to search.
+* `getforyoutweets` / `fetchforyoutweets` - Returns all the tweets from a profile's "For You" timeline. The `query` parameter is the profile to search.
+* `getbookmarks` / `fetchbookmarks` - Returns all of a profile's bookmarked tweets. The `query` parameter is the profile to search.
+
+**Jobs that return profiles or lists of profiles**
+
+* `getprofilebyid` / `searchbyprofile` - Returns a given user profile. The `query` parameter is the profile to search for.
+* `getfollowers` / `searchfollowers` - Returns a list of profiles of the followers of a given profile. The `query` parameter is the profile to search.
+* `getfollowing` - Returns all of the profiles a profile is following. The `query` parameter is the profile to search.
+* `getretweeters` - Returns a list of profiles that have retweeted a given tweet. The `query` parameter is the tweet ID.
+
+**Jobs that return other types of data**
+
+* `getmedia` / `fetchusermedia` - Returns info about all the photos and videos for a given user. The `query` parameter is the profile to search.
+* `gettrends` - Returns a list of all the trending topics. The `query` parameter is ignored.
+* `getspace` - Returns info regarding a Twitter Space given its ID. The `query` parameter is the space ID.
+
+#### `telemetry`
+
+This job type takes no parameters and returns the current state of the worker as an object with the fields below. All timestamps are given in seconds since the Unix epoch (1970-01-01 00:00:00 UTC). The counts cover the interval between `boot_time` and `current_time`. All the fields in the `stats` object are optional (if a field is missing, its value is 0).
+
+Note that the stats are reset whenever the node is rebooted (hence the `boot_time`, which is needed to properly account for the stats).
+
+These are the fields in the response:
+
+* `boot_time` - Timestamp when the process started up.
+* `last_operation_time` - Timestamp when the last operation happened.
+* `current_time` - Current timestamp of the host.
+* `stats.twitter_scrapes` - Total number of Twitter scrapes.
+* `stats.twitter_returned_tweets` - Number of tweets returned to clients (this does not include other types of data such as profiles or trending topics).
+* `stats.twitter_returned_profiles` - Number of profiles returned to clients.
+* `stats.twitter_returned_other` - Number of other records returned to clients (e.g. media, spaces or trending topics).
+* `stats.twitter_errors` - Number of errors while scraping tweets (excluding authentication and rate-limiting errors).
+* `stats.twitter_ratelimit_errors` - Number of Twitter rate-limiting errors.
+* `stats.twitter_auth_errors` - Number of Twitter authentication errors.
+* `stats.web_success` - Number of successful web scrapes.
+* `stats.web_errors` - Number of web scrapes that resulted in an error.
+* `stats.web_invalid` - Number of invalid web scrape requests (at the moment, blacklisted domains).
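+
+A `telemetry` job goes through the same `/job/generate`, `/job/add`, `/job/status` and `/job/result` flow shown above, with `"type": "telemetry"` and empty `arguments`. For illustration, a decrypted result might look like the following (the field names come from the `stats` package in this patch; the values are made up, and zero-valued stats may simply be absent):
+
+```json
+{
+  "boot_time": 1738166400,
+  "last_operation_time": 1738168200,
+  "current_time": 1738168230,
+  "stats": {
+    "twitter_scrapes": 42,
+    "twitter_returned_tweets": 180,
+    "web_success": 7,
+    "web_errors": 1
+  }
+}
+```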
diff --git a/cmd/tee-worker/config.go b/cmd/tee-worker/config.go index 141b2cc..54a046f 100644 --- a/cmd/tee-worker/config.go +++ b/cmd/tee-worker/config.go @@ -4,10 +4,12 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/joho/godotenv" "github.com/masa-finance/tee-worker/api/types" + "github.com/sirupsen/logrus" ) func readConfig() types.JobConfiguration { @@ -48,9 +50,40 @@ func readConfig() types.JobConfiguration { jc["twitter_accounts"] = twitterAccounts } + jc["stats_buf_size"] = statsBufSize() + + logLevel := os.Getenv("LOG_LEVEL") + switch strings.ToLower(logLevel) { + case "debug": + logrus.SetLevel(logrus.DebugLevel) + case "info": + logrus.SetLevel(logrus.InfoLevel) + case "warn": + logrus.SetLevel(logrus.WarnLevel) + case "error": + logrus.SetLevel(logrus.ErrorLevel) + default: + logrus.SetLevel(logrus.InfoLevel) + } + return jc } +// statsBufSize returns the size of the stats channel buffer +func statsBufSize() uint { + bufSizeStr := os.Getenv("STATS_BUF_SIZE") + if bufSizeStr == "" { + bufSizeStr = "128" + } + + bufSize, err := strconv.Atoi(bufSizeStr) + if err != nil { + logrus.Errorf("Error parsing STATS_BUF_SIZE: %s. Setting to default.", err) + bufSize = 128 + } + return uint(bufSize) +} + func listenAddress() string { listenAddress := os.Getenv("LISTEN_ADDRESS") if listenAddress == "" { diff --git a/docker-compose.yml b/docker-compose.yml index e26719a..dee711c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: masa-tee-worker: - # network_mode: "host" + network_mode: "host" image: masaengineering/tee-worker:main # Uncomment to build from source # build: . @@ -10,8 +10,11 @@ services: LISTEN_ADDRESS: ":8080" # uncomment if not running with Intel SGX HW # OE_SIMULATION: "1" + LOG_LEVEL: "info" + volumes: + - ./.env:/home/masa/.env restart: always # uncomment if running with Intel SGX # devices: # - /dev/sgx_enclave - # - /dev/sgx_provision \ No newline at end of file + # - /dev/sgx_provision diff --git a/internal/jobs/stats/stats.go b/internal/jobs/stats/stats.go new file mode 100644 index 0000000..3b9709e --- /dev/null +++ b/internal/jobs/stats/stats.go @@ -0,0 +1,107 @@ +package stats + +import ( + "encoding/json" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// These are the types of statistics that we can add. The value is the JSON key that will be used for serialization. +type statType string + +const ( + TwitterScrapes statType = "twitter_scrapes" + TwitterTweets statType = "twitter_returned_tweets" + TwitterProfiles statType = "twitter_returned_profiles" + TwitterOther statType = "twitter_returned_other" + TwitterErrors statType = "twitter_errors" + TwitterAuthErrors statType = "twitter_auth_errors" + TwitterRateErrors statType = "twitter_ratelimit_errors" + WebSuccess statType = "web_success" + WebErrors statType = "web_errors" + WebInvalid statType = "web_invalid" + // TODO: Should we add stats for calls to each of the Twitter job types? + +) + +// allStats is a list of all the stats that we support. +// Make sure to keep this in sync with the above! 
+var allStats []statType = []statType{
+	TwitterScrapes,
+	TwitterTweets,
+	TwitterProfiles,
+	TwitterOther,
+	TwitterErrors,
+	TwitterAuthErrors,
+	TwitterRateErrors,
+	WebSuccess,
+	WebErrors,
+	WebInvalid,
+}
+
+// AddStat is the struct used in the rest of the tee-worker for sending statistics
+type AddStat struct {
+	Type statType
+	Num  uint
+}
+
+// stats is the structure we use to store the statistics
+type stats struct {
+	BootTimeUnix      int64             `json:"boot_time"`
+	LastOperationUnix int64             `json:"last_operation_time"`
+	CurrentTimeUnix   int64             `json:"current_time"`
+	Stats             map[statType]uint `json:"stats"`
+	sync.Mutex
+}
+
+// StatsCollector is the object used to collect statistics
+type StatsCollector struct {
+	Stats *stats
+	Chan  chan AddStat
+}
+
+// StartCollector starts a goroutine that listens to a channel for AddStat messages and updates the stats accordingly.
+func StartCollector(bufSize uint) *StatsCollector {
+	logrus.Info("Starting stats collector")
+
+	s := stats{
+		BootTimeUnix: time.Now().Unix(),
+		Stats:        make(map[statType]uint),
+	}
+	for _, t := range allStats {
+		s.Stats[t] = 0
+	}
+
+	ch := make(chan AddStat, bufSize)
+
+	go func(s *stats, ch chan AddStat) {
+		for stat := range ch {
+			s.Lock()
+			s.LastOperationUnix = time.Now().Unix()
+			s.Stats[stat.Type] += stat.Num
+			s.Unlock()
+			logrus.Debugf("Added %d to stat %s. Current stats: %#v", stat.Num, stat.Type, s)
+		}
+	}(&s, ch)
+
+	return &StatsCollector{Stats: &s, Chan: ch}
+}
+
+// Json returns the current statistics as a JSON byte array
+func (s *StatsCollector) Json() ([]byte, error) {
+	s.Stats.Lock()
+	defer s.Stats.Unlock()
+	s.Stats.CurrentTimeUnix = time.Now().Unix()
+	return json.Marshal(s.Stats)
+}
+
+// Add is a convenience method to add a number to a statistic
+func (s *StatsCollector) Add(typ statType, num uint) {
+	s.Chan <- AddStat{Type: typ, Num: num}
+}
diff --git a/internal/jobs/telemetry.go b/internal/jobs/telemetry.go
new file mode 100644
index 0000000..054b496
--- /dev/null
+++ b/internal/jobs/telemetry.go
@@ -0,0 +1,37 @@
+package jobs
+
+import (
+	"github.com/masa-finance/tee-worker/api/types"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
+	"github.com/sirupsen/logrus"
+)
+
+const TelemetryJobType = "telemetry"
+
+type TelemetryJob struct {
+	collector *stats.StatsCollector
+}
+
+func NewTelemetryJob(jc types.JobConfiguration, c *stats.StatsCollector) TelemetryJob {
+	return TelemetryJob{collector: c}
+}
+
+func (t TelemetryJob) ExecuteJob(j types.Job) (types.JobResult, error) {
+	logrus.Debug("Executing telemetry job")
+
+	if t.collector == nil {
+		return types.JobResult{Error: "No StatsCollector configured", Job: j}, nil
+	}
+
+	data, err := t.collector.Json()
+	if err != nil {
+		return types.JobResult{
+			Error: err.Error(),
+			Job:   j,
+		}, err
+	}
+
+	return types.JobResult{
+		Data: data,
+		Job:  j,
+	}, nil
+}
diff --git a/internal/jobs/twitter.go b/internal/jobs/twitter.go
index c8e0156..73a8e19 100644
--- a/internal/jobs/twitter.go
+++ b/internal/jobs/twitter.go
@@ -8,6 +8,7 @@ import (
	twitterscraper "github.com/imperatrona/twitter-scraper"

	"github.com/masa-finance/tee-worker/api/types"
+	"github.com/masa-finance/tee-worker/internal/jobs/stats"
	"github.com/masa-finance/tee-worker/internal/jobs/twitter"

	"github.com/sirupsen/logrus"
@@ -34,24 +35,38 @@ func parseAccounts(accountPairs []string) []*twitter.TwitterAccount {
}

func (ts *TwitterScraper) getAuthenticatedScraper(baseDir string) (*twitter.Scraper, 
*twitter.TwitterAccount, error) { + // if baseDir is empty, use the default data directory + if baseDir == "" { + baseDir = ts.configuration.DataDir + } + account := ts.accountManager.GetNextAccount() if account == nil { + ts.statsCollector.Add(stats.TwitterAuthErrors, 1) return nil, nil, fmt.Errorf("all accounts are rate-limited") } + scraper := twitter.NewScraper(account, baseDir) if scraper == nil { + ts.statsCollector.Add(stats.TwitterAuthErrors, 1) logrus.Errorf("Authentication failed for %s", account.Username) return nil, account, fmt.Errorf("twitter authentication failed for %s", account.Username) } + return scraper, account, nil } -func (ts *TwitterScraper) handleRateLimit(err error, account *twitter.TwitterAccount) bool { +// handleError handles Twitter API errors, detecting rate limits and marking accounts as rate-limited if necessary +// It returns true if the account is rate-limited, false otherwise +func (ts *TwitterScraper) handleError(err error, account *twitter.TwitterAccount) bool { if strings.Contains(err.Error(), "Rate limit exceeded") { + ts.statsCollector.Add(stats.TwitterRateErrors, 1) ts.accountManager.MarkAccountRateLimited(account) logrus.Warnf("rate limited: %s", account.Username) return true } + + ts.statsCollector.Add(stats.TwitterErrors, 1) return false } @@ -71,10 +86,11 @@ func (ts *TwitterScraper) ScrapeFollowersForProfile(baseDir string, username str return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") if errString != "" { err := fmt.Errorf("rate limited: %s", errString) - if ts.handleRateLimit(err, account) { + if ts.handleError(err, account) { return nil, err } @@ -82,6 +98,7 @@ func (ts *TwitterScraper) ScrapeFollowersForProfile(baseDir string, username str return nil, fmt.Errorf("error fetching followers: %s", errString) } + ts.statsCollector.Add(stats.TwitterProfiles, uint(len(followingResponse))) return followingResponse, nil } @@ -91,13 +108,14 @@ func (ts *TwitterScraper) ScrapeTweetsProfile(baseDir string, username string) ( return twitterscraper.Profile{}, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) profile, err := scraper.GetProfile(username) if err != nil { - if ts.handleRateLimit(err, account) { - return twitterscraper.Profile{}, err - } + _ = ts.handleError(err, account) return twitterscraper.Profile{}, err } + + ts.statsCollector.Add(stats.TwitterProfiles, 1) return profile, nil } @@ -107,155 +125,153 @@ func (ts *TwitterScraper) ScrapeTweetsByQuery(baseDir string, query string, coun return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var tweets []*TweetResult ctx := context.Background() scraper.SetSearchMode(twitterscraper.SearchLatest) + for tweet := range scraper.SearchTweets(ctx, query, count) { if tweet.Error != nil { - if ts.handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } + _ = ts.handleError(tweet.Error, account) return nil, tweet.Error } tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nil } +func (ts *TwitterScraper) ScrapeTweetByID(baseDir string, tweetID string) (*twitterscraper.Tweet, error) { + ts.statsCollector.Add(stats.TwitterScrapes, 1) + + scraper, account, err := ts.getAuthenticatedScraper(baseDir) + if err != nil { + return nil, err + } + + tweet, err := scraper.GetTweet(tweetID) + if err != nil { + _ = ts.handleError(err, account) + return nil, err + } + + 
ts.statsCollector.Add(stats.TwitterTweets, 1)
+	return tweet, nil
+}
+
// End of adapted code from masa-oracle (commit: bf277c646d44c49cc387bc5219c900e96b06dc02)

// GetTweet retrieves a tweet by ID
func (ts *TwitterScraper) GetTweet(baseDir, tweetID string) (*TweetResult, error) {
-
-	// if baseDir is empty, use the default data directory
-	if baseDir == "" {
-		baseDir = ts.configuration.DataDir
-	}
-	// get the authenticated scraper
	scraper, account, err := ts.getAuthenticatedScraper(baseDir)
	if err != nil {
		return nil, err
	}

+	ts.statsCollector.Add(stats.TwitterScrapes, 1)
	tweet, err := scraper.GetTweet(tweetID)
	if err != nil {
-		if ts.handleRateLimit(err, account) {
-			return nil, err
-		}
+		_ = ts.handleError(err, account)
		return nil, err
	}
+
+	ts.statsCollector.Add(stats.TwitterTweets, 1)
	return &TweetResult{Tweet: tweet}, nil
}

// GetTweetReplies retrieves replies to a tweet
func (ts *TwitterScraper) GetTweetReplies(baseDir, tweetID string, cursor string) ([]*TweetResult, error) {
-	if baseDir == "" {
-		baseDir = ts.configuration.DataDir
-	}
-	// get the authenticated scraper
	scraper, account, err := ts.getAuthenticatedScraper(baseDir)
	if err != nil {
		return nil, err
	}
-	// handle rate limit
-	if ts.handleRateLimit(err, account) {
-		return nil, err
-	}
-
+	ts.statsCollector.Add(stats.TwitterScrapes, 1)
	var replies []*TweetResult
	tweets, threadCursor, err := scraper.GetTweetReplies(tweetID, cursor)
+	if err != nil {
+		_ = ts.handleError(err, account)
+		return nil, err
+	}
+
	for i, tweet := range tweets {
-		if err != nil {
-
-			replies = append(replies, &TweetResult{Tweet: tweet, ThreadCursor: threadCursor[i], Error: err})
-		}
+		replies = append(replies, &TweetResult{Tweet: tweet, ThreadCursor: threadCursor[i]})
	}
+
+	ts.statsCollector.Add(stats.TwitterTweets, uint(len(replies)))
	return replies, nil
}

// GetTweetRetweeters retrieves users who retweeted a tweet
func (ts *TwitterScraper) GetTweetRetweeters(baseDir, tweetID string, count int, cursor string) ([]*twitterscraper.Profile, error) {
-	if baseDir == "" {
-		baseDir = ts.configuration.DataDir
-	}
-	// get the authenticated scraper
-	scraper, account, err := ts.getAuthenticatedScraper(ts.configuration.DataDir)
+	scraper, account, err := ts.getAuthenticatedScraper(baseDir)
	if err != nil {
		return nil, err
	}
-	retweeters, nextCursor, err := scraper.GetTweetRetweeters(tweetID, count, cursor)
-	fmt.Sprintf("Next cursor: %s", nextCursor)
+	ts.statsCollector.Add(stats.TwitterScrapes, 1)
+	retweeters, _, err := scraper.GetTweetRetweeters(tweetID, count, cursor)
	if err != nil {
-		if ts.handleRateLimit(err, account) {
-			return nil, err
-		}
+		_ = ts.handleError(err, account)
		return nil, err
	}
+
+	ts.statsCollector.Add(stats.TwitterProfiles, uint(len(retweeters)))
	return retweeters, nil
}

// GetUserTweets retrieves tweets from a user
func (ts *TwitterScraper) GetUserTweets(baseDir, username string, count int) ([]*TweetResult, error) {
-	if baseDir == "" {
-		baseDir = ts.configuration.DataDir
-	}
-	// get the authenticated scraper
	scraper, account, err := ts.getAuthenticatedScraper(baseDir)
	if err != nil {
		return nil, err
	}

+	ts.statsCollector.Add(stats.TwitterScrapes, 1)
	var tweets []*TweetResult
	for tweet := range scraper.GetTweets(context.Background(), username, count) {
		if tweet.Error != nil {
-			if ts.handleRateLimit(tweet.Error, account) {
-				return nil, tweet.Error
-			}
+			_ = ts.handleError(tweet.Error, account)
			return nil, tweet.Error
		}
		tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet})
	}
+
+	ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets)))
	return tweets, nil
}

// FetchUserTweets retrieves tweets 
from a user func (ts *TwitterScraper) FetchUserTweets(baseDir, username string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) tweets, nextCursor, err := scraper.FetchTweets(username, count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, "", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nextCursor, nil } // GetUserMedia retrieves media tweets from a user func (ts *TwitterScraper) GetUserMedia(baseDir, username string, count int) ([]*TweetResult, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var media []*TweetResult for tweet := range scraper.GetTweetsAndReplies(context.Background(), username, count) { if tweet.Error != nil { - if ts.handleRateLimit(tweet.Error, account) { + if ts.handleError(tweet.Error, account) { return nil, tweet.Error } continue @@ -264,47 +280,41 @@ func (ts *TwitterScraper) GetUserMedia(baseDir, username string, count int) ([]* media = append(media, &TweetResult{Tweet: &tweet.Tweet}) } } + + ts.statsCollector.Add(stats.TwitterOther, uint(len(media))) return media, nil } // FetchUserMedia retrieves media tweets from a user func (ts *TwitterScraper) FetchUserMedia(baseDir, username string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) tweets, nextCursor, err := scraper.FetchTweetsAndReplies(username, count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, "", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nextCursor, nil } // GetBookmarks retrieves user's bookmarked tweets func (ts *TwitterScraper) GetBookmarks(baseDir string, count int) ([]*TweetResult, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var bookmarks []*TweetResult for tweet := range scraper.GetBookmarks(context.Background(), count) { if tweet.Error != nil { - if ts.handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } + _ = ts.handleError(tweet.Error, account) return nil, tweet.Error } bookmarks = append(bookmarks, &TweetResult{Tweet: &tweet.Tweet}) @@ -312,47 +322,41 @@ func (ts *TwitterScraper) GetBookmarks(baseDir string, count int) ([]*TweetResul break } } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(bookmarks))) return bookmarks, nil } // FetchBookmarks retrieves user's bookmarked tweets func (ts *TwitterScraper) FetchBookmarks(baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the 
authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) tweets, nextCursor, err := scraper.FetchBookmarks(count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, "", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nextCursor, nil } // GetHomeTweets retrieves tweets from user's home timeline func (ts *TwitterScraper) GetHomeTweets(baseDir string, count int) ([]*TweetResult, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var tweets []*TweetResult for tweet := range scraper.GetHomeTweets(context.Background(), count) { if tweet.Error != nil { - if ts.handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } + _ = ts.handleError(tweet.Error, account) return nil, tweet.Error } tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) @@ -360,96 +364,87 @@ func (ts *TwitterScraper) GetHomeTweets(baseDir string, count int) ([]*TweetResu break } } + + ts.statsCollector.Add(stats.TwitterOther, uint(len(tweets))) return tweets, nil } // FetchHomeTweets retrieves tweets from user's home timeline func (ts *TwitterScraper) FetchHomeTweets(baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) tweets, nextCursor, err := scraper.FetchHomeTweets(count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, "", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nextCursor, nil } // GetForYouTweets retrieves tweets from For You timeline func (ts *TwitterScraper) GetForYouTweets(baseDir string, count int) ([]*TweetResult, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var tweets []*TweetResult for tweet := range scraper.GetForYouTweets(context.Background(), count) { if tweet.Error != nil { - if ts.handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } + _ = ts.handleError(tweet.Error, account) return nil, tweet.Error } + tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) if len(tweets) >= count { break } } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nil } // FetchForYouTweets retrieves tweets from For You timeline func (ts *TwitterScraper) FetchForYouTweets(baseDir string, count int, cursor string) ([]*twitterscraper.Tweet, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) tweets, nextCursor, err := scraper.FetchForYouTweets(count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, 
"", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterTweets, uint(len(tweets))) return tweets, nextCursor, nil } // GetProfileByID retrieves a user profile by ID func (ts *TwitterScraper) GetProfileByID(baseDir, userID string) (*twitterscraper.Profile, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) profile, err := scraper.GetProfileByID(userID) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, err - } + _ = ts.handleError(err, account) return nil, err } + + ts.statsCollector.Add(stats.TwitterProfiles, 1) return &profile, nil } @@ -460,97 +455,93 @@ func (ts *TwitterScraper) SearchProfile(query string, count int) ([]*twitterscra return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) var profiles []*twitterscraper.ProfileResult for profile := range scraper.SearchProfiles(context.Background(), query, count) { if len(profiles) >= count { break } + profiles = append(profiles, profile) } + + ts.statsCollector.Add(stats.TwitterProfiles, uint(len(profiles))) return profiles, nil } // GetTrends retrieves current trending topics func (ts *TwitterScraper) GetTrends(baseDir string) ([]string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + + ts.statsCollector.Add(stats.TwitterScrapes, 1) trends, err := scraper.GetTrends() if err != nil { - if ts.handleRateLimit(err, account) { - return nil, err - } + _ = ts.handleError(err, account) return nil, err } + + // TODO: Should this be the number of topics, or 1 ? 
+ ts.statsCollector.Add(stats.TwitterOther, uint(len(trends))) return trends, nil } // GetFollowers retrieves users that follow a user func (ts *TwitterScraper) GetFollowers(baseDir, user string, count int, cursor string) ([]*twitterscraper.Profile, string, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } - // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, "", err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) followers, nextCursor, err := scraper.FetchFollowers(user, count, cursor) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, "", err - } + _ = ts.handleError(err, account) return nil, "", err } + + ts.statsCollector.Add(stats.TwitterProfiles, uint(len(followers))) return followers, nextCursor, nil } // GetFollowing retrieves users that a user follows func (ts *TwitterScraper) GetFollowing(baseDir, username string, count int) ([]*twitterscraper.Profile, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) following, errString, _ := scraper.FetchFollowing(username, count, "") if errString != "" { err := fmt.Errorf("error fetching following: %s", errString) - if ts.handleRateLimit(err, account) { - return nil, err - } + _ = ts.handleError(err, account) return nil, err } + + ts.statsCollector.Add(stats.TwitterProfiles, uint(len(following))) return following, nil } // GetSpace retrieves space information by ID func (ts *TwitterScraper) GetSpace(baseDir, spaceID string) (*twitterscraper.Space, error) { - if baseDir == "" { - baseDir = ts.configuration.DataDir - } // get the authenticated scraper scraper, account, err := ts.getAuthenticatedScraper(baseDir) if err != nil { return nil, err } + ts.statsCollector.Add(stats.TwitterScrapes, 1) space, err := scraper.GetSpace(spaceID) if err != nil { - if ts.handleRateLimit(err, account) { - return nil, err - } + _ = ts.handleError(err, account) return nil, err } + + ts.statsCollector.Add(stats.TwitterOther, 1) return space, nil } @@ -559,6 +550,7 @@ const TwitterScraperType = "twitter-scraper" type TwitterScraper struct { configuration TwitterScraperConfiguration accountManager *twitter.TwitterAccountManager + statsCollector *stats.StatsCollector } type TwitterScraperConfiguration struct { @@ -573,14 +565,18 @@ type TwitterScraperArgs struct { NextCursor string `json:"next_cursor"` } -func NewTwitterScraper(jc types.JobConfiguration) *TwitterScraper { +func NewTwitterScraper(jc types.JobConfiguration, c *stats.StatsCollector) *TwitterScraper { config := TwitterScraperConfiguration{} jc.Unmarshal(&config) accounts := parseAccounts(config.Accounts) accountManager := twitter.NewTwitterAccountManager(accounts) - return &TwitterScraper{configuration: config, accountManager: accountManager} + return &TwitterScraper{ + configuration: config, + accountManager: accountManager, + statsCollector: c, + } } func (ws *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { @@ -619,7 +615,7 @@ func (ws *TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { }, err case "getbyid": - tweet, err := twitter.ScrapeTweetByID(ws.configuration.DataDir, args.Query) + tweet, err := ws.ScrapeTweetByID(ws.configuration.DataDir, args.Query) if err != nil { return types.JobResult{Error: err.Error()}, err } @@ -808,8 +804,6 @@ func (ws 
*TwitterScraper) ExecuteJob(j types.Job) (types.JobResult, error) { }, err } - // Do the web scraping here - // For now, just return the URL return types.JobResult{ Error: "invalid search type", }, fmt.Errorf("invalid search type") diff --git a/internal/jobs/twitter/common.go b/internal/jobs/twitter/common.go index 8a3ee55..d9e8cb0 100644 --- a/internal/jobs/twitter/common.go +++ b/internal/jobs/twitter/common.go @@ -63,7 +63,7 @@ func getAuthenticatedScraper(baseDir string) (*Scraper, *TwitterAccount, error) return scraper, account, nil } -func handleRateLimit(err error, account *TwitterAccount) bool { +func handleError(err error, account *TwitterAccount) bool { if strings.Contains(err.Error(), "Rate limit exceeded") { accountManager.MarkAccountRateLimited(account) logrus.Warnf("rate limited: %s", account.Username) diff --git a/internal/jobs/twitter/followers.go b/internal/jobs/twitter/followers.go index b0e1e2b..0423164 100644 --- a/internal/jobs/twitter/followers.go +++ b/internal/jobs/twitter/followers.go @@ -17,7 +17,7 @@ func ScrapeFollowersForProfile(baseDir string, username string, count int) ([]*t followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") if errString != "" { err := fmt.Errorf("rate limited: %s", errString) - if handleRateLimit(err, account) { + if handleError(err, account) { return nil, err } diff --git a/internal/jobs/twitter/profile.go b/internal/jobs/twitter/profile.go index ba8328c..ea75e6a 100644 --- a/internal/jobs/twitter/profile.go +++ b/internal/jobs/twitter/profile.go @@ -12,7 +12,7 @@ func ScrapeTweetsProfile(baseDir string, username string) (twitterscraper.Profil profile, err := scraper.GetProfile(username) if err != nil { - if handleRateLimit(err, account) { + if handleError(err, account) { return twitterscraper.Profile{}, err } return twitterscraper.Profile{}, err diff --git a/internal/jobs/twitter/tweets.go b/internal/jobs/twitter/tweets.go deleted file mode 100644 index d1a45f8..0000000 --- a/internal/jobs/twitter/tweets.go +++ /dev/null @@ -1,50 +0,0 @@ -package twitter - -import ( - "context" - - twitterscraper "github.com/imperatrona/twitter-scraper" -) - -type TweetResult struct { - Tweet *twitterscraper.Tweet - Error error -} - -func ScrapeTweetsByQuery(baseDir string, query string, count int) ([]*TweetResult, error) { - scraper, account, err := getAuthenticatedScraper(baseDir) - if err != nil { - return nil, err - } - - var tweets []*TweetResult - ctx := context.Background() - scraper.SetSearchMode(twitterscraper.SearchLatest) - for tweet := range scraper.SearchTweets(ctx, query, count) { - if tweet.Error != nil { - if handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } - return nil, tweet.Error - } - tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) - } - return tweets, nil -} - -func ScrapeTweetByID(baseDir string, tweetID string) (*twitterscraper.Tweet, error) { - scraper, account, err := getAuthenticatedScraper(baseDir) - if err != nil { - return nil, err - } - - tweet, err := scraper.GetTweet(tweetID) - if err != nil { - if handleRateLimit(err, account) { - return nil, err - } - return nil, err - } - - return tweet, nil -} diff --git a/internal/jobs/twitter_test.go b/internal/jobs/twitter_test.go index d4e2064..9ef9e44 100644 --- a/internal/jobs/twitter_test.go +++ b/internal/jobs/twitter_test.go @@ -9,11 +9,13 @@ import ( twitterscraper "github.com/imperatrona/twitter-scraper" "github.com/masa-finance/tee-worker/api/types" . 
"github.com/masa-finance/tee-worker/internal/jobs" + "github.com/masa-finance/tee-worker/internal/jobs/stats" ) var _ = Describe("Twitter Scraper", func() { var twitterScraper *TwitterScraper + var statsCollector *stats.StatsCollector var tempDir string var err error @@ -32,10 +34,12 @@ var _ = Describe("Twitter Scraper", func() { Skip("TWITTER_TEST_ACCOUNT is not set") } + statsCollector = stats.StartCollector(128) + twitterScraper = NewTwitterScraper(types.JobConfiguration{ "twitter_accounts": []string{account}, "data_dir": tempDir, - }) + }, statsCollector) }) AfterEach(func() { @@ -57,9 +61,11 @@ var _ = Describe("Twitter Scraper", func() { var results []*TweetResult err = res.Unmarshal(&results) Expect(err).NotTo(HaveOccurred()) - Expect(len(results)).ToNot(BeZero()) + Expect(results).ToNot(BeEmpty()) Expect(results[0].Tweet.Text).ToNot(BeEmpty()) + Expect(statsCollector.Stats.Stats[stats.TwitterScrapes]).To(BeNumerically("==", 1)) + Expect(statsCollector.Stats.Stats[stats.TwitterTweets]).To(BeNumerically("==", uint(len(results)))) }) It("should scrape a profile", func() { @@ -80,6 +86,9 @@ var _ = Describe("Twitter Scraper", func() { Expect(len(results)).ToNot(BeZero()) Expect(results[0].Website).To(ContainSubstring("nasa.gov")) + + Expect(statsCollector.Stats.Stats[stats.TwitterScrapes]).To(BeNumerically("==", 0)) + Expect(statsCollector.Stats.Stats[stats.TwitterProfiles]).To(BeNumerically("==", uint(len(results)))) }) It("should scrape tweets with a search query", func() { @@ -99,6 +108,9 @@ var _ = Describe("Twitter Scraper", func() { Expect(err).NotTo(HaveOccurred()) Expect(len(results)).ToNot(BeZero()) Expect(results[0].Username).ToNot(BeEmpty()) + + Expect(statsCollector.Stats.Stats[stats.TwitterScrapes]).To(BeNumerically("==", 1)) + Expect(statsCollector.Stats.Stats[stats.TwitterProfiles]).To(BeNumerically("==", uint(len(results)))) }) It("should get tweet by ID", func() { diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go index 1abfc97..9b7ce8b 100644 --- a/internal/jobs/webscraper.go +++ b/internal/jobs/webscraper.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff" "github.com/gocolly/colly" "github.com/masa-finance/tee-worker/api/types" + "github.com/masa-finance/tee-worker/internal/jobs/stats" "github.com/sirupsen/logrus" ) @@ -18,6 +19,7 @@ const WebScraperType = "web-scraper" type WebScraper struct { configuration WebScraperConfiguration + stats *stats.StatsCollector } type WebScraperConfiguration struct { @@ -29,10 +31,13 @@ type WebScraperArgs struct { Depth int `json:"depth"` } -func NewWebScraper(jc types.JobConfiguration) *WebScraper { +func NewWebScraper(jc types.JobConfiguration, statsCollector *stats.StatsCollector) *WebScraper { config := WebScraperConfiguration{} jc.Unmarshal(&config) - return &WebScraper{configuration: config} + return &WebScraper{ + configuration: config, + stats: statsCollector, + } } func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { @@ -41,6 +46,8 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { for _, u := range ws.configuration.Blacklist { if strings.Contains(args.URL, u) { + ws.stats.Add(stats.WebInvalid, 1) + logrus.Errorf("Blacklisted URL: %s", args.URL) return types.JobResult{ Error: fmt.Sprintf("URL blacklisted: %s", args.URL), }, nil @@ -49,11 +56,13 @@ func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { result, err := scrapeWeb([]string{args.URL}, args.Depth) if err != nil { + ws.stats.Add(stats.WebErrors, 1) return 
types.JobResult{Error: err.Error()}, err } // Do the web scraping here // For now, just return the URL + ws.stats.Add(stats.WebSuccess, 1) return types.JobResult{ Data: result, }, nil diff --git a/internal/jobs/webscraper_test.go b/internal/jobs/webscraper_test.go index f6f849a..4d00ef1 100644 --- a/internal/jobs/webscraper_test.go +++ b/internal/jobs/webscraper_test.go @@ -1,16 +1,25 @@ package jobs_test import ( + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/masa-finance/tee-worker/api/types" . "github.com/masa-finance/tee-worker/internal/jobs" + "github.com/masa-finance/tee-worker/internal/jobs/stats" ) +var statsCollector *stats.StatsCollector + var _ = Describe("Webscraper", func() { + BeforeEach(func() { + statsCollector = stats.StartCollector(128) + }) + It("should scrape now", func() { - webScraper := NewWebScraper(types.JobConfiguration{}) + webScraper := NewWebScraper(types.JobConfiguration{}, statsCollector) res, err := webScraper.ExecuteJob(types.Job{ Type: WebScraperType, @@ -25,11 +34,18 @@ var _ = Describe("Webscraper", func() { res.Unmarshal(&scrapedData) Expect(err).NotTo(HaveOccurred()) - Expect(len(scrapedData.Pages)).ToNot(BeZero()) + Expect(scrapedData.Pages).ToNot(BeEmpty()) + + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebSuccess] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebErrors] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) }) It("does not return data with invalid hosts", func() { - webScraper := NewWebScraper(types.JobConfiguration{}) + webScraper := NewWebScraper(types.JobConfiguration{}, statsCollector) res, err := webScraper.ExecuteJob(types.Job{ Type: WebScraperType, @@ -44,13 +60,22 @@ var _ = Describe("Webscraper", func() { res.Unmarshal(&scrapedData) Expect(err).NotTo(HaveOccurred()) - Expect(len(scrapedData.Pages)).To(BeZero()) + Expect(scrapedData.Pages).To(BeEmpty()) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebSuccess] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebErrors] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebInvalid] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) }) It("should allow to blacklist urls", func() { webScraper := NewWebScraper(types.JobConfiguration{ "webscraper_blacklist": []string{"google"}, - }) + }, statsCollector) res, err := webScraper.ExecuteJob(types.Job{ Type: WebScraperType, @@ -60,5 +85,14 @@ var _ = Describe("Webscraper", func() { }) Expect(err).ToNot(HaveOccurred()) Expect(res.Error).To(Equal("URL blacklisted: google")) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebSuccess] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebErrors] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 0)) + Eventually(func() uint { + return statsCollector.Stats.Stats[stats.WebInvalid] + }, 5*time.Second, 10*time.Millisecond).Should(BeNumerically("==", 1)) }) }) diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index a9dd6a6..807af58 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -7,6 +7,7 @@ import ( 
"github.com/google/uuid" "github.com/masa-finance/tee-worker/api/types" "github.com/masa-finance/tee-worker/internal/jobs" + "github.com/masa-finance/tee-worker/internal/jobs/stats" ) type JobServer struct { @@ -27,23 +28,26 @@ type jobWorkerEntry struct { } func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { - if workers == 0 { - workers++ + if workers <= 0 { + workers = 1 } - jobworkers := make(map[string]*jobWorkerEntry) - - for _, t := range []string{jobs.WebScraperType, jobs.TwitterScraperType} { - switch t { - case jobs.WebScraperType: - jobworkers[jobs.WebScraperType] = &jobWorkerEntry{ - w: jobs.NewWebScraper(jc), - } - case jobs.TwitterScraperType: - jobworkers[jobs.TwitterScraperType] = &jobWorkerEntry{ - w: jobs.NewTwitterScraper(jc), - } - } + bufSize, ok := jc["stats_buf_size"].(uint) + if !ok { + bufSize = 128 + } + s := stats.StartCollector(bufSize) + + jobworkers := map[string]*jobWorkerEntry{ + jobs.WebScraperType: { + w: jobs.NewWebScraper(jc, s), + }, + jobs.TwitterScraperType: { + w: jobs.NewTwitterScraper(jc, s), + }, + jobs.TelemetryJobType: { + w: jobs.NewTelemetryJob(jc, s), + }, } return &JobServer{