From b7718a5ac62bbcb3b295705b3c5f52958d378014 Mon Sep 17 00:00:00 2001 From: wy Date: Wed, 12 Jun 2024 17:16:31 +0800 Subject: [PATCH] feat: add more support for pipeline API to Go client --- .../streampipes/data_lake_dashboard_api.go | 6 +- .../streampipes/data_lake_widget_api.go | 6 +- streampipes-client-go/streampipes/endpoint.go | 11 +- .../streampipes/functions_api.go | 8 +- .../internal/serializer/deserializer.go | 39 +++- .../internal/serializer/serializer.go | 37 ++++ .../streampipes/model/common.go | 28 +-- .../streampipes/model/pipeline/pipeline.go | 187 ++++++++++++++++ .../streampipes/pipeline_api.go | 203 +++++++++++++++++- .../streampipes/streampipes_version_api.go | 2 +- streampipes-client-go/streampipes/user_api.go | 6 +- 11 files changed, 496 insertions(+), 37 deletions(-) create mode 100644 streampipes-client-go/streampipes/internal/serializer/serializer.go create mode 100644 streampipes-client-go/streampipes/model/pipeline/pipeline.go diff --git a/streampipes-client-go/streampipes/data_lake_dashboard_api.go b/streampipes-client-go/streampipes/data_lake_dashboard_api.go index 55718c717a..a483670220 100644 --- a/streampipes-client-go/streampipes/data_lake_dashboard_api.go +++ b/streampipes-client-go/streampipes/data_lake_dashboard_api.go @@ -42,7 +42,7 @@ func (d *DataLakeDashboard) GetSingleDataLakeDashboard(dashboardId string) (data endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return data_lake.Dashboard{}, err } @@ -72,7 +72,7 @@ func (d *DataLakeDashboard) GetAllDataLakeDashboard() ([]data_lake.Dashboard, er endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (d *DataLakeDashboard) DeleteSingleDataLakeDashboard(dashboardId string) er endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/data_lake_widget_api.go b/streampipes-client-go/streampipes/data_lake_widget_api.go index 1a015f764b..32f99c7ca5 100644 --- a/streampipes-client-go/streampipes/data_lake_widget_api.go +++ b/streampipes-client-go/streampipes/data_lake_widget_api.go @@ -43,7 +43,7 @@ func (d *DataLakeWidget) GetSingleDataLakeWidget(widgetId string) (data_lake.Dat endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId}) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return data_lake.DataExplorerWidgetModel{}, err } @@ -74,7 +74,7 @@ func (d *DataLakeWidget) GetAllDataLakeWidget() ([]data_lake.DataExplorerWidgetM endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (d *DataLakeWidget) DeleteSingleDataLakeWidget(widgetId string) error { endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := d.executeRequest("DELETE", endPointUrl) + response, err := d.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/endpoint.go b/streampipes-client-go/streampipes/endpoint.go index a9247264a2..8d664fef5d 100644 --- a/streampipes-client-go/streampipes/endpoint.go +++ b/streampipes-client-go/streampipes/endpoint.go @@ -18,9 +18,11 @@ package streampipes import ( + "bytes" "errors" "github.com/apache/streampipes/streampipes-client-go/streampipes/config" headers "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/http_headers" + "io" "net/http" ) @@ -28,9 +30,12 @@ type endpoint struct { config config.StreamPipesClientConfig } -func (e *endpoint) executeRequest(method string, endPointUrl string) (*http.Response, error) { - - req, err := http.NewRequest(method, endPointUrl, nil) +func (e *endpoint) executeRequest(method string, endPointUrl string, body []byte) (*http.Response, error) { + var reader io.Reader + if body != nil { + reader = bytes.NewReader(body) + } + req, err := http.NewRequest(method, endPointUrl, reader) if err != nil { return nil, err } diff --git a/streampipes-client-go/streampipes/functions_api.go b/streampipes-client-go/streampipes/functions_api.go index 9d7e904555..6345477a05 100644 --- a/streampipes-client-go/streampipes/functions_api.go +++ b/streampipes-client-go/streampipes/functions_api.go @@ -43,7 +43,7 @@ func (f *Functions) GetFunctionLogs(functionId string) ([]functions.SpLogEntry, endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "logs"}) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -74,7 +74,7 @@ func (f *Functions) GetFunctionMetrics(functionId string) (functions.SpMetricsEn endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "metrics"}) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return functions.SpMetricsEntry{}, err } @@ -105,7 +105,7 @@ func (f *Functions) GetAllFunction() ([]functions.FunctionDefinition, error) { endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := f.executeRequest("GET", endPointUrl) + response, err := f.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -135,7 +135,7 @@ func (f *Functions) DeleteSingleFunction(functionId string) error { endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := f.executeRequest("DELETE", endPointUrl) + response, err := f.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err } diff --git a/streampipes-client-go/streampipes/internal/serializer/deserializer.go b/streampipes-client-go/streampipes/internal/serializer/deserializer.go index f3e7cfef0d..0e599576fb 100644 --- a/streampipes-client-go/streampipes/internal/serializer/deserializer.go +++ b/streampipes-client-go/streampipes/internal/serializer/deserializer.go @@ -24,12 +24,10 @@ import ( "github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline" - "github.com/apache/streampipes/streampipes-client-go/streampipes/model/functions" "github.com/apache/streampipes/streampipes-client-go/streampipes/model/streampipes_user" - "github.com/apache/streampipes/streampipes-client-go/streampipes/model/streampipes_version" ) @@ -101,7 +99,6 @@ func (d *StreamPipesVersionDeserializer) Unmarshal(data []byte) (interface{}, er return dataSeries, nil } - type ResponseMessageDeserializer struct{} func NewResponseMessageDeserializer() *ResponseMessageDeserializer { @@ -159,7 +156,7 @@ func (d *DataLakeDashboardsDeserializer) Unmarshal(data []byte) (interface{}, er if err != nil { return nil, err } - + return dashborad, nil } @@ -236,7 +233,7 @@ func (p *FunctionDefinitionsDeserializer) Unmarshal(data []byte) (interface{}, e return nil, err } return functionDefinitions, nil - + } type ShortUserInfosDeserializer struct{} @@ -269,3 +266,35 @@ func (p *UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) { return userAccount, nil } + +type PipelineDeserializer struct{} + +func NewPipelineDeserializer() *PipelineDeserializer { + return &PipelineDeserializer{} +} + +func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipeLine pipeline.Pipeline + err := json.Unmarshal(data, &pipeLine) + if err != nil { + return nil, err + } + return pipeLine, nil + +} + +type PipelinesDeserializer struct{} + +func NewPipelinesDeserializer() *PipelinesDeserializer { + return &PipelinesDeserializer{} +} + +func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) { + var pipeLine []pipeline.Pipeline + err := json.Unmarshal(data, &pipeLine) + if err != nil { + return nil, err + } + return pipeLine, nil + +} diff --git a/streampipes-client-go/streampipes/internal/serializer/serializer.go b/streampipes-client-go/streampipes/internal/serializer/serializer.go new file mode 100644 index 0000000000..24439a164b --- /dev/null +++ b/streampipes-client-go/streampipes/internal/serializer/serializer.go @@ -0,0 +1,37 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package serializer + +import ( + "encoding/json" + "github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline" +) + +type PipelineSerializer struct{} + +func NewPipelineSerializer() PipelineSerializer { + return PipelineSerializer{} +} + +func (p PipelineSerializer) Marshal(pp pipeline.Pipeline) ([]byte, error) { + data, err := json.Marshal(pp) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/streampipes-client-go/streampipes/model/common.go b/streampipes-client-go/streampipes/model/common.go index cb235892eb..99d1138891 100644 --- a/streampipes-client-go/streampipes/model/common.go +++ b/streampipes-client-go/streampipes/model/common.go @@ -25,20 +25,22 @@ type ValueSpecification struct { } type EventProperty struct { - ElementID string `json:"elementId"` - Label string `json:"label,omitempty"` - Description string `json:"description,omitempty"` - RuntimeName string `json:"runtimeName,omitempty"` - Required bool `json:"required,omitempty"` - DomainProperties []string `json:"domainProperties,omitempty"` - PropertyScope string `json:"propertyScope,omitempty"` - Index int `json:"index"` - RuntimeID string `json:"runtimeId,omitempty"` - RuntimeType string `json:"runtimeType"` - MeasurementUnit string `json:"measurementUnit,omitempty"` - ValueSpecification ValueSpecification `json:"valueSpecification,omitempty"` + ElementID string `json:"elementId"` + Label string `json:"label,omitempty"` + Description string `json:"description,omitempty"` + RuntimeName string `json:"runtimeName,omitempty"` + Required bool `json:"required,omitempty"` + DomainProperties []string `json:"domainProperties,omitempty"` + PropertyScope string `json:"propertyScope,omitempty"` + Index int32 `json:"index"` + RuntimeID string `json:"runtimeId,omitempty"` + AdditionalMetadata map[string]interface{} + //RuntimeType string `json:"runtimeType"` + //MeasurementUnit string `json:"measurementUnit,omitempty"` + //ValueSpecification ValueSpecification `json:"valueSpecification,omitempty"` } +// 有点问题,后续看一下 type EventProperties struct { ElementID string `json:"elementId"` Label string `json:"label"` @@ -47,7 +49,7 @@ type EventProperties struct { Required bool `json:"required"` DomainProperties []string `json:"domainProperties"` PropertyScope string `json:"propertyScope"` - Index int `json:"index"` + Index int32 `json:"index"` RuntimeID string `json:"runtimeId"` RuntimeType string `json:"runtimeType,omitempty"` MeasurementUnit string `json:"measurementUnit,omitempty"` diff --git a/streampipes-client-go/streampipes/model/pipeline/pipeline.go b/streampipes-client-go/streampipes/model/pipeline/pipeline.go new file mode 100644 index 0000000000..6d8d03bc24 --- /dev/null +++ b/streampipes-client-go/streampipes/model/pipeline/pipeline.go @@ -0,0 +1,187 @@ +package pipeline + +import ( + "github.com/apache/streampipes/streampipes-client-go/streampipes/model" +) + +type Pipeline struct { + Sepas []DataProcessorInvocation `json:"sepas"` + Streams []SpDataStream `json:"streams"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Actions []DataSinkInvocation `json:"actions"` + Running bool `json:"running"` + RestartOnSystemReboot bool `json:"restartOnSystemReboot"` + Valid bool `json:"valid"` + StartedAt int64 `json:"startedAt,omitempty"` + CreatedAt int64 `json:"createdAt"` + PublicElement bool `json:"publicElement"` + CreatedByUser string `json:"createdByUser"` + PipelineCategories []string `json:"pipelineCategories"` + PipelineNotifications []string `json:"pipelineNotifications"` + //HealthStatus string `json:"healthStatus"` //枚举类型,OK, REQUIRES_ATTENTION, FAILURE + ID string `json:"_id,omitempty"` + Rev string `json:"_rev,omitempty"` +} + +type DataProcessorInvocation struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool `json:"internallyManaged"` + Version int32 `json:"version"` + InputStreams []SpDataStream `json:"inputStreams"` + StaticProperties []StaticProperty `json:"staticProperties"` + BelongsTo string `json:"belongsTo"` + StatusInfoSettings ElementStatusInfoSettings `json:"statusInfoSettings"` + SupportedGrounding EventGrounding `json:"supportedGrounding"` + CorrespondingPipeline string `json:"correspondingPipeline"` + CorresponddingUser string `json:"correspondingUser"` + StreamRequirements []SpDataStream `json:"streamRequirements"` + Configured bool `json:"configured"` + Uncompleted bool `json:"uncompleted"` + SelectedEndpointUrl string `json:"selectedEndpointUrl"` + //ServiceTagPrefix interface{} `json:"serviceTagPrefix"` + OutputStream SpDataStream `json:"outputStream"` + OutputStrategies []OutputStrategy `json:"outputStrategies"` + PathName string `json:"pathName"` + Category []string `json:"category"` // 可能需要更精确的类型 + Rev string `json:"_rev"` +} + +type SpDataStream struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool // 假设它是一个interface{}切片,或者你可以指定为其他类型 + EventGrounding EventGrounding `json:"eventGrounding"` + EventSchema model.EventSchema `json:"eventSchema"` + Category []string `json:"category"` + Index int32 `json:"index"` + CorrespondingAdapterId string `json:"correspondingAdapterId"` + Rev string `json:"_rev"` // 通常使用驼峰命名法,这里保持原样 +} + +type EventGrounding struct { + TransportProtocols []TransportProtocol `json:"transportProtocols"` + TransportFormats []TransportFormat `json:"transportFormats"` +} + +type TransportProtocol struct { + ElementId string `json:"elementId"` + BrokerHostname string `json:"brokerHostname"` + TopicDefinition TopicDefinition `json:"topicDefinition"` + Class string `json:"@class,omitempty"` // 假设@class是JSON中的特殊字段 +} + +type TopicDefinition struct { + ActualTopicName string `json:"actualTopicName"` + Class string `json:"@class"` // 使用json标签映射到JSON中的@class字段 +} + +type TransportFormat struct { + RdfType []string `json:"rdfType"` // 假设rdfType是一个URI的切片 +} + +type StaticPropertyType string + +const ( + AnyStaticProperty StaticPropertyType = "AnyStaticProperty" + CodeInputStaticProperty StaticPropertyType = "CodeInputStaticProperty" + CollectionStaticProperty StaticPropertyType = "CollectionStaticProperty" + ColorPickerStaticProperty StaticPropertyType = "ColorPickerStaticProperty" + DomainStaticProperty StaticPropertyType = "DomainStaticProperty" + FreeTextStaticProperty StaticPropertyType = "FreeTextStaticProperty" + FileStaticProperty StaticPropertyType = "FileStaticProperty" + MappingPropertyUnary StaticPropertyType = "MappingPropertyUnary" + MappingPropertyNary StaticPropertyType = "MappingPropertyNary" + MatchingStaticProperty StaticPropertyType = "MatchingStaticProperty" + OneOfStaticProperty StaticPropertyType = "OneOfStaticProperty" + RuntimeResolvableAnyStaticProperty StaticPropertyType = "RuntimeResolvableAnyStaticProperty" + RuntimeResolvableGroupStaticProperty StaticPropertyType = "RuntimeResolvableGroupStaticProperty" + RuntimeResolvableOneOfStaticProperty StaticPropertyType = "RuntimeResolvableOneOfStaticProperty" + RuntimeResolvableTreeInputStaticProperty StaticPropertyType = "RuntimeResolvableTreeInputStaticProperty" + StaticPropertyGroup StaticPropertyType = "StaticPropertyGroup" + StaticPropertyAlternatives StaticPropertyType = "StaticPropertyAlternatives" + StaticPropertyAlternative StaticPropertyType = "StaticPropertyAlternative" + SecretStaticProperty StaticPropertyType = "SecretStaticProperty" + SlideToggleStaticProperty StaticPropertyType = "SlideToggleStaticProperty" +) + +type StaticProperty struct { + Optional bool `json:"optional,omitempty"` + StaticPropertyType StaticPropertyType `json:"staticPropertyType"` //不一定要 + Index int32 `json:"index"` + Label string `json:"label"` + Description string `json:"description"` + InternalName string `json:"internalName"` + Predefined bool `json:"predefined"` + Class string `json:"@class"` +} + +type ElementStatusInfoSettings struct { + ElementIdentifier string `json:"elementIdentifier"` + KafkaHost string `json:"kafkaHost"` + KafkaPort int32 `json:"kafkaPort"` + ErrorTopic string `json:"errorTopic"` + StatsTopic string `json:"statsTopic"` +} + +type OutputStrategy struct { + Name string `json:"name"` + RenameRules []PropertyRenameRule `json:"renameRules"` + // Class 字段用来模拟 @class 注解,但在Go中通常不会这样命名 + // 如果你确实需要存储类信息,可以将其作为一个普通的字符串字段 + Class string `json:"class,omitempty"` // omitempty 表示在序列化时如果该字段为空则忽略 +} + +type PropertyRenameRule struct { + RuntimeID string `json:"runtimeId"` + NewRuntimeName string `json:"newRuntimeName"` +} + +type DataSinkInvocation struct { + ElementId string `json:"elementId"` + Dom string `json:"dom"` + ConnectedTo []string `json:"connectedTo"` + Name string `json:"name"` + Description string `json:"description"` + IconUrl string `json:"iconUrl"` + AppId string `json:"appId"` + IncludesAssets bool `json:"includesAssets"` + IncludesLocales bool `json:"includesLocales"` + IncludedAssets []string `json:"includedAssets"` + IncludedLocales []string `json:"includedLocales"` + InternallyManaged bool `json:"internallyManaged"` + Version int32 `json:"version"` + InputStreams []SpDataStream `json:"inputStreams"` + StaticProperties []StaticProperty `json:"staticProperties"` + BelongsTo string `json:"belongsTo"` + StatusInfoSettings ElementStatusInfoSettings `json:"statusInfoSettings"` + SupportedGrounding EventGrounding `json:"supportedGrounding"` + CorrespondingPipeline string `json:"correspondingPipeline"` + CorrespondingUser string `json:"correspondingUser"` + StreamRequirements []SpDataStream `json:"streamRequirements"` + Configured bool `json:"configured"` + Uncompleted bool `json:"uncompleted"` + SelectedEndpointUrl string `json:"selectedEndpointUrl"` + // 去掉了一个枚举类型 + Category []string `json:"category"` + Rev string `json:"_rev"` +} diff --git a/streampipes-client-go/streampipes/pipeline_api.go b/streampipes-client-go/streampipes/pipeline_api.go index 5f3155aa3a..b4560feaf2 100644 --- a/streampipes-client-go/streampipes/pipeline_api.go +++ b/streampipes-client-go/streampipes/pipeline_api.go @@ -39,12 +39,211 @@ func NewPipeline(clientConfig config.StreamPipesClientConfig) *Pipeline { } } +// GetSinglePipeline get a specific pipeline with the given id +func (p *Pipeline) GetSinglePipeline(pipelineId string) (pipeline.Pipeline, error) { + + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + log.Printf("Get data from: %s", endPointUrl) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return pipeline.Pipeline{}, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return pipeline.Pipeline{}, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return pipeline.Pipeline{}, err + } + + unmarshalData, err := serializer.NewPipelineDeserializer().Unmarshal(body) + if err != nil { + return pipeline.Pipeline{}, err + } + pipeLine := unmarshalData.(pipeline.Pipeline) + + return pipeLine, nil +} + +// DeleteSinglePipeline delete a pipeline with a given id +func (p *Pipeline) DeleteSinglePipeline(pipelineId string) error { + + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + log.Printf("Delete data from: %s", endPointUrl) + + response, err := p.executeRequest("DELETE", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// UpdateSinglePipeline update an existing pipeline +func (p *Pipeline) UpdateSinglePipeline(pp pipeline.Pipeline, pipelineId string) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId}) + body, err := serializer.NewPipelineSerializer().Marshal(pp) + if err != nil { + return err + } + response, err := p.executeRequest("PUT", endPointUrl, body) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// GetAllPipeline get all pipelines of the current user +func (p *Pipeline) GetAllPipeline() ([]pipeline.Pipeline, error) { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil) + log.Printf("Get data from: %s", endPointUrl) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return nil, err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return nil, err + } + } + + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + unmarshalData, err := serializer.NewPipelinesDeserializer().Unmarshal(body) + if err != nil { + return nil, err + } + pipelines := unmarshalData.([]pipeline.Pipeline) + + return pipelines, nil +} + +// CreatePipeline store a new pipeline +func (p *Pipeline) CreatePipeline(pp pipeline.Pipeline) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", nil) + + body, err := serializer.NewPipelineSerializer().Marshal(pp) + if err != nil { + return err + } + response, err := p.executeRequest("POST", endPointUrl, body) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// StopSinglePipeline stop the pipeline with the given id +// 待定,因为不知道要返回什么 +func (p *Pipeline) StopSinglePipeline(pipelineId string) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "stop"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// GetSinglePipelineStatus get the pipeline status of a given pipeline +func (p *Pipeline) GetSinglePipelineStatus(pipelineId string) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "status"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// StartSinglePipeline start the pipeline with the given id +func (p *Pipeline) StartSinglePipeline(pipelineId string) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines", []string{pipelineId, "start"}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + +// ContainsElementPipeline returns all pipelines that contain the element with the elementld +func (p *Pipeline) ContainsElementPipeline(pipelineId string) error { + endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelines/contains", []string{pipelineId}) + + response, err := p.executeRequest("GET", endPointUrl, nil) + if err != nil { + return err + } + + if response.StatusCode != http.StatusOK { + err = p.handleStatusCode(response) + if err != nil { + return err + } + } + return nil +} + func (p *Pipeline) GetPipelineCategory() ([]pipeline.PipelineCategory, error) { endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelinecategories", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := p.executeRequest("GET", endPointUrl) + response, err := p.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -75,7 +274,7 @@ func (p *Pipeline) DeletePipelineCategory(categoryId string) (model.ResponseMess endPointUrl := util.NewStreamPipesApiPath(p.config.Url, "streampipes-backend/api/v2/pipelinecategories", []string{categoryId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := p.executeRequest("DELETE", endPointUrl) + response, err := p.executeRequest("DELETE", endPointUrl, nil) if err != nil { return model.ResponseMessage{}, err } diff --git a/streampipes-client-go/streampipes/streampipes_version_api.go b/streampipes-client-go/streampipes/streampipes_version_api.go index 54ef8ba108..4c6a54a857 100644 --- a/streampipes-client-go/streampipes/streampipes_version_api.go +++ b/streampipes-client-go/streampipes/streampipes_version_api.go @@ -44,7 +44,7 @@ func (d *Versions) GetStreamPipesVersion() (streampipes_version.Versions, error) endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v2/info/versions", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := d.executeRequest("GET", endPointUrl) + response, err := d.executeRequest("GET", endPointUrl, nil) if err != nil { return streampipes_version.Versions{}, err } diff --git a/streampipes-client-go/streampipes/user_api.go b/streampipes-client-go/streampipes/user_api.go index 40a9a4ee65..c6ee405368 100644 --- a/streampipes-client-go/streampipes/user_api.go +++ b/streampipes-client-go/streampipes/user_api.go @@ -43,7 +43,7 @@ func (s *StreamPipesUserInfo) GetSingleStreamPipesUserAccountInfo(principalId st endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", []string{principalId}) log.Printf("Get data from: %s", endPointUrl) - response, err := s.executeRequest("GET", endPointUrl) + response, err := s.executeRequest("GET", endPointUrl, nil) if err != nil { return streampipes_user.UserAccount{}, err } @@ -74,7 +74,7 @@ func (s *StreamPipesUserInfo) GetAllStreamPipesShortUserInfo() ([]streampipes_us endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", nil) log.Printf("Get data from: %s", endPointUrl) - response, err := s.executeRequest("GET", endPointUrl) + response, err := s.executeRequest("GET", endPointUrl, nil) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (s *StreamPipesUserInfo) DeleteSingleStreamPipesShortUserInfo(principalId s endPointUrl := util.NewStreamPipesApiPath(s.config.Url, "streampipes-backend/api/v2/users", []string{principalId}) log.Printf("Delete data from: %s", endPointUrl) - response, err := s.executeRequest("DELETE", endPointUrl) + response, err := s.executeRequest("DELETE", endPointUrl, nil) if err != nil { return err }