Skip to content

Commit

Permalink
feat: support selecting multiple property aliases and ids in time ser…
Browse files Browse the repository at this point in the history
…ies queries
  • Loading branch information
hwandersman committed Mar 3, 2025
1 parent 8c15783 commit 17a0e09
Show file tree
Hide file tree
Showing 68 changed files with 27,714 additions and 1,304 deletions.
8 changes: 7 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ datasources:

### Running e2e tests locally

To run e2e tests locally:
To run e2e tests locally, first run the local server:

```
yarn server:dev
```

Then run the e2e test command:

```
yarn run e2e
Expand Down
4 changes: 3 additions & 1 deletion cspell.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
"sqlds",
"errorsource",
"viant",
"unmarshalling"
"unmarshalling",
"assetid",
"propid"
]
}
11 changes: 7 additions & 4 deletions pkg/framer/property_interpolated.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (p InterpolatedAssetPropertyValue) Frames(ctx context.Context, resources re
func (p InterpolatedAssetPropertyValue) Frame(ctx context.Context, property *iotsitewise.DescribeAssetPropertyOutput, v []*iotsitewise.InterpolatedAssetPropertyValue) (*data.Frame, error) {
// TODO: make this work with the API instead of ad-hoc dataType inference
// https://github.com/grafana/iot-sitewise-datasource/issues/98#issuecomment-892947756
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") {
if util.IsAssetProperty(property) && !isPropertyDataTypeDefined(*property.AssetProperty.DataType) {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(v[0].Value))
}

Expand All @@ -58,10 +58,13 @@ func (p InterpolatedAssetPropertyValue) Frame(ctx context.Context, property *iot
frame := data.NewFrame(name, timeField, valueField)

entryId := ""
if property.AssetId != nil {
entryId = *property.AssetId
if property.AssetId != nil && property.AssetProperty.Id != nil {
entryId = *util.GetEntryIdFromAssetProperty(*property.AssetId, *property.AssetProperty.Id)
} else {
entryId = util.GetPropertyName(property)
// In resource/sitewise.go the property resource with a disassociated alias
// is manually set with the alias in the name field
alias := util.GetPropertyName(property)
entryId = *util.GetEntryIdFromPropertyAlias(alias)
}
frame.Meta = &data.FrameMeta{
Custom: models.SitewiseCustomMeta{
Expand Down
2 changes: 1 addition & 1 deletion pkg/framer/property_value_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (p AssetPropertyValueBatch) Frames(ctx context.Context, resources resource.
for _, r := range p.Responses {
for _, e := range r.SuccessEntries {
property := properties[*e.EntryId]
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") && e.AssetPropertyValue != nil {
if util.IsAssetProperty(property) && !isPropertyDataTypeDefined(*property.AssetProperty.DataType) && e.AssetPropertyValue != nil {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(e.AssetPropertyValue.Value))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/framer/property_value_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (p AssetPropertyValueHistory) Frames(ctx context.Context, resources resourc
}
// TODO: make this work with the API instead of ad-hoc dataType inference
// https://github.com/grafana/iot-sitewise-datasource/issues/98#issuecomment-892947756
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") {
if util.IsAssetProperty(property) && !isPropertyDataTypeDefined(*property.AssetProperty.DataType) {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(p.AssetPropertyValueHistory[0].Value))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/framer/property_value_history_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p AssetPropertyValueHistoryBatch) Frame(ctx context.Context, property *iot
length := len(h)
// TODO: make this work with the API instead of ad-hoc dataType inference
// https://github.com/grafana/iot-sitewise-datasource/issues/98#issuecomment-892947756
if util.IsAssetProperty(property) && *property.AssetProperty.DataType == *aws.String("?") {
if util.IsAssetProperty(property) && !isPropertyDataTypeDefined(*property.AssetProperty.DataType) {
if length != 0 {
property.AssetProperty.DataType = aws.String(getPropertyVariantValueType(h[0].Value))
} else {
Expand Down
4 changes: 4 additions & 0 deletions pkg/framer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func getTime(ts *iotsitewise.TimeInNanos) time.Time {
return time.Unix(sec, 0)
}

func isPropertyDataTypeDefined(dataType string) bool {
return dataType == "BOOLEAN" || dataType == "DOUBLE" || dataType == "INTEGER" || dataType == "STRING"
}

func getPropertyVariantValue(variant *iotsitewise.Variant) interface{} {

if val := variant.BooleanValue; val != nil {
Expand Down
18 changes: 9 additions & 9 deletions pkg/models/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func GetDescribeAssetQuery(dq *backend.DataQuery) (*DescribeAssetQuery, error) {
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

// add on the DataQuery params
query.QueryType = dq.QueryType
Expand All @@ -55,8 +55,8 @@ func GetListAssetPropertiesQuery(dq *backend.DataQuery) (*ListAssetPropertiesQue
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

query.QueryType = dq.QueryType
return query, nil
Expand All @@ -68,8 +68,8 @@ func GetListAssetsQuery(dq *backend.DataQuery) (*ListAssetsQuery, error) {
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

// add on the DataQuery params
query.MaxDataPoints = dq.MaxDataPoints
Expand All @@ -81,7 +81,7 @@ func GetListTimeSeriesQuery(dq *backend.DataQuery) (*ListTimeSeriesQuery, error)
query := &ListTimeSeriesQuery{}
if err := json.Unmarshal(dq.JSON, query); err != nil {
return nil, err
}
}

// add on the DataQuery params
query.MaxDataPoints = dq.MaxDataPoints
Expand All @@ -95,8 +95,8 @@ func GetListAssociatedAssetsQuery(dq *backend.DataQuery) (*ListAssociatedAssetsQ
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

// add on the DataQuery params
query.MaxDataPoints = dq.MaxDataPoints
Expand Down
8 changes: 4 additions & 4 deletions pkg/models/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func GetListAssetModelsQuery(dq *backend.DataQuery) (*ListAssetModelsQuery, erro
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

// add on the DataQuery params
query.MaxDataPoints = dq.MaxDataPoints
Expand All @@ -44,8 +44,8 @@ func GetDescribeAssetModelQuery(dq *backend.DataQuery) (*DescribeAssetModelQuery
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

// add on the DataQuery params
query.QueryType = dq.QueryType
Expand Down
18 changes: 10 additions & 8 deletions pkg/models/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,23 @@ type AssetPropertyValueQuery struct {
FlattenL4e bool `json:"flattenL4e,omitempty"`
}

// Track the assetId, propertyId, and property alias of a data stream
// after lookup for consistent batched processing
type AssetPropertyEntry struct {
AssetId string `json:"assetId,omitempty"`
PropertyId string `json:"propertyId,omitempty"`
PropertyAlias string `json:"propertyAlias,omitempty"`
}

func GetAssetPropertyValueQuery(dq *backend.DataQuery) (*AssetPropertyValueQuery, error) {

query := &AssetPropertyValueQuery{}
if err := json.Unmarshal(dq.JSON, query); err != nil {
return nil, err
}

// AssetId <--> AssetIds backward compatibility
query.MigrateAssetId()

//if propertyAlias is set make sure to set the assetId and propertyId to nil
if query.PropertyAlias != "" {
query.AssetId = ""
query.PropertyId = ""
}
// Backward compatibility for asset, property, and property alias string --> list
query.MigrateAssetProperty()

if query.TimeOrdering == "" {
query.TimeOrdering = "ASCENDING"
Expand Down
40 changes: 26 additions & 14 deletions pkg/models/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,43 @@ const (
type BaseQuery struct {
// General
AwsRegion string `json:"region,omitempty"`

QueryType string `json:"-"`

// Sitewise specific
// Deprecated: use assetIds
AssetId string `json:"assetId,omitempty"`
AssetIds []string `json:"assetIds,omitempty"`
PropertyId string `json:"propertyId,omitempty"`
PropertyAlias string `json:"propertyAlias,omitempty"`
NextToken string `json:"nextToken,omitempty"`
NextTokens map[string]string `json:"nextTokens,omitempty"`
MaxPageAggregations int `json:"maxPageAggregations,omitempty"`
ResponseFormat string `json:"responseFormat,omitempty"`
// Deprecated: use AssetIds
AssetId string `json:"assetId,omitempty"`
AssetIds []string `json:"assetIds,omitempty"`
// Deprecated: use PropertyIds
PropertyId string `json:"propertyId,omitempty"`
PropertyIds []string `json:"propertyIds,omitempty"`
// Deprecated: use PropertyAliases
PropertyAlias string `json:"propertyAlias,omitempty"`
PropertyAliases []string `json:"propertyAliases,omitempty"`
AssetPropertyEntries []AssetPropertyEntry `json:"assetPropertyEntries,omitempty"`
NextToken string `json:"nextToken,omitempty"`
NextTokens map[string]string `json:"nextTokens,omitempty"`
MaxPageAggregations int `json:"maxPageAggregations,omitempty"`
ResponseFormat string `json:"responseFormat,omitempty"`

// Also provided by sqlutil.Query. Migrate to that
Interval time.Duration `json:"-"`
TimeRange backend.TimeRange `json:"-"`
MaxDataPoints int64 `json:"-"`
}

// MigrateAssetId handles AssetId <--> AssetIds backward compatibility.
// This is needed for compatibility for queries saved before the Batch API changes were introduced in 1.6.0
func (query *BaseQuery) MigrateAssetId() {
// MigrateAssetProperty handles AssetId, PropertyId, PropertyAlias --> AssetIds, PropertyIds, PropertyAliases backward compatibility.
// This is needed for compatibility for queries saved before the Batch API changes were introduced in 2.1.0
func (query *BaseQuery) MigrateAssetProperty() {
if query.AssetId != "" {
query.AssetIds = []string{query.AssetId}
} else if len(query.AssetIds) > 0 {
query.AssetId = query.AssetIds[0]
}

if query.PropertyId != "" {
query.PropertyIds = []string{query.PropertyId}
}

if query.PropertyAlias != "" {
query.PropertyAliases = []string{query.PropertyAlias}
}
}
32 changes: 19 additions & 13 deletions pkg/resource/query_resouce_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,41 @@ func (rp *queryResourceProvider) Assets(ctx context.Context) (map[string]*iotsit

func (rp *queryResourceProvider) Property(ctx context.Context) (*iotsitewise.DescribeAssetPropertyOutput, error) {
assetId := ""
propertyId := ""
propertyAlias := ""

// use the first asset id if there are multiple
// ok to only use the first item in the list since this function is called for non-batch queries

// use the first assetId if there are multiple
if len(rp.baseQuery.AssetIds) > 0 {
assetId = rp.baseQuery.AssetIds[0]
}

return rp.resources.Property(ctx, assetId, rp.baseQuery.PropertyId, rp.baseQuery.PropertyAlias)
// use the first propertyId if there are multiple
if len(rp.baseQuery.PropertyIds) > 0 {
propertyId = rp.baseQuery.PropertyIds[0]
}

// use the first propertyAlias if there are multiple
if len(rp.baseQuery.PropertyAliases) > 0 {
propertyAlias = rp.baseQuery.PropertyAliases[0]
}

return rp.resources.Property(ctx, assetId, propertyId, propertyAlias)
}

func (rp *queryResourceProvider) Properties(ctx context.Context) (map[string]*iotsitewise.DescribeAssetPropertyOutput, error) {
properties := map[string]*iotsitewise.DescribeAssetPropertyOutput{}
// if the query for a PropertyAlias doesn't have an assetId or propertyId, it means it's a disassociated stream
// in that case, we call Property() with empty values, which will set AssetProperty.Name to the alias
// and will set the EntryId to the hashed alias (to access values in results)
if len(rp.baseQuery.AssetIds) == 0 && rp.baseQuery.PropertyId == "" && rp.baseQuery.PropertyAlias != "" {
prop, err := rp.resources.Property(ctx, "", "", rp.baseQuery.PropertyAlias)
for _, entry := range rp.baseQuery.AssetPropertyEntries {
prop, err := rp.resources.Property(ctx, entry.AssetId, entry.PropertyId, entry.PropertyAlias)
if err != nil {
return nil, err
}
entryId := util.GetEntryId(rp.baseQuery)
entryId := util.GetEntryIdFromAssetPropertyEntry(entry)
properties[*entryId] = prop
} else {
for _, id := range rp.baseQuery.AssetIds {
prop, err := rp.resources.Property(ctx, id, rp.baseQuery.PropertyId, rp.baseQuery.PropertyAlias)
if err != nil {
return nil, err
}
properties[id] = prop
}
}

return properties, nil
Expand Down
20 changes: 18 additions & 2 deletions pkg/resource/sitewise.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,29 @@ func (rp *SitewiseResources) Asset(ctx context.Context, assetId string) (*iotsit

func (rp *SitewiseResources) Property(ctx context.Context, assetId string, propertyId string, propertyAlias string) (*iotsitewise.DescribeAssetPropertyOutput, error) {
if propertyAlias != "" && (assetId == "" && propertyId == "") {
return &iotsitewise.DescribeAssetPropertyOutput{
defaultOutput := &iotsitewise.DescribeAssetPropertyOutput{
AssetName: aws.String(""),
AssetProperty: &iotsitewise.Property{
Name: aws.String(propertyAlias),
DataType: aws.String("?"),
},
}, nil
}

resp, err := rp.client.DescribeTimeSeriesWithContext(ctx, &iotsitewise.DescribeTimeSeriesInput{
Alias: aws.String(propertyAlias),
})
if err != nil {
return defaultOutput, err
}

if resp.AssetId != nil && resp.PropertyId != nil {
return rp.client.DescribeAssetPropertyWithContext(ctx, &iotsitewise.DescribeAssetPropertyInput{
AssetId: resp.AssetId,
PropertyId: resp.PropertyId,
})
}

return defaultOutput, nil
}

return rp.client.DescribeAssetPropertyWithContext(ctx, &iotsitewise.DescribeAssetPropertyInput{
Expand Down
11 changes: 11 additions & 0 deletions pkg/server/test/contants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package test

import (
"github.com/grafana/iot-sitewise-datasource/pkg/util"
)

var mockAssetId = "1assetid-aaaa-2222-bbbb-3333cccc4444"
var mockPropertyId = "11propid-aaaa-2222-bbbb-3333cccc4444"
var mockPropertyAlias = "/amazon/renton/1/rpm"
var mockAssetPropertyEntryId = util.GetEntryIdFromAssetProperty(mockAssetId, mockPropertyId)
var mockPropertyAliasEntryId = util.GetEntryIdFromPropertyAlias(mockPropertyAlias)
Loading

0 comments on commit 17a0e09

Please sign in to comment.