144 - Add fetch records for collect when crawling. #154

Open · wants to merge 1 commit into main
17 changes: 14 additions & 3 deletions cmd/collect/work.go
@@ -67,15 +67,23 @@ func ensureSchemasInitialized() error {
func deserializeJSON(jsonString string) (map[string]interface{}, error) {
var jsonData map[string]interface{}

// Unmarshal the JSON data
if err := json.Unmarshal([]byte(jsonString), &jsonData); err != nil {
zap.L().Error("failed to unmarshal JSON", zap.Error(err))

return nil, fmt.Errorf("deserializeJSON: failed to unmarshal input JSON: %w", err)
}

// Pull in IsFull and hallpass
isFull, _ := jsonData["IsFull"].(bool)
hallPass, _ := jsonData["hallpass"].(bool)
Contributor:

Why do we need to pull these out? Also, are they always present? I don't think they are. I would say that we need to have a case for why these are being singled out as opposed to all the other fields.

// Safely check and retrieve optional fields
isFull := false
if v, ok := jsonData["IsFull"].(bool); ok {
isFull = v
}

hallPass := false
if v, ok := jsonData["hallpass"].(bool); ok {
hallPass = v
}

zap.L().Debug("deserialized JSON attributes",
zap.Bool("isFull", isFull),
@@ -86,6 +94,9 @@ func deserializeJSON(jsonString string) (map[string]interface{}, error) {
}

func selectSchema(jsonData map[string]interface{}) (*gojsonschema.Schema, error) {
// Debug log to inspect the incoming JSON
zap.L().Debug("selectSchema received JSON", zap.Any("jsonData", jsonData))

// Extract the "data" object
data, ok := jsonData["data"].(map[string]interface{})
if !ok {
39 changes: 39 additions & 0 deletions cmd/fetch/work.go
@@ -4,6 +4,7 @@ package main
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"math"
"net/url"
@@ -21,6 +22,7 @@ import (
"github.com/GSA-TTS/jemison/internal/postgres/work_db"
"github.com/GSA-TTS/jemison/internal/queueing"
"github.com/GSA-TTS/jemison/internal/util"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
"github.com/riverqueue/river"
"go.uber.org/zap"
@@ -313,5 +315,42 @@ func (w *FetchWorker) Work(_ context.Context, job *river.Job[common.FetchArgs])
Path: job.Args.Path,
}

// Generate UUID
id := uuid.New().String()
Contributor:

We're going to want IDs to be unique globally, but constant. That is, we need to know that the id for this is the fetch_collect_count, not that it is a unique ID. In the schemas folder, I'd consider adding a constants.go that has the names of the ids, so we can keep them consistent. E.g.

var FetchCountSchemaId = "fetch_count"

or similar, one for each schema. This way, we can also refer to these in our conditionals when we are trying to figure out what schema to apply to the data payload.
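
A minimal sketch of that suggestion, assuming a new internal/common/schemas/constants.go (the file location and any constant beyond the one the comment names are illustrative, not part of this PR):

// constants.go — a central home for schema IDs, so the sender and the
// schema-selection conditionals always use the same strings.
package schemas

const (
	// FetchCountSchemaId identifies the fetch count schema; one constant
	// per schema file in this folder would live alongside it.
	FetchCountSchemaId = "fetch_count"
)

Using const rather than var would also make it explicit that these IDs are fixed, not mutable state.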


// Create data to send to the `collect` service
collectData := map[string]interface{}{
Contributor:

Do you think it is possible to wrap lines 322 through 341 into a common helper function? That is, these all take a map[string]any and convert it into a JSON structure that we send to ChQSHP. However, another question: should we just pass the map[string]any over the channel, and let the process at the other end do this conversion? That is, we should be able to declare RawData to be of type map[string]any, and then pass these hash tables/maps/dictionaries over the channel directly. That way, when it gets to the other end, we can do this work in one place.

Contributor:

I had two ideas in there; I suspect the better idea is to pass the map over the channel, so that RawData is a map[string]any as opposed to type string. This saves us from doing work at every place we want to send data.
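
A rough sketch of that preferred direction, assuming the QSHP struct in internal/queueing can be changed (the field set shown here mirrors this diff's usage and is hypothetical, not the current definition):

// Hypothetical QSHP with RawData as a map instead of a string. Callers
// hand over the map directly; the receiving end marshals it exactly once.
type QSHP struct {
	Queue   string
	Scheme  string
	Host    string
	Path    string
	RawData map[string]any
}

The send site in fetch would then shrink to:

ChQSHP <- queueing.QSHP{
	Queue:   "collect",
	Scheme:  job.Args.Scheme,
	Host:    job.Args.Host,
	Path:    job.Args.Path,
	RawData: collectData, // no json.Marshal here; conversion happens at the consumer
}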

"data": map[string]interface{}{
"id": id,
"source": "fetch",
"payload": "default-payload",
"url": hostAndPath(job), // Full URL being fetched
"count": fetchCount.Load(), // Total count of fetched URLs
},
}

// Marshal the data to JSON format
collectJSON, err := json.Marshal(collectData)
if err != nil {
// Wrap the error with additional context
wrappedErr := fmt.Errorf("failed to marshal collect data to JSON: %w", err)
zap.L().Error(wrappedErr.Error(), zap.Error(err))

return wrappedErr
}

// Enqueue the data to the `collect` queue
ChQSHP <- queueing.QSHP{
Queue: "collect",
Scheme: job.Args.Scheme,
Host: job.Args.Host,
Path: job.Args.Path,
RawData: string(collectJSON), // Include data for S3 logging
}

zap.L().Info("Logged URL to collect service",
zap.String("url", hostAndPath(job)),
zap.Int64("total_count", fetchCount.Load()))

return nil
}
4 changes: 3 additions & 1 deletion internal/common/schemas/fetch_schema.json
@@ -7,7 +7,9 @@
"properties": {
Contributor:

Another question I have... which may be because I'm missing something. (These comments did not happen linearly...)

Why do we have an object with a "data" member, and under it is everything? Is there a reason we ended up with this nested design? Should id, source, payload all be at the top level, and payload contains the interesting data? Otherwise, we've just nested everything one level deep unnecessarily?

"id": { "type": "string" },
"source": { "type": "string" },
"payload": { "type": "string" }
"payload": { "type": "string" },
Contributor:

More a question: what is the payload in this schema? If the payload is just a string that says "default-payload"... shouldn't that actually be the data?

"properties": {
  "id" ...,
  "payload": {
    "url": ...,
    "count": ...,
}

the payload is what is being carried by the data packet, no? If it isn't, then what is it for? Happy to pair if that doesn't make sense.
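
For concreteness, a possible shape combining both schema comments — fields at the top level, with payload as an object carrying the interesting data (illustrative only; the exact field set is not settled in this PR):

{
  "type": "object",
  "properties": {
    "id": { "type": "string" },
    "source": { "type": "string" },
    "payload": {
      "type": "object",
      "properties": {
        "url": { "type": "string" },
        "count": { "type": "integer" }
      }
    }
  },
  "required": ["id", "source", "payload"]
}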

"url": { "type": "string" },
"count": { "type": "integer" }
},
"required": ["id", "source", "payload"]
}