diff --git a/backend/pkg/schema/client.go b/backend/pkg/schema/client.go index bd175e35a..27b0d7c46 100644 --- a/backend/pkg/schema/client.go +++ b/backend/pkg/schema/client.go @@ -16,6 +16,7 @@ import ( "fmt" "net/http" "os" + "strconv" "time" "github.com/go-resty/resty/v2" @@ -203,6 +204,25 @@ func (c *Client) GetSchemaBySubject(subject string, version string) (*SchemaVers return parsed, nil } +// GetSchemasBySubject gets all versioned schemas for a given subject. +func (c *Client) GetSchemasBySubject(subject string) ([]SchemaVersionedResponse, error) { + versionRes, err := c.GetSubjectVersions(subject) + if err != nil { + return nil, err + } + + results := []SchemaVersionedResponse{} + for _, sv := range versionRes.Versions { + sr, err := c.GetSchemaBySubject(subject, strconv.FormatInt(int64(sv), 10)) + if err != nil { + return nil, fmt.Errorf("failed to get schema by subject %s and version %d", subject, sv) + } + results = append(results, *sr) + } + + return results, nil +} + // SubjectsResponse is the schema for the GET /subjects endpoint. type SubjectsResponse struct { Subjects []string // Subject names @@ -413,7 +433,7 @@ func (c *Client) GetSchemasIndividually() ([]SchemaVersionedResponse, error) { } type chRes struct { - schemaRes *SchemaVersionedResponse + schemaRes []SchemaVersionedResponse err error } ch := make(chan chRes, len(subjectsRes.Subjects)) @@ -421,7 +441,7 @@ func (c *Client) GetSchemasIndividually() ([]SchemaVersionedResponse, error) { // Describe all subjects concurrently one by one for _, subject := range subjectsRes.Subjects { go func(s string) { - r, err := c.GetSchemaBySubject(s, "latest") + r, err := c.GetSchemasBySubject(s) ch <- chRes{ schemaRes: r, err: err, @@ -435,7 +455,7 @@ func (c *Client) GetSchemasIndividually() ([]SchemaVersionedResponse, error) { if res.err != nil { return nil, fmt.Errorf("failed to fetch at least one schema: %w", res.err) } - schemas = append(schemas, *res.schemaRes) + schemas = append(schemas, res.schemaRes...) } return schemas, nil