Add the stats/telemetry job, and add initial job docs to the README (https://github.com/masa-finance/issues/issues/216) #19

Merged · 1 commit · Feb 5, 2025
Dockerfile (3 changes: 2 additions & 1 deletion)
@@ -33,9 +33,10 @@
USER masa

WORKDIR /home/masa
ENV DATA_DIR=/home/masa

# Expose necessary ports
EXPOSE 8080

# Set default command to start the Go application
CMD ego run /usr/bin/masa-tee-worker
Makefile (2 changes: 1 addition & 1 deletion)
@@ -39,4 +39,4 @@ $(TEST_COOKIE_DIR):

test: tee/private.pem $(TEST_COOKIE_DIR)
	@docker build --target=dependencies --build-arg baseimage=builder --secret id=private_key,src=./tee/private.pem -t $(IMAGE) -f Dockerfile .
-	@docker run --user root -e TWITTER_TEST_ACCOUNT -v $(TEST_COOKIE_DIR):/cookies -e TEST_COOKIE_DIR=/cookies -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v ./...
+	@docker run --user root -e TWITTER_TEST_ACCOUNT -e LOG_LEVEL=debug -v $(TEST_COOKIE_DIR):/cookies -e TEST_COOKIE_DIR=/cookies -v $(PWD)/coverage:/app/coverage --rm --workdir /app $(IMAGE) go test -coverprofile=coverage/coverage.txt -covermode=atomic -v ./...
README.md (83 changes: 79 additions & 4 deletions)
@@ -36,13 +36,13 @@ docker-compose up
The tee-worker exposes a simple HTTP API for submitting jobs, retrieving results, and decrypting them.

```bash
-SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "webscraper", "arguments": { "url": "google" } }')
+SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "web-scraper", "arguments": { "url": "google" } }')

### Submitting jobs
-curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }'
+uuid=$(curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }' | jq -r .uid)

### Jobs results
-result=$(curl localhost:8080/job/status/b678ff77-118d-4a7a-a6ea-190eb850c28a)
+result=$(curl localhost:8080/job/status/$uuid)

### Decrypt job results
curl localhost:8080/job/result -H "Content-Type: application/json" -d '{ "encrypted_result": "'$result'", "encrypted_request": "'$SIG'" }'
@@ -51,7 +51,7 @@
### Golang client

A simple Golang client is available for interacting with the API:

```golang
import (
	. "github.com/masa-finance/tee-worker/pkg/client"
@@ -90,3 +90,78 @@ func main() {
	decryptedResult, err := clientInstance.Decrypt(jobSignature, encryptedResult)
}
```

### Job types

The tee-worker currently supports three job types:

**TODO:** Add descriptions of the return values.

#### `web-scraper`

Scrapes a URL down to a specified depth.

**Arguments**

* `url` (string): The URL to scrape.
* `depth` (int): How deep to go (if unset or less than 0, it will be set to 1).
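
As an illustration of the job flow shown above, a `web-scraper` job can be generated and submitted like this (the URL and `depth` values are only examples):

```bash
SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "web-scraper", "arguments": { "url": "https://www.google.com", "depth": 1 } }')
uuid=$(curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }' | jq -r .uid)
```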

#### `twitter-scraper`

Performs different types of Twitter searches.

**Arguments**

* `type` (string): Type of query (see below).
* `query` (string): The query to execute. Its meaning depends on the type of query (see below).
* `count` (int): How many results to return.
* `next_cursor` (int): Cursor returned from the previous query, for pagination (for those job types that support it).

**Job types**

Some job types have both `get` and `fetch` variants. The `get` variants ignore the `next_cursor` parameter and are meant for quick retrieval of the first `count` records. If you need to get more records (paginate), use the `fetch` variants, which give you access to a cursor (a submission sketch follows the job lists below).

**Jobs that return tweets or lists of tweets**

* `searchbyquery` - Executes a query and returns the tweets that match. The `query` parameter is a query using the [Twitter API query syntax](https://developer.x.com/en/docs/x-api/v1/tweets/search/guides/standard-operators)
* `getbyid` - Returns a tweet given its ID. The `query` parameter is the tweet ID.
* `getreplies` - Returns a list of all the replies to a given tweet. The `query` parameter is the tweet ID.
* `gettweets` / `fetchusertweets` - Returns all the tweets for a given profile. The `query` parameter is the profile to search.
* `gethometweets` / `fetchhometweets` - Returns all the tweets from a profile's home timeline. The `query` parameter is the profile to search.
* `getforyoutweets` / `fetchforyoutweets` - Returns all the tweets from a profile's "For You" timeline. The `query` parameter is the profile to search.
* `getbookmarks` / `fetchbookmarks` - Returns all of a profile's bookmarked tweets. The `query` parameter is the profile to search.

**Jobs that return profiles or lists of profiles**

* `getprofilebyid` / `searchbyprofile` - Returns a given user profile. The `query` parameter is the profile to search for.
* `getfollowers` / `searchfollowers` - Returns a list of profiles of the followers of a given profile. The `query` parameter is the profile to search.
* `getfollowing` - Returns all of the profiles a profile is following. The `query` parameter is the profile to search.
* `getretweeters` - Returns a list of profiles that have retweeted a given tweet. The `query` parameter is the tweet ID.

**Jobs that return other types of data**

* `getmedia` / `fetchusermedia` - Returns info about all the photos and videos for a given user. The `query` parameter is the profile to search.
* `gettrends` - Returns a list of all the trending topics. The `query` parameter is ignored.
* `getspace` - Returns info regarding a Twitter Space given its ID. The `query` parameter is the space ID.
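
As a sketch, a `searchbyquery` job can be submitted through the same `/job/generate` and `/job/add` flow documented above (the query string here is only an example):

```bash
SIG=$(curl localhost:8080/job/generate -H "Content-Type: application/json" -d '{ "type": "twitter-scraper", "arguments": { "type": "searchbyquery", "query": "#AI lang:en", "count": 10 } }')
uuid=$(curl localhost:8080/job/add -H "Content-Type: application/json" -d '{ "encrypted_job": "'$SIG'" }' | jq -r .uid)
```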

#### `telemetry`

This job type has no parameters, and returns the current state of the worker as an object with the fields listed below. All timestamps are given in seconds since the Unix epoch (1970-01-01 00:00:00 UTC). The counts cover the interval between `boot_time` and `current_time`. All fields in the `stats` object are optional; a missing field means its value is 0.

Note that the stats are reset whenever the node is rebooted; this is why `boot_time` is needed to interpret the stats correctly.

These are the fields in the response:

* `boot_time` - Timestamp when the process started up.
* `last_operation_time` - Timestamp when the last operation happened.
* `current_time` - Current timestamp of the host.
* `stats.twitter_scrapes` - Total number of Twitter scrapes.
* `stats.twitter_returned_tweets` - Number of tweets returned to clients (not including other types of data such as profiles or trending topics).
* `stats.twitter_returned_profiles` - Number of profiles returned to clients.
* `stats.twitter_returned_other` - Number of other records returned to clients (e.g. media, spaces or trending topics).
* `stats.twitter_errors` - Number of errors while scraping tweets (excluding authentication and rate-limiting).
* `stats.twitter_ratelimit_errors` - Number of Twitter rate-limiting errors.
* `stats.twitter_auth_errors` - Number of Twitter authentication errors.
* `stats.web_success` - Number of successful web scrapes.
* `stats.web_errors` - Number of web scrapes that resulted in an error.
* `stats.web_invalid` - Number of invalid web scrape requests (at the moment, blacklisted domains).
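
For illustration only, a decrypted telemetry result might look like the following (all values are invented, and stats whose value is 0 are omitted):

```json
{
  "boot_time": 1738700000,
  "last_operation_time": 1738703600,
  "current_time": 1738704000,
  "stats": {
    "twitter_scrapes": 42,
    "twitter_returned_tweets": 380,
    "twitter_returned_profiles": 25,
    "web_success": 17,
    "web_errors": 2
  }
}
```
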
cmd/tee-worker/config.go (33 changes: 33 additions & 0 deletions)
@@ -4,10 +4,12 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/joho/godotenv"
"github.com/masa-finance/tee-worker/api/types"
"github.com/sirupsen/logrus"
)

func readConfig() types.JobConfiguration {
@@ -48,9 +50,40 @@ func readConfig() types.JobConfiguration {
jc["twitter_accounts"] = twitterAccounts
}

jc["stats_buf_size"] = statsBufSize()

logLevel := os.Getenv("LOG_LEVEL")
switch strings.ToLower(logLevel) {
case "debug":
logrus.SetLevel(logrus.DebugLevel)
case "info":
logrus.SetLevel(logrus.InfoLevel)
case "warn":
logrus.SetLevel(logrus.WarnLevel)
case "error":
logrus.SetLevel(logrus.ErrorLevel)
default:
logrus.SetLevel(logrus.InfoLevel)
}

return jc
}

// statsBufSize returns the size of the stats channel buffer
func statsBufSize() uint {
	bufSizeStr := os.Getenv("STATS_BUF_SIZE")
	if bufSizeStr == "" {
		bufSizeStr = "128"
	}

	// Guard against unparsable or non-positive values, which would otherwise
	// wrap around when converted to uint.
	bufSize, err := strconv.Atoi(bufSizeStr)
	if err != nil || bufSize <= 0 {
		logrus.Errorf("Invalid STATS_BUF_SIZE %q, setting to default", bufSizeStr)
		bufSize = 128
	}
	return uint(bufSize)
}

func listenAddress() string {
	listenAddress := os.Getenv("LISTEN_ADDRESS")
	if listenAddress == "" {
…
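
For reference, the two settings introduced here can be provided through the environment, for example via the `.env` file that docker-compose mounts; the values below are only examples:

```bash
# .env — example values only
LOG_LEVEL=debug       # one of: debug, info, warn, error (defaults to info)
STATS_BUF_SIZE=256    # size of the stats channel buffer (defaults to 128)
```
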
docker-compose.yml (7 changes: 5 additions & 2 deletions)
@@ -1,6 +1,6 @@
services:
  masa-tee-worker:
-    # network_mode: "host"
+    network_mode: "host"
    image: masaengineering/tee-worker:main
    # Uncomment to build from source
    # build: .
@@ -10,8 +10,11 @@ services:
      LISTEN_ADDRESS: ":8080"
      # uncomment if not running with Intel SGX HW
      # OE_SIMULATION: "1"
      LOG_LEVEL: "info"
    volumes:
      - ./.env:/home/masa/.env
    restart: always
    # uncomment if running with Intel SGX
    # devices:
    #   - /dev/sgx_enclave
    #   - /dev/sgx_provision
internal/jobs/stats/stats.go (107 changes: 107 additions & 0 deletions)
@@ -0,0 +1,107 @@
package stats

import (
	"encoding/json"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)

// These are the types of statistics that we can add. The value is the JSON key that will be used for serialization.
type statType string

const (
	TwitterScrapes    statType = "twitter_scrapes"
	TwitterTweets     statType = "twitter_returned_tweets"
	TwitterProfiles   statType = "twitter_returned_profiles"
	TwitterOther      statType = "twitter_returned_other"
	TwitterErrors     statType = "twitter_errors"
	TwitterAuthErrors statType = "twitter_auth_errors"
	TwitterRateErrors statType = "twitter_ratelimit_errors"
	WebSuccess        statType = "web_success"
	WebErrors         statType = "web_errors"
	WebInvalid        statType = "web_invalid"
	// TODO: Should we add stats for calls to each of the Twitter job types?
)

// allStats is a list of all the stats that we support.
// Make sure to keep this in sync with the above!
var allStats []statType = []statType{
	TwitterScrapes,
	TwitterTweets,
	TwitterProfiles,
	TwitterOther,
	TwitterErrors,
	TwitterAuthErrors,
	TwitterRateErrors,
	WebSuccess,
	WebErrors,
	WebInvalid, // was missing; the comment above requires this list to stay in sync
}

// AddStat is the struct used in the rest of the tee-worker for sending statistics
type AddStat struct {
	Type statType
	Num  uint
}

// stats is the structure we use to store the statistics
type stats struct {
	BootTimeUnix      int64             `json:"boot_time"`
	LastOperationUnix int64             `json:"last_operation_time"`
	CurrentTimeUnix   int64             `json:"current_time"`
	Stats             map[statType]uint `json:"stats"`
	sync.Mutex
}

// StatsCollector is the object used to collect statistics
type StatsCollector struct {
	Stats *stats
	Chan  chan AddStat
}

// StartCollector starts a goroutine that listens to a channel for AddStat messages and updates the stats accordingly.
func StartCollector(bufSize uint) *StatsCollector {
	logrus.Info("Starting stats collector")

	s := stats{
		BootTimeUnix: time.Now().Unix(),
		Stats:        make(map[statType]uint),
	}
	for _, t := range allStats {
		s.Stats[t] = 0
	}

	ch := make(chan AddStat, bufSize)

	go func(s *stats, ch chan AddStat) {
		for {
			stat := <-ch
			s.Lock()
			s.LastOperationUnix = time.Now().Unix()
			if _, ok := s.Stats[stat.Type]; ok {
				s.Stats[stat.Type] += stat.Num
			} else {
				s.Stats[stat.Type] = stat.Num
			}
			s.Unlock()
			logrus.Debugf("Added %d to stat %s. Current stats: %#v", stat.Num, stat.Type, s)
		}
	}(&s, ch)

	return &StatsCollector{Stats: &s, Chan: ch}
}

// Json returns the current statistics as a JSON byte array
func (s *StatsCollector) Json() ([]byte, error) {
	s.Stats.Lock()
	defer s.Stats.Unlock()
	s.Stats.CurrentTimeUnix = time.Now().Unix()
	return json.Marshal(s.Stats)
}

// Add is a convenience method to add a number to a statistic
func (s *StatsCollector) Add(typ statType, num uint) {
	s.Chan <- AddStat{Type: typ, Num: num}
}
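
As a usage sketch (not part of this diff), code elsewhere in this repository could report statistics through the collector's API shown above; note that `stats` is an internal package, so this only works from within the module, and updates are applied asynchronously by the collector's goroutine:

```go
package main

import (
	"fmt"

	"github.com/masa-finance/tee-worker/internal/jobs/stats"
)

func main() {
	collector := stats.StartCollector(128)

	// Report one scrape that returned 20 tweets.
	collector.Add(stats.TwitterScrapes, 1)
	collector.Add(stats.TwitterTweets, 20)

	// Serialize the current stats, as the telemetry job does.
	data, err := collector.Json()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```
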
internal/jobs/telemetry.go (37 changes: 37 additions & 0 deletions)
@@ -0,0 +1,37 @@
package jobs

import (
	"github.com/masa-finance/tee-worker/api/types"
	"github.com/masa-finance/tee-worker/internal/jobs/stats"
	"github.com/sirupsen/logrus"
)

const TelemetryJobType = "telemetry"

type TelemetryJob struct {
	collector *stats.StatsCollector
}

func NewTelemetryJob(jc types.JobConfiguration, c *stats.StatsCollector) TelemetryJob {
	return TelemetryJob{collector: c}
}

func (t TelemetryJob) ExecuteJob(j types.Job) (types.JobResult, error) {
	logrus.Debug("Executing telemetry job")

	if t.collector == nil {
		return types.JobResult{Error: "No StatsCollector configured", Job: j}, nil
	}

	data, err := t.collector.Json()
	if err != nil {
		return types.JobResult{
			Error: err.Error(),
			Job:   j,
		}, err
	}

	return types.JobResult{
		Data: data,
		Job:  j,
	}, nil
}
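
And a minimal sketch of wiring the telemetry job to a collector, under the same in-module caveat (the fields of `types.Job` are not shown in this diff, so a zero value stands in for a real job):

```go
package main

import (
	"fmt"

	"github.com/masa-finance/tee-worker/api/types"
	"github.com/masa-finance/tee-worker/internal/jobs"
	"github.com/masa-finance/tee-worker/internal/jobs/stats"
)

func main() {
	collector := stats.StartCollector(128)
	telemetry := jobs.NewTelemetryJob(types.JobConfiguration{}, collector)

	// ExecuteJob returns the current stats as JSON in JobResult.Data.
	res, err := telemetry.ExecuteJob(types.Job{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res.Data))
}
```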