Skip to content

Commit

Permalink
feat: add more support for pipeline API to Go client (#3104)
Browse files Browse the repository at this point in the history
* feat: add more support for pipeline API to Go client

* fix: not enough arguments in call to d.executeRequest

* fix: not enough arguments in call to d.executeRequest

* fix: not enough arguments in call to d.executeRequest

* feat: add more support for pipeline API to Go client

* feat: add more support for pipeline API to Go client

* feat: add more support for pipeline API to Go client

* feat: add more support for pipeline API to Go client

* feat: add more support for pipeline API to Go client

* feat: add more support for pipeline API to Go client
  • Loading branch information
wyyolo authored Aug 25, 2024
1 parent b7496af commit 22ac222
Show file tree
Hide file tree
Showing 12 changed files with 661 additions and 47 deletions.
6 changes: 3 additions & 3 deletions streampipes-client-go/streampipes/data_lake_dashboard_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (d *DataLakeDashboard) GetSingleDataLakeDashboard(dashboardId string) (data
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.Dashboard{}, err
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func (d *DataLakeDashboard) GetAllDataLakeDashboard() ([]data_lake.Dashboard, er
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", nil)
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (d *DataLakeDashboard) DeleteSingleDataLakeDashboard(dashboardId string) er
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard", []string{dashboardId})
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions streampipes-client-go/streampipes/data_lake_measure_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func NewDataLakeMeasures(clientConfig config.StreamPipesClientConfig) *DataLakeM
}
}

// AllDataLakeMeasure retrieves a list of all measurements series from the Data Lake.
func (d *DataLakeMeasure) AllDataLakeMeasure() ([]data_lake.DataLakeMeasure, error) {
// GetAllDataLakeMeasure retrieves a list of all measurements series from the Data Lake.
func (d *DataLakeMeasure) GetAllDataLakeMeasure() ([]data_lake.DataLakeMeasure, error) {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil)
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasurements() error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", nil)
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand All @@ -102,7 +102,7 @@ func (d *DataLakeMeasure) GetSingleDataLakeMeasure(elementId string) (data_lake.
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measure", []string{elementId})
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.DataLakeMeasure{}, err
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (d *DataLakeMeasure) DeleteSingleDataLakeMeasure(elementId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measure", []string{elementId})
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand All @@ -157,7 +157,7 @@ func (d *DataLakeMeasure) GetSingleDataSeries(measureId string) (*data_lake.Data
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId})
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (d *DataLakeMeasure) ClearDataLakeMeasureData(measureId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId})
log.Printf("Clear data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand All @@ -212,7 +212,7 @@ func (d *DataLakeMeasure) DeleteDataLakeMeasure(measureId string) error {

endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v4/datalake/measurements", []string{measureId, "drop"})
log.Printf("Delete data from: %s", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions streampipes-client-go/streampipes/data_lake_widget_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (d *DataLakeWidget) GetSingleDataLakeWidget(widgetId string) (data_lake.Dat
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return data_lake.DataExplorerWidgetModel{}, err
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (d *DataLakeWidget) GetAllDataLakeWidget() ([]data_lake.DataExplorerWidgetM
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", nil)
log.Printf("Get data from: %s", endPointUrl)

response, err := d.executeRequest("GET", endPointUrl)
response, err := d.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (d *DataLakeWidget) DeleteSingleDataLakeWidget(widgetId string) error {
endPointUrl := util.NewStreamPipesApiPath(d.config.Url, "streampipes-backend/api/v3/datalake/dashboard/widgets", []string{widgetId})
log.Printf("Delete data from: %s", endPointUrl)

response, err := d.executeRequest("DELETE", endPointUrl)
response, err := d.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand Down
39 changes: 22 additions & 17 deletions streampipes-client-go/streampipes/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
package streampipes

import (
"errors"
"net/http"

"bytes"
"fmt"
"github.com/apache/streampipes/streampipes-client-go/streampipes/config"
headers "github.com/apache/streampipes/streampipes-client-go/streampipes/internal/http_headers"
"io"
"net/http"
)

type endpoint struct {
config config.StreamPipesClientConfig
}

func (e *endpoint) executeRequest(method string, endPointUrl string) (*http.Response, error) {
func (e *endpoint) executeRequest(method string, endPointUrl string, body []byte) (*http.Response, error) {

req, err := http.NewRequest(method, endPointUrl, nil)
var reader io.Reader
if body != nil {
reader = bytes.NewReader(body)
}
req, err := http.NewRequest(method, endPointUrl, reader)
if err != nil {
return nil, err
}
Expand All @@ -53,23 +58,23 @@ func (e *endpoint) handleStatusCode(resp *http.Response) error {

switch resp.StatusCode {
case http.StatusUnauthorized:
return errors.New("The streamPipes Backend returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.")
return fmt.Errorf("response code %d:"+"The streamPipes Backend returned an unauthorized error.\nplease check your ApiUser and/or Apikey to be correct.", resp.StatusCode)
case http.StatusForbidden:
return errors.New("There seems to be an issue with the access rights of the given user and the resource you queried.\n" +
"Apparently, this user is not allowed to query the resource.\n" +
"Please check the user's permissions or contact your StreamPipes admin.")
return fmt.Errorf("response code %d:"+"There seems to be an issue with the access rights of the given user and the resource you queried.\n"+
"Apparently, this user is not allowed to query the resource.\n"+
"Please check the user's permissions or contact your StreamPipes admin.", resp.StatusCode)
case http.StatusNotFound:
return errors.New("There seems to be an issue with the Go Client calling the API inappropriately.\n" +
"This should not happen, but unfortunately did.\n" +
"If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n")
return fmt.Errorf("response code %d:"+"There seems to be an issue with the Go Client calling the API inappropriately.\n"+
"This should not happen, but unfortunately did.\n"+
"If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n", resp.StatusCode)
case http.StatusMethodNotAllowed:
return errors.New("There seems to be an issue with the Go Client calling the API inappropriately.\n" +
"This should not happen, but unfortunately did.\n" +
"If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n")
return fmt.Errorf("response code %d:"+"There seems to be an issue with the Go Client calling the API inappropriately.\n"+
"This should not happen, but unfortunately did.\n"+
"If you don't mind, it would be awesome to let us know by creating an issue at https://github.com/apache/streampipes.\n", resp.StatusCode)
case http.StatusInternalServerError:
return errors.New("streamPipes internal error")
return fmt.Errorf("response code %d:"+"streamPipes internal error", resp.StatusCode)
default:
return errors.New(resp.Status)
return fmt.Errorf(resp.Status)
}

}
8 changes: 4 additions & 4 deletions streampipes-client-go/streampipes/functions_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *Functions) GetFunctionLogs(functionId string) ([]functions.SpLogEntry,
endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "logs"})
log.Printf("Get data from: %s", endPointUrl)

response, err := f.executeRequest("GET", endPointUrl)
response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (f *Functions) GetFunctionMetrics(functionId string) (functions.SpMetricsEn
endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId, "metrics"})
log.Printf("Get data from: %s", endPointUrl)

response, err := f.executeRequest("GET", endPointUrl)
response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return functions.SpMetricsEntry{}, err
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (f *Functions) GetAllFunction() ([]functions.FunctionDefinition, error) {
endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", nil)
log.Printf("Get data from: %s", endPointUrl)

response, err := f.executeRequest("GET", endPointUrl)
response, err := f.executeRequest("GET", endPointUrl, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (f *Functions) DeleteSingleFunction(functionId string) error {
endPointUrl := util.NewStreamPipesApiPath(f.config.Url, "streampipes-backend/api/v2/functions", []string{functionId})
log.Printf("Delete data from: %s", endPointUrl)

response, err := f.executeRequest("DELETE", endPointUrl)
response, err := f.executeRequest("DELETE", endPointUrl, nil)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,65 @@ func (p *UserAccountDeserializer) Unmarshal(data []byte) (interface{}, error) {
return userAccount, nil

}

type PipelineDeserializer struct{}

func NewPipelineDeserializer() *PipelineDeserializer {
return &PipelineDeserializer{}
}

func (p *PipelineDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipeLine pipeline.Pipeline
err := json.Unmarshal(data, &pipeLine)
if err != nil {
return nil, err
}
return pipeLine, nil

}

type PipelinesDeserializer struct{}

func NewPipelinesDeserializer() *PipelinesDeserializer {
return &PipelinesDeserializer{}
}

func (p *PipelinesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelines []pipeline.Pipeline
err := json.Unmarshal(data, &pipelines)
if err != nil {
return nil, err
}
return pipelines, nil

}

type PipelineStatusMessagesDeserializer struct{}

func NewPipelineStatusMessagesDeserializer() *PipelineStatusMessagesDeserializer {
return &PipelineStatusMessagesDeserializer{}
}

func (p *PipelineStatusMessagesDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelineStatusMessage []pipeline.PipelineStatusMessage
err := json.Unmarshal(data, &pipelineStatusMessage)
if err != nil {
return nil, err
}
return pipelineStatusMessage, nil
}

type PipelineOperationStatusDeserializer struct{}

func NewPipelineOperationStatusDeserializer() *PipelineOperationStatusDeserializer {
return &PipelineOperationStatusDeserializer{}
}

func (p *PipelineOperationStatusDeserializer) Unmarshal(data []byte) (interface{}, error) {
var pipelineOperationStatus pipeline.PipelineOperationStatus
err := json.Unmarshal(data, &pipelineOperationStatus)
if err != nil {
return nil, err
}
return pipelineOperationStatus, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package serializer

import (
"encoding/json"
"github.com/apache/streampipes/streampipes-client-go/streampipes/model/pipeline"
)

type PipelineSerializer struct{}

func NewPipelineSerializer() PipelineSerializer {
return PipelineSerializer{}
}

func (p PipelineSerializer) Marshal(pp pipeline.Pipeline) ([]byte, error) {
data, err := json.Marshal(pp)
if err != nil {
return nil, err
}
return data, nil
}
Loading

0 comments on commit 22ac222

Please sign in to comment.