Skip to content

Commit

Permalink
backend: get all schemas for protobuf compilation in order to support…
Browse files Browse the repository at this point in the history
… records of mutliple version of same schema
  • Loading branch information
bojand committed Sep 14, 2023
1 parent 337aff2 commit 1a125ae
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -203,6 +204,25 @@ func (c *Client) GetSchemaBySubject(subject string, version string) (*SchemaVers
return parsed, nil
}

// GetSchemasBySubject gets all versioned schemas for a given subject.
func (c *Client) GetSchemasBySubject(subject string) ([]SchemaVersionedResponse, error) {
versionRes, err := c.GetSubjectVersions(subject)
if err != nil {
return nil, err
}

results := []SchemaVersionedResponse{}
for _, sv := range versionRes.Versions {
sr, err := c.GetSchemaBySubject(subject, strconv.FormatInt(int64(sv), 10))
if err != nil {
return nil, fmt.Errorf("failed to get schema by subject %s and version %d", subject, sv)
}
results = append(results, *sr)
}

return results, nil
}

// SubjectsResponse is the schema for the GET /subjects endpoint.
type SubjectsResponse struct {
Subjects []string // Subject names
Expand Down Expand Up @@ -413,15 +433,15 @@ func (c *Client) GetSchemasIndividually() ([]SchemaVersionedResponse, error) {
}

type chRes struct {
schemaRes *SchemaVersionedResponse
schemaRes []SchemaVersionedResponse
err error
}
ch := make(chan chRes, len(subjectsRes.Subjects))

// Describe all subjects concurrently one by one
for _, subject := range subjectsRes.Subjects {
go func(s string) {
r, err := c.GetSchemaBySubject(s, "latest")
r, err := c.GetSchemasBySubject(s)
ch <- chRes{
schemaRes: r,
err: err,
Expand All @@ -435,7 +455,7 @@ func (c *Client) GetSchemasIndividually() ([]SchemaVersionedResponse, error) {
if res.err != nil {
return nil, fmt.Errorf("failed to fetch at least one schema: %w", res.err)
}
schemas = append(schemas, *res.schemaRes)
schemas = append(schemas, res.schemaRes...)
}

return schemas, nil
Expand Down

0 comments on commit 1a125ae

Please sign in to comment.