Skip to content

Commit

Permalink
[chore] Adds internal pdatautil.FlattenLogs (open-telemetry#33322)
Browse files Browse the repository at this point in the history
Followup to open-telemetry#33311. Adds the ability to flatten logs.
  • Loading branch information
djaglowski authored Jun 1, 2024
1 parent 288f5db commit 89bc3f6
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 18 deletions.
23 changes: 23 additions & 0 deletions internal/pdatautil/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

// FlattenResourceLogs moves each LogRecord onto a dedicated ResourceLogs and ScopeLogs.
// Modifications are made in place. Order of LogRecords is preserved.
func FlattenLogs(rls plog.ResourceLogsSlice) {
tmp := plog.NewResourceLogsSlice()
rls.MoveAndAppendTo(tmp)
for i := 0; i < tmp.Len(); i++ {
groupedResource := tmp.At(i)
for j := 0; j < groupedResource.ScopeLogs().Len(); j++ {
groupedScope := groupedResource.ScopeLogs().At(j)
for k := 0; k < groupedScope.LogRecords().Len(); k++ {
flatResource := rls.AppendEmpty()
groupedResource.Resource().Attributes().CopyTo(flatResource.Resource().Attributes())
flatScope := flatResource.ScopeLogs().AppendEmpty()
flatScope.SetSchemaUrl(groupedScope.SchemaUrl())
flatScope.Scope().SetName(groupedScope.Scope().Name())
flatScope.Scope().SetVersion(groupedScope.Scope().Version())
groupedScope.Scope().Attributes().CopyTo(flatScope.Scope().Attributes())
groupedScope.LogRecords().At(k).CopyTo(flatScope.LogRecords().AppendEmpty())
}
}
}
}

// GroupByResourceLogs groups ScopeLogs by Resource. Modifications are made in place.
func GroupByResourceLogs(rls plog.ResourceLogsSlice) {
// Hash each ResourceLogs based on identifying information.
Expand Down
153 changes: 135 additions & 18 deletions internal/pdatautil/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,121 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
)

func TestFlattenResourceLogs(t *testing.T) {
testCases := []struct {
name string
input []resourceLogs
expected []resourceLogs
}{
{
name: "empty",
input: []resourceLogs{},
expected: []resourceLogs{},
},
{
name: "single",
input: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 111),
),
},
expected: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 111),
),
},
},
{
name: "flatten_single_scope_in_single_resource",
input: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 101, 102, 103),
),
},
expected: []resourceLogs{
newResourceLogs(1, newScopeLogs(11, 101)),
newResourceLogs(1, newScopeLogs(11, 102)),
newResourceLogs(1, newScopeLogs(11, 103)),
},
},
{
name: "flatten_multiple_scopes_in_single_resource",
input: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 101, 102, 103),
newScopeLogs(22, 201, 202, 203),
),
},
expected: []resourceLogs{
newResourceLogs(1, newScopeLogs(11, 101)),
newResourceLogs(1, newScopeLogs(11, 102)),
newResourceLogs(1, newScopeLogs(11, 103)),
newResourceLogs(1, newScopeLogs(22, 201)),
newResourceLogs(1, newScopeLogs(22, 202)),
newResourceLogs(1, newScopeLogs(22, 203)),
},
},
{
name: "flatten_single_scope_in_multiple_resources",
input: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 101, 102, 103),
),
newResourceLogs(2,
newScopeLogs(11, 104, 105, 106),
),
},
expected: []resourceLogs{
newResourceLogs(1, newScopeLogs(11, 101)),
newResourceLogs(1, newScopeLogs(11, 102)),
newResourceLogs(1, newScopeLogs(11, 103)),
newResourceLogs(2, newScopeLogs(11, 104)),
newResourceLogs(2, newScopeLogs(11, 105)),
newResourceLogs(2, newScopeLogs(11, 106)),
},
},
{
name: "flatten_multiple_scopes_in_multiple_resources",
input: []resourceLogs{
newResourceLogs(1,
newScopeLogs(11, 101, 102, 103),
newScopeLogs(22, 201, 202, 203),
),
newResourceLogs(2,
newScopeLogs(11, 104, 105, 106),
newScopeLogs(22, 204, 205, 206),
),
},
expected: []resourceLogs{
newResourceLogs(1, newScopeLogs(11, 101)),
newResourceLogs(1, newScopeLogs(11, 102)),
newResourceLogs(1, newScopeLogs(11, 103)),
newResourceLogs(1, newScopeLogs(22, 201)),
newResourceLogs(1, newScopeLogs(22, 202)),
newResourceLogs(1, newScopeLogs(22, 203)),
newResourceLogs(2, newScopeLogs(11, 104)),
newResourceLogs(2, newScopeLogs(11, 105)),
newResourceLogs(2, newScopeLogs(11, 106)),
newResourceLogs(2, newScopeLogs(22, 204)),
newResourceLogs(2, newScopeLogs(22, 205)),
newResourceLogs(2, newScopeLogs(22, 206)),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := setupResourceLogsSlice(tc.input)
expected := setupResourceLogsSlice(tc.expected)
FlattenLogs(actual)
assert.Equal(t, expected.Len(), actual.Len())
for i := 0; i < expected.Len(); i++ {
assert.NoError(t, plogtest.CompareResourceLogs(expected.At(i), actual.At(i)))
}
})
}
}

func TestGroupByResourceLogs(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -296,15 +411,8 @@ func TestGroupByResourceLogs(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := plog.NewResourceLogsSlice()
for _, r := range tc.input {
r.setup(actual.AppendEmpty())
}
expected := plog.NewResourceLogsSlice()
for _, r := range tc.expected {
r.setup(expected.AppendEmpty())
}

actual := setupResourceLogsSlice(tc.input)
expected := setupResourceLogsSlice(tc.expected)
GroupByResourceLogs(actual)
assert.Equal(t, expected.Len(), actual.Len())
for i := 0; i < expected.Len(); i++ {
Expand Down Expand Up @@ -372,15 +480,8 @@ func TestGroupByScopeLogs(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := plog.NewScopeLogsSlice()
for _, s := range tc.input {
s.setup(actual.AppendEmpty())
}
expected := plog.NewScopeLogsSlice()
for _, s := range tc.expected {
s.setup(expected.AppendEmpty())
}

actual := setupScopeLogsSlice(tc.input)
expected := setupScopeLogsSlice(tc.expected)
GroupByScopeLogs(actual)
assert.Equal(t, expected.Len(), actual.Len())
for i := 0; i < expected.Len(); i++ {
Expand Down Expand Up @@ -449,6 +550,14 @@ func (r resourceLogs) setup(rl plog.ResourceLogs) {
}
}

func setupResourceLogsSlice(trls []resourceLogs) plog.ResourceLogsSlice {
rls := plog.NewResourceLogsSlice()
for _, trl := range trls {
trl.setup(rls.AppendEmpty())
}
return rls
}

type scopeLogs struct {
num int
recordNums []int
Expand All @@ -473,3 +582,11 @@ func (s scopeLogs) setup(sl plog.ScopeLogs) {
lr.Body().SetInt(int64(n))
}
}

func setupScopeLogsSlice(tsls []scopeLogs) plog.ScopeLogsSlice {
sls := plog.NewScopeLogsSlice()
for _, tsl := range tsls {
tsl.setup(sls.AppendEmpty())
}
return sls
}

0 comments on commit 89bc3f6

Please sign in to comment.