diff --git a/pulsaradmin/pkg/admin/schema.go b/pulsaradmin/pkg/admin/schema.go index d97cd7adc5..7190bd9961 100644 --- a/pulsaradmin/pkg/admin/schema.go +++ b/pulsaradmin/pkg/admin/schema.go @@ -43,6 +43,22 @@ type Schema interface { // CreateSchemaByPayload creates a schema for a given topic CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error + + // CreateSchemaBySchemaInfo creates a schema for a given topic + CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error + + // GetVersionBySchemaInfo gets the version of a schema + GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) + + // GetVersionByPayload gets the version of a schema + GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) + + // TestCompatibilityWithSchemaInfo tests compatibility with a schema + TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) + + // TestCompatibilityWithPostSchemaPayload tests compatibility with a schema + TestCompatibilityWithPostSchemaPayload(topic string, + schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) } type schemas struct { @@ -148,3 +164,46 @@ func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSc return s.pulsar.Client.Post(endpoint, &schemaPayload) } + +func (s *schemas) CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.CreateSchemaByPayload(topic, schemaPayload) +} + +func (s *schemas) GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.GetVersionByPayload(topic, schemaPayload) +} + +func (s *schemas) GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) { + topicName, err := utils.GetTopicName(topic) + if err != nil { + return 0, err + } + version := struct { + Version int64 `json:"version"` + }{} + endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), + topicName.GetLocalName(), "version") + err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &version) + return version.Version, err +} + +func (s *schemas) TestCompatibilityWithSchemaInfo(topic string, + schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.TestCompatibilityWithPostSchemaPayload(topic, schemaPayload) +} + +func (s *schemas) TestCompatibilityWithPostSchemaPayload(topic string, + schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) { + topicName, err := utils.GetTopicName(topic) + if err != nil { + return nil, err + } + var isCompatibility utils.IsCompatibility + endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), + topicName.GetLocalName(), "compatibility") + err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &isCompatibility) + return &isCompatibility, err +} diff --git a/pulsaradmin/pkg/admin/schema_test.go b/pulsaradmin/pkg/admin/schema_test.go index 17c1a54dd1..3560559e60 100644 --- a/pulsaradmin/pkg/admin/schema_test.go +++ b/pulsaradmin/pkg/admin/schema_test.go @@ -77,3 +77,48 @@ func TestSchemas_ForceDeleteSchema(t *testing.T) { assert.Errorf(t, err, "Schema not found") } + +func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) { + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + schemaInfo := utils.SchemaInfo{ + Schema: []byte(""), + Type: "STRING", + } + topic := fmt.Sprintf("my-topic-%v", time.Now().Nanosecond()) + err = admin.Schemas().CreateSchemaBySchemaInfo(topic, schemaInfo) + assert.NoError(t, err) + + info, err := admin.Schemas().GetSchemaInfo(topic) + assert.NoError(t, err) + assert.Equal(t, schemaInfo.Type, info.Type) + + version, err := admin.Schemas().GetVersionBySchemaInfo(topic, schemaInfo) + assert.NoError(t, err) + assert.Equal(t, version, int64(0)) + + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + version, err = admin.Schemas().GetVersionByPayload(topic, schemaPayload) + assert.NoError(t, err) + assert.Equal(t, version, int64(0)) + + compatibility, err := admin.Schemas().TestCompatibilityWithSchemaInfo(topic, schemaInfo) + assert.NoError(t, err) + assert.Equal(t, compatibility.IsCompatibility, true) + assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL")) + + compatibility, err = admin.Schemas().TestCompatibilityWithPostSchemaPayload(topic, schemaPayload) + assert.NoError(t, err) + assert.Equal(t, compatibility.IsCompatibility, true) + assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL")) + + err = admin.Schemas().ForceDeleteSchema(topic) + assert.NoError(t, err) + + _, err = admin.Schemas().GetSchemaInfo(topic) + assert.Errorf(t, err, "Schema not found") + +} diff --git a/pulsaradmin/pkg/utils/schema_util.go b/pulsaradmin/pkg/utils/schema_util.go index 08aaf54ac6..3b83669049 100644 --- a/pulsaradmin/pkg/utils/schema_util.go +++ b/pulsaradmin/pkg/utils/schema_util.go @@ -44,6 +44,11 @@ type GetSchemaResponse struct { Properties map[string]string `json:"properties"` } +type IsCompatibility struct { + IsCompatibility bool `json:"compatibility"` + SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy"` +} + func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaResponse) *SchemaInfo { info := new(SchemaInfo) schema := make([]byte, 0, 10) @@ -61,6 +66,24 @@ func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaRespo return info } +func ConvertSchemaDataToStringLegacy(schemaInfo SchemaInfo) string { + schema := schemaInfo.Schema + if schema == nil { + return "" + } + // TODO: KEY_VALUE + return string(schema) + +} + +func ConvertSchemaInfoToPostSchemaPayload(schemaInfo SchemaInfo) PostSchemaPayload { + return PostSchemaPayload{ + SchemaType: schemaInfo.Type, + Schema: ConvertSchemaDataToStringLegacy(schemaInfo), + Properties: schemaInfo.Properties, + } +} + func ConvertGetSchemaResponseToSchemaInfoWithVersion(tn *TopicName, response GetSchemaResponse) *SchemaInfoWithVersion { info := new(SchemaInfoWithVersion) info.SchemaInfo = ConvertGetSchemaResponseToSchemaInfo(tn, response)