diff --git a/streampipes-client-go/streampipes/data_lake_dashboard_api.go b/streampipes-client-go/streampipes/data_lake_dashboard_api.go index 5c08c29fdb..728e5f7f74 100644 --- a/streampipes-client-go/streampipes/data_lake_dashboard_api.go +++ b/streampipes-client-go/streampipes/data_lake_dashboard_api.go @@ -43,7 +43,7 @@ func (d *DataLakeDashboard) GetSingleDataLakeDashboard(dashboardId string) (data endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return data_lake.Dashboard{}, err } @@ -73,7 +73,7 @@ func (d *DataLakeDashboard) GetAllDataLakeDashboard() ([]data_lake.Dashboard, er endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -103,7 +103,7 @@ func (d *DataLakeDashboard) DeleteSingleDataLakeDashboard(dashboardId string) er endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/data_lake_measure_api.go b/streampipes-client-go/streampipes/data_lake_measure_api.go index d4719b81b2..012e7d1030 100644 --- a/streampipes-client-go/streampipes/data_lake_measure_api.go +++ b/streampipes-client-go/streampipes/data_lake_measure_api.go @@ -43,13 +43,13 @@ func NewDataLakeMeasures(clientConfig config.StreamPipesClientConfig) *DataLakeM } } -// AllDataLakeMeasure retrieves a list of all measurements series from the Data Lake. -func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure, error) { +// GetAllDataLakeMeasure retrieves a list of all measurements series from the Data Lake. +func (d *DataLakeMeasure) GetAllDataLakeMeasure() ([]data_lake.DataLakeMeasure, error) { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasurements() error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } @@ -102,7 +102,7 @@ func (d *DataLakeMeasure) GetSingleDataLakeMeasure(elementId string) (data_lake. endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measure", []string{elementId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return data_lake.DataLakeMeasure{}, err } @@ -134,7 +134,7 @@ func (d *DataLakeMeasure) DeleteSingleDataLakeMeasure(elementId string) error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measure", []string{elementId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } @@ -157,7 +157,7 @@ func (d *DataLakeMeasure) GetSingleDataSeries(measureId string) (*data_lake.Data endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -190,7 +190,7 @@ func (d *DataLakeMeasure) ClearDataLakeMeasureData(measureId string) error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId}) log.Printf("Clear data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } @@ -212,7 +212,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasure(measureId string) error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId, "drop"}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/data_lake_widget_api.go b/streampipes-client-go/streampipes/data_lake_widget_api.go index 4988408f33..f010dd77bd 100644 --- a/streampipes-client-go/streampipes/data_lake_widget_api.go +++ b/streampipes-client-go/streampipes/data_lake_widget_api.go @@ -44,7 +44,7 @@ func (d *DataLakeWidget) GetSingleDataLakeWidget(widgetId string) (data_lake.Dat endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return data_lake.DataExplorerWidgetModel{}, err } @@ -75,7 +75,7 @@ func (d *DataLakeWidget) GetAllDataLakeWidget() ([]data_lake.DataExplorerWidgetM endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func (d *DataLakeWidget) DeleteSingleDataLakeWidget(widgetId string) error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/endpoint.go b/streampipes-client-go/streampipes/endpoint.go index 41384b7e91..5fccd5f3a5 100644 --- a/streampipes-client-go/streampipes/endpoint.go +++ b/streampipes-client-go/streampipes/endpoint.go @@ -18,20 +18,25 @@ package streampipes import ( - "errors" - "net/http" - + "bytes" + "fmt" "github.com/apache/streampipes/streampipes-client-go/streampipes/config" headers "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/http_headers" + "io" + "net/http" ) type endpoint struct { config config.StreamPipesClientConfig } -func (e *endpoint) executeRequest(method string, endPointUrl string) (*http.Response, error) { +func (e *endpoint) executeRequest(method string, endPointUrl string, body []byte) (*http.Response, error) { - req, err := http.NewRequest(method, endPointUrl, nil) + var reader io.Reader + if body != nil { + reader = bytes.NewReader(body) + } + req, err := http.NewRequest(method, endPointUrl, reader) if err != nil { return nil, err } @@ -53,23 +58,23 @@ func (e *endpoint) handleStatusCode(resp *http.Response) error { switch resp.StatusCode { case http.StatusUnauthorized: - return errors.New("The streamPipes Backend returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.") + return fmt.Errorf("response code %d:"+"The streamPipes Backend returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.", resp.StatusCode) case http.StatusForbidden: - return errors.New("There seems to be an issue with the access rights of the given user and the resource you queried.\n" + - "Apparently, this user is not allowed to query the resource.\n" + - "Please check the user's permissions or contact your StreamPipes admin.") + return fmt.Errorf("response code %d:"+"There seems to be an issue with the access rights of the given user and the resource you queried.\n"+ + "Apparently, this user is not allowed to query the resource.\n"+ + "Please check the user's permissions or contact your StreamPipes admin.", resp.StatusCode) case http.StatusNotFound: - return errors.New("There seems to be an issue with the Go Client calling the API inappropriately.\n" + - "This should not happen, but unfortunately did.\n" + - "If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n") + return fmt.Errorf("response code %d:"+"There seems to be an issue with the Go Client calling the API inappropriately.\n"+ + "This should not happen, but unfortunately did.\n"+ + "If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n", resp.StatusCode) case http.StatusMethodNotAllowed: - return errors.New("There seems to be an issue with the Go Client calling the API inappropriately.\n" + - "This should not happen, but unfortunately did.\n" + - "If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n") + return fmt.Errorf("response code %d:"+"There seems to be an issue with the Go Client calling the API inappropriately.\n"+ + "This should not happen, but unfortunately did.\n"+ + "If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n", resp.StatusCode) case http.StatusInternalServerError: - return errors.New("streamPipes internal error") + return fmt.Errorf("response code %d:"+"streamPipes internal error", resp.StatusCode) default: - return errors.New(resp.Status) + return fmt.Errorf(resp.Status) } } diff --git a/streampipes-client-go/streampipes/functions_api.go b/streampipes-client-go/streampipes/functions_api.go index 3b696753aa..14375f3e1e 100644 --- a/streampipes-client-go/streampipes/functions_api.go +++ b/streampipes-client-go/streampipes/functions_api.go @@ -44,7 +44,7 @@ func (f *Functions) GetFunctionLogs(functionId string) ([]functions.SpLogEntry, endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "logs"}) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func (f *Functions) GetFunctionMetrics(functionId string) (functions.SpMetricsEn endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "metrics"}) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return functions.SpMetricsEntry{}, err } @@ -106,7 +106,7 @@ func (f *Functions) GetAllFunction() ([]functions.FunctionDefinition, error) { endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -136,7 +136,7 @@ func (f *Functions) DeleteSingleFunction(functionId string) error { endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := f.executeRequest("DELETE", endPointUrl) + response, err := f.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/internal/serializer/deserializer.go b/streampipes-client-go/streampipes/internal/serializer/deserializer.go index a6007b9e86..fd9bb40eb3 100644 --- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go +++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go @@ -267,3 +267,65 @@ func (p *UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) { return userAccount, nil } + +type PipelineDeserializer struct{} + +func NewPipelineDeserializer() *PipelineDeserializer { + return &PipelineDeserializer{} +} + +func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipeLine pipeline.Pipeline + err := json.Unmarshal(data, &pipeLine) + if err != nil { + return nil, err + } + return pipeLine, nil + +} + +type PipelinesDeserializer struct{} + +func NewPipelinesDeserializer() *PipelinesDeserializer { + return &PipelinesDeserializer{} +} + +func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipelines []pipeline.Pipeline + err := json.Unmarshal(data, &pipelines) + if err != nil { + return nil, err + } + return pipelines, nil + +} + +type PipelineStatusMessagesDeserializer struct{} + +func NewPipelineStatusMessagesDeserializer() *PipelineStatusMessagesDeserializer { + return &PipelineStatusMessagesDeserializer{} +} + +func (p *PipelineStatusMessagesDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipelineStatusMessage []pipeline.PipelineStatusMessage + err := json.Unmarshal(data, &pipelineStatusMessage) + if err != nil { + return nil, err + } + return pipelineStatusMessage, nil +} + +type PipelineOperationStatusDeserializer struct{} + +func NewPipelineOperationStatusDeserializer() *PipelineOperationStatusDeserializer { + return &PipelineOperationStatusDeserializer{} +} + +func (p *PipelineOperationStatusDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipelineOperationStatus pipeline.PipelineOperationStatus + err := json.Unmarshal(data, &pipelineOperationStatus) + if err != nil { + return nil, err + } + return pipelineOperationStatus, nil +} diff --git a/streampipes-client-go/streampipes/internal/serializer/serializer.go b/streampipes-client-go/streampipes/internal/serializer/serializer.go new file mode 100644 index 0000000000..24439a164b --- /dev/null +++ b/streampipes-client-go/streampipes/internal/serializer/serializer.go @@ -0,0 +1,37 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package serializer + +import ( + "encoding/json" + "github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline" +) + +type PipelineSerializer struct{} + +func NewPipelineSerializer() PipelineSerializer { + return PipelineSerializer{} +} + +func (p PipelineSerializer) Marshal(pp pipeline.Pipeline) ([]byte, error) { + data, err := json.Marshal(pp) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/streampipes-client-go/streampipes/model/common.go b/streampipes-client-go/streampipes/model/common.go index 5a853fee55..7d71666411 100644 --- a/streampipes-client-go/streampipes/model/common.go +++ b/streampipes-client-go/streampipes/model/common.go @@ -65,12 +65,91 @@ type DataSeries struct { type ResponseMessage struct { Success bool `json:"success"` - ElementName interface{} `json:"elementName"` + ElementName string `json:"elementName"` Notifications []Notification `json:"notifications"` } type Notification struct { - Title string `json:"title"` - Description string `json:"description"` - AdditionalInformation string `json:"additionalInformation"` + Title string `json:"title"` + Description interface{} `json:"description"` + AdditionalInformation string `json:"additionalInformation"` +} + +type StaticPropertyType string + +const ( + AnyStaticProperty StaticPropertyType = "AnyStaticProperty" + CodeInputStaticProperty StaticPropertyType = "CodeInputStaticProperty" + CollectionStaticProperty StaticPropertyType = "CollectionStaticProperty" + ColorPickerStaticProperty StaticPropertyType = "ColorPickerStaticProperty" + DomainStaticProperty StaticPropertyType = "DomainStaticProperty" + FreeTextStaticProperty StaticPropertyType = "FreeTextStaticProperty" + FileStaticProperty StaticPropertyType = "FileStaticProperty" + MappingPropertyUnary StaticPropertyType = "MappingPropertyUnary" + MappingPropertyNary StaticPropertyType = "MappingPropertyNary" + MatchingStaticProperty StaticPropertyType = "MatchingStaticProperty" + OneOfStaticProperty StaticPropertyType = "OneOfStaticProperty" + RuntimeResolvableAnyStaticProperty StaticPropertyType = "RuntimeResolvableAnyStaticProperty" + RuntimeResolvableGroupStaticProperty StaticPropertyType = "RuntimeResolvableGroupStaticProperty" + RuntimeResolvableOneOfStaticProperty StaticPropertyType = "RuntimeResolvableOneOfStaticProperty" + RuntimeResolvableTreeInputStaticProperty StaticPropertyType = "RuntimeResolvableTreeInputStaticProperty" + StaticPropertyGroup StaticPropertyType = "StaticPropertyGroup" + StaticPropertyAlternatives StaticPropertyType = "StaticPropertyAlternatives" + StaticPropertyAlternative StaticPropertyType = "StaticPropertyAlternative" + SecretStaticProperty StaticPropertyType = "SecretStaticProperty" + SlideToggleStaticProperty StaticPropertyType = "SlideToggleStaticProperty" +) + +type StaticProperty struct { + Optional bool `json:"optional,omitempty"` + StaticPropertyType StaticPropertyType `json:"staticPropertyType"` + Index int32 `json:"index"` + Label string `json:"label"` + Description string `json:"description"` + InternalName string `json:"internalName"` + Predefined bool `json:"predefined"` + Class string `json:"@class"` +} + +type SpDataStream struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool `json:"internallyManaged"` + EventGrounding EventGrounding `json:"eventGrounding"` + EventSchema EventSchema `json:"eventSchema"` + Category []string `json:"category"` + Index int32 `json:"index"` + CorrespondingAdapterId string `json:"correspondingAdapterId"` + Rev string `json:"_rev"` +} + +type EventGrounding struct { + TransportProtocols []TransportProtocol `json:"transportProtocols"` + TransportFormats []TransportFormat `json:"transportFormats"` +} + +type TransportProtocol struct { + ElementId string `json:"elementId"` + BrokerHostname string `json:"brokerHostname"` + TopicDefinition TopicDefinition `json:"topicDefinition"` + Class string `json:"@class,omitempty"` + Port int `json:"port"` +} + +type TopicDefinition struct { + ActualTopicName string `json:"actualTopicName"` + Class string `json:"@class"` +} + +type TransportFormat struct { + RdfType []string `json:"rdfType"` } diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go b/streampipes-client-go/streampipes/model/pipeline/pipeline.go new file mode 100644 index 0000000000..0586be659d --- /dev/null +++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go @@ -0,0 +1,163 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package pipeline + +import ( + "github.com/apache/streampipes/streampipes-client-go/streampipes/model" +) + +type PipelineHealthStatus string + +const ( + OK PipelineHealthStatus = "OK" + REQUIRES_ATTENTION PipelineHealthStatus = "REQUIRES_ATTENTION" + FAILURE PipelineHealthStatus = "FAILURE" +) + +type Pipeline struct { + Sepas []DataProcessorInvocation //`json:"sepas"` + Streams []model.SpDataStream //`json:"streams"` + Name string //`json:"name"` + Description string //`json:"description,omitempty"` + Actions []DataSinkInvocation //`json:"actions"` + Running bool //`json:"running"` + RestartOnSystemReboot bool //`json:"restartOnSystemReboot"` + Valid bool //`json:"valid"` + StartedAt int64 //`json:"startedAt,omitempty"` + CreatedAt int64 //`json:"createdAt"` + PublicElement bool //`json:"publicElement"` + CreatedByUser string //`json:"createdByUser"` + PipelineCategories []string //`json:"pipelineCategories"` + PipelineNotifications []string //`json:"pipelineNotifications"` + HealthStatus PipelineHealthStatus //`json:"healthStatus"` + ID string //`json:"_id,omitempty"` + Rev string //`json:"_rev,omitempty"` +} + +type DataProcessorInvocation struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool `json:"internallyManaged"` + Version int32 `json:"version"` + InputStreams []model.SpDataStream `json:"inputStreams"` + StaticProperties []model.StaticProperty `json:"staticProperties"` + BelongsTo string `json:"belongsTo"` + StatusInfoSettings ElementStatusInfoSettings `json:"statusInfoSettings"` + SupportedGrounding model.EventGrounding `json:"supportedGrounding"` + CorrespondingPipeline string `json:"correspondingPipeline"` + CorresponddingUser string `json:"correspondingUser"` + StreamRequirements []model.SpDataStream `json:"streamRequirements"` + Configured bool `json:"configured"` + Uncompleted bool `json:"uncompleted"` + SelectedEndpointUrl string `json:"selectedEndpointUrl"` + OutputStream model.SpDataStream `json:"outputStream"` + OutputStrategies []OutputStrategy `json:"outputStrategies"` + PathName string `json:"pathName"` + Category []string `json:"category"` + Rev string `json:"_rev"` +} + +type ElementStatusInfoSettings struct { + ElementIdentifier string `json:"elementIdentifier"` + KafkaHost string `json:"kafkaHost"` + KafkaPort int32 `json:"kafkaPort"` + ErrorTopic string `json:"errorTopic"` + StatsTopic string `json:"statsTopic"` +} + +type OutputStrategy struct { + Name string `json:"name"` + RenameRules []PropertyRenameRule `json:"renameRules"` + Class string `json:"class,omitempty"` +} + +type PropertyRenameRule struct { + RuntimeID string `json:"runtimeId"` + NewRuntimeName string `json:"newRuntimeName"` +} + +type DataSinkInvocation struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool `json:"internallyManaged"` + Version int32 `json:"version"` + InputStreams []model.SpDataStream `json:"inputStreams"` + StaticProperties []model.StaticProperty `json:"staticProperties"` + BelongsTo string `json:"belongsTo"` + StatusInfoSettings ElementStatusInfoSettings `json:"statusInfoSettings"` + SupportedGrounding model.EventGrounding `json:"supportedGrounding"` + CorrespondingPipeline string `json:"correspondingPipeline"` + CorrespondingUser string `json:"correspondingUser"` + StreamRequirements []model.SpDataStream `json:"streamRequirements"` + Configured bool `json:"configured"` + Uncompleted bool `json:"uncompleted"` + SelectedEndpointUrl string `json:"selectedEndpointUrl"` + Category []string `json:"category"` + Rev string `json:"_rev"` +} + +type PipelineElementStatus struct { + ElementID string `json:"elementId"` + ElementName string `json:"elementName"` + OptionalMessage string `json:"optionalMessage"` + Success bool `json:"success"` +} + +type PipelineOperationStatus struct { + PipelineId string `json:"pipelineId"` + PipelineName string `json:"pipelineName"` + Title string `json:"title"` + Success bool `json:"success"` + ElementStatus []PipelineElementStatus `json:"elementStatus"` +} + +type PipelineStatusMessage struct { + PipelineId string `json:"pipelineId"` + Timestamp int64 `json:"timestamp"` + MessageType string `json:"messageType"` + Message string `json:"message"` +} +type PipelineElementValidationLevel string + +const ( + ValidationInfo PipelineElementValidationLevel = "INFO" + ValidationError PipelineElementValidationLevel = "ERROR" +) + +type PipelineElementValidationInfo struct { + Level PipelineElementValidationLevel `json:"level"` + Message string `json:"message"` +} diff --git a/streampipes-client-go/streampipes/pipeline_api.go b/streampipes-client-go/streampipes/pipeline_api.go index ba1e49f7fe..e58ccc7096 100644 --- a/streampipes-client-go/streampipes/pipeline_api.go +++ b/streampipes-client-go/streampipes/pipeline_api.go @@ -18,10 +18,10 @@ package streampipes import ( + "fmt" "io" "log" "net/http" - "github.com/apache/streampipes/streampipes-client-go/streampipes/config" "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/serializer" "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/util" @@ -40,12 +40,280 @@ func NewPipeline(clientConfig config.StreamPipesClientConfig) *Pipeline { } } +// GetSinglePipeline get a specific pipeline with the given id +func (p *Pipeline) GetSinglePipeline(pipelineId string) (pipeline.Pipeline, error) { + + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + log.Printf("Get data from: %s", endPointUrl) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return pipeline.Pipeline{}, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return pipeline.Pipeline{}, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return pipeline.Pipeline{}, err + } + + unmarshalData, err := serializer.NewPipelineDeserializer().Unmarshal(body) + if err != nil { + return pipeline.Pipeline{}, err + } + pipeLine := unmarshalData.(pipeline.Pipeline) + + return pipeLine, nil +} + +// DeleteSinglePipeline delete a pipeline with a given id +func (p *Pipeline) DeleteSinglePipeline(pipelineId string) error { + + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + log.Printf("Delete data from: %s", endPointUrl) + + response, err := p.executeRequest("DELETE", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// UpdateSinglePipeline update an existing pipeline. +// If the pipeline cannot be updated successfully, it may be due to the incorrect pipeline that was passed in. +func (p *Pipeline) UpdateSinglePipeline(pp pipeline.Pipeline, pipelineId string) (model.ResponseMessage, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + body, err := serializer.NewPipelineSerializer().Marshal(pp) + if err != nil { + return model.ResponseMessage{}, err + } + response, err := p.executeRequest("PUT", endPointUrl, body) + if err != nil { + return model.ResponseMessage{}, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return model.ResponseMessage{}, err + } + } + data, err := io.ReadAll(response.Body) + if err != nil { + return model.ResponseMessage{}, err + } + + unmarshalData, err := serializer.NewResponseMessageDeserializer().Unmarshal(data) + if err != nil { + return model.ResponseMessage{}, err + } + message := unmarshalData.(model.ResponseMessage) + + return message, nil +} + +// GetAllPipeline get all pipelines of the current user +func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return nil, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return nil, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + unmarshalData, err := serializer.NewPipelinesDeserializer().Unmarshal(body) + if err != nil { + return nil, err + } + pipelines := unmarshalData.([]pipeline.Pipeline) + + return pipelines, nil +} + +// CreatePipeline store a new pipeline +func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline) (model.ResponseMessage, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil) + + body, err := serializer.NewPipelineSerializer().Marshal(pp) + if err != nil { + return model.ResponseMessage{}, err + } + response, err := p.executeRequest("POST", endPointUrl, body) + if err != nil { + return model.ResponseMessage{}, err + } + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return model.ResponseMessage{}, err + } + } + data, err := io.ReadAll(response.Body) + if err != nil { + return model.ResponseMessage{}, err + } + + unmarshalData, err := serializer.NewResponseMessageDeserializer().Unmarshal(data) + if err != nil { + fmt.Println(err, 11) + return model.ResponseMessage{}, err + } + message := unmarshalData.(model.ResponseMessage) + + return message, nil +} + +// StopSinglePipeline stop the pipeline with the given id +func (p *Pipeline) StopSinglePipeline(pipelineId string) (pipeline.PipelineOperationStatus, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "stop"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + + unmarshalData, err := serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + status := unmarshalData.(pipeline.PipelineOperationStatus) + + return status, nil +} + +// GetSinglePipelineStatus get the pipeline status of a given pipeline +func (p *Pipeline) GetSinglePipelineStatus(pipelineId string) ([]pipeline.PipelineStatusMessage, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "status"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return nil, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return nil, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + unmarshalData, err := serializer.NewPipelineStatusMessagesDeserializer().Unmarshal(body) + if err != nil { + return nil, err + } + status := unmarshalData.([]pipeline.PipelineStatusMessage) + + return status, nil +} + +// StartSinglePipeline start the pipeline with the given id +func (p *Pipeline) StartSinglePipeline(pipelineId string) (pipeline.PipelineOperationStatus, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "start"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + + unmarshalData, err := serializer.NewPipelineOperationStatusDeserializer().Unmarshal(body) + if err != nil { + return pipeline.PipelineOperationStatus{}, err + } + status := unmarshalData.(pipeline.PipelineOperationStatus) + + return status, nil +} + +// GetContainsElementPipeline returns all pipelines that contain the element with the elementld +func (p *Pipeline) GetContainsElementPipeline(pipelineId string) ([]pipeline.Pipeline, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines/contains", []string{pipelineId}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return nil, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return nil, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + unmarshalData, err := serializer.NewPipelinesDeserializer().Unmarshal(body) + if err != nil { + return nil, err + } + pipelines := unmarshalData.([]pipeline.Pipeline) + + return pipelines, nil +} + func (p *Pipeline) GetPipelineCategory() ([]pipeline.PipelineCategory, error) { endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelinecategories", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := p.executeRequest("GET", endPointUrl) + response, err := p.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -76,7 +344,7 @@ func (p *Pipeline) DeletePipelineCategory(categoryId string) (model.ResponseMess endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelinecategories", []string{categoryId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := p.executeRequest("DELETE", endPointUrl) + response, err := p.executeRequest("DELETE", endPointUrl, nil) if err != nil { return model.ResponseMessage{}, err } diff --git a/streampipes-client-go/streampipes/streampipes_version_api.go b/streampipes-client-go/streampipes/streampipes_version_api.go index 195ecc476d..6c5b924e0e 100644 --- a/streampipes-client-go/streampipes/streampipes_version_api.go +++ b/streampipes-client-go/streampipes/streampipes_version_api.go @@ -45,7 +45,7 @@ func (d *Versions) GetStreamPipesVersion() (streampipes_version.Versions, error) endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v2/info/versions", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return streampipes_version.Versions{}, err } diff --git a/streampipes-client-go/streampipes/user_api.go b/streampipes-client-go/streampipes/user_api.go index 6499d50b7d..22e8832284 100644 --- a/streampipes-client-go/streampipes/user_api.go +++ b/streampipes-client-go/streampipes/user_api.go @@ -44,7 +44,7 @@ func (s *StreamPipesUserInfo) GetSingleStreamPipesUserAccountInfo(principalId st endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", []string{principalId}) log.Printf("Get data from: %s", endPointUrl) - response, err := s.executeRequest("GET", endPointUrl) + response, err := s.executeRequest("GET", endPointUrl, nil) if err != nil { return streampipes_user.UserAccount{}, err } @@ -75,7 +75,7 @@ func (s *StreamPipesUserInfo) GetAllStreamPipesShortUserInfo() ([]streampipes_us endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := s.executeRequest("GET", endPointUrl) + response, err := s.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func (s *StreamPipesUserInfo) DeleteSingleStreamPipesShortUserInfo(principalId s endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", []string{principalId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := s.executeRequest("DELETE", endPointUrl) + response, err := s.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err }