diff --git a/processor/transformprocessor/internal/logs/func_unroll.go b/processor/transformprocessor/internal/logs/func_unroll.go index e5cfb39b4311..a3a586c339d6 100644 --- a/processor/transformprocessor/internal/logs/func_unroll.go +++ b/processor/transformprocessor/internal/logs/func_unroll.go @@ -30,7 +30,8 @@ func createUnrollFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.Ex } func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottllog.TransformContext], error) { - changeCounter := 0 + + var currentExpansions []pcommon.Value var unrollType pcommon.ValueType return func(ctx context.Context, tCtx ottllog.TransformContext) (any, error) { value, err := field.Get(ctx, tCtx) @@ -38,22 +39,26 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll return nil, fmt.Errorf("failed to get value to unroll: %w", err) } - if changeCounter > 0 { - changeCounter-- - currentLogRecord := tCtx.GetLogRecord() + currentLogRecord := tCtx.GetLogRecord() + if unrollIdx, ok := currentLogRecord.Attributes().Get("unrolled_idx"); ok { + // we're in the middle of unrolling + currentLogRecord.Attributes().Remove("unrolled_idx") switch unrollType { case pcommon.ValueTypeStr: - currentLogRecord.Body().SetStr(currentLogRecord.Body().AsString()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetStr(value.AsString()) case pcommon.ValueTypeInt: - currentLogRecord.Body().SetInt(currentLogRecord.Body().Int()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetInt(value.Int()) case pcommon.ValueTypeDouble: - currentLogRecord.Body().SetDouble(currentLogRecord.Body().Double()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetDouble(value.Double()) case pcommon.ValueTypeBool: - currentLogRecord.Body().SetBool(currentLogRecord.Body().Bool()) + value := currentExpansions[unrollIdx.Int()] + currentLogRecord.Body().SetBool(value.Bool()) case pcommon.ValueTypeMap, pcommon.ValueTypeSlice: - // do nothing default: - return nil, fmt.Errorf("unable to continue unrolling%v", unrollType) + return nil, fmt.Errorf("unable to continue unrolling %v", unrollType) } return nil, nil } @@ -65,28 +70,39 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll v := value.At(i) unrollType = v.Type() expansions = append(expansions, v) - value.RemoveIf(func(removeCandidate pcommon.Value) bool { - return removeCandidate == v - }) } default: return nil, fmt.Errorf("input field is not a slice, got %T", value) } scopeLogs := tCtx.GetScopeLogs() - currentRecord := tCtx.GetLogRecord() scopeLogs.LogRecords().RemoveIf(func(removeCandidate plog.LogRecord) bool { return removeCandidate == currentRecord }) - newLogs := plog.NewScopeLogs() - scopeLogs.CopyTo(newLogs) - records := newLogs.LogRecords() - - for _, expansion := range expansions { - newRecord := records.AppendEmpty() + for idx, expansion := range expansions { + newRecord := scopeLogs.LogRecords().AppendEmpty() currentRecord.CopyTo(newRecord) + + // handle current element as base + if idx == 0 { + switch unrollType { + case pcommon.ValueTypeStr: + newRecord.Body().SetStr(expansion.AsString()) + case pcommon.ValueTypeInt: + newRecord.Body().SetInt(expansion.Int()) + case pcommon.ValueTypeDouble: + newRecord.Body().SetDouble(expansion.Double()) + case pcommon.ValueTypeBool: + newRecord.Body().SetBool(expansion.Bool()) + default: + return nil, fmt.Errorf("unable to unroll %v", unrollType) + } + continue + } + // currentLength := scopeLogs.LogRecords().Len() + newRecord.Attributes().PutInt("unrolled_idx", int64(len(currentExpansions)+idx)) switch unrollType { case pcommon.ValueTypeStr: newRecord.Body().SetStr(expansion.AsString()) @@ -99,11 +115,8 @@ func unroll(field ottl.GetSetter[ottllog.TransformContext]) (ottl.ExprFunc[ottll default: return nil, fmt.Errorf("unable to unroll %v", unrollType) } - // figure out the field being set; not always going to be the body - // newRecord.Body().SetStr(expansion) - changeCounter++ } - newLogs.MoveTo(scopeLogs) + currentExpansions = append(currentExpansions, expansions...) return nil, nil }, nil