Skip to content

Commit

Permalink
Merge branch 'master' into 30-support-asctime-field-for-timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
KevRiver committed Dec 11, 2024
2 parents 4546a39 + d50f608 commit cfb5e32
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 28 deletions.
3 changes: 2 additions & 1 deletion cmd/humanlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,10 @@ func newApp() *cli.App {
return fmt.Errorf("this feature requires a valid machine ID, which requires an environment. failed to login: %v", err)
}
}
llIceptor := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn}))
apiHttpClient := getHTTPClient(cctx, getAPIUrl(cctx))
apiClientOpts := []connect.ClientOption{
connect.WithInterceptors(auth.Interceptors(ll, getTokenSource(cctx))...),
connect.WithInterceptors(auth.Interceptors(llIceptor, getTokenSource(cctx))...),
}

machineID = uint64(*state.MachineID)
Expand Down
81 changes: 67 additions & 14 deletions json_handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package humanlog

import (
"bytes"
"encoding/json"
"fmt"
"math"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -98,14 +99,23 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
extValues := make(map[string]string)
for key, nestedVal := range v {
switch valTyped := nestedVal.(type) {
case float64:
if valTyped-math.Floor(valTyped) < 0.000001 && valTyped < 1e9 {
extValues[key] = fmt.Sprintf("%d", int(valTyped))
} else {
extValues[key] = fmt.Sprintf("%g", valTyped)
case json.Number:
if z, err := valTyped.Int64(); err == nil {
extValues[key] = fmt.Sprintf("%d", z)
continue
}
if f, err := valTyped.Float64(); err == nil {
extValues[key] = fmt.Sprintf("%g", f)
continue
}
extValues[key] = valTyped.String()
case string:
extValues[key] = fmt.Sprintf("%q", valTyped)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(valTyped)
for k, v := range flattenedArrayFields {
extValues[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(valTyped)
for keyNested, valStr := range flattenedFields {
Expand All @@ -118,10 +128,45 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
return extValues
}

// getFlattenedArrayFields flattens a JSON array into a map whose keys
// are dot-separated index paths ("0", "1.2", "3.foo", ...) and whose
// values are rendered strings. Numbers prefer an integer rendering and
// fall back to %g; strings are emitted verbatim (unquoted); nested
// arrays and objects recurse with the element index as key prefix.
func getFlattenedArrayFields(data []interface{}) map[string]string {
	out := make(map[string]string)
	for idx, elem := range data {
		prefix := strconv.Itoa(idx)
		switch tv := elem.(type) {
		case json.Number:
			// Try int64 first so large integers keep full precision.
			if n, err := tv.Int64(); err == nil {
				out[prefix] = fmt.Sprintf("%d", n)
			} else if f, err := tv.Float64(); err == nil {
				out[prefix] = fmt.Sprintf("%g", f)
			} else {
				out[prefix] = tv.String()
			}
		case string:
			out[prefix] = tv
		case []interface{}:
			for k, v := range getFlattenedArrayFields(tv) {
				out[prefix+"."+k] = v
			}
		case map[string]interface{}:
			for k, v := range getFlattenedFields(tv) {
				out[prefix+"."+k] = v
			}
		default:
			out[prefix] = fmt.Sprintf("%v", tv)
		}
	}
	return out
}

// UnmarshalJSON sets the fields of the handler.
func (h *JSONHandler) UnmarshalJSON(data []byte) bool {

dec := json.NewDecoder(bytes.NewReader(data))
dec.UseNumber()

raw := make(map[string]interface{})
err := json.Unmarshal(data, &raw)
err := dec.Decode(&raw)
if err != nil {
return false
}
Expand Down Expand Up @@ -163,19 +208,27 @@ func (h *JSONHandler) UnmarshalJSON(data []byte) bool {

for key, val := range raw {
switch v := val.(type) {
case float64:
if v-math.Floor(v) < 0.000001 && v < 1e9 {
// looks like an integer that's not too large
h.Fields[key] = fmt.Sprintf("%d", int(v))
} else {
h.Fields[key] = fmt.Sprintf("%g", v)
case json.Number:
if z, err := v.Int64(); err == nil {
h.Fields[key] = fmt.Sprintf("%d", z)
continue
}
if f, err := v.Float64(); err == nil {
h.Fields[key] = fmt.Sprintf("%g", f)
continue
}
h.Fields[key] = v.String()
case string:
h.Fields[key] = fmt.Sprintf("%q", v)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(v)
for k, v := range flattenedArrayFields {
h.Fields[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(v)
for keyNested, val := range flattenedFields {
h.Fields[key+"."+keyNested] = fmt.Sprintf("%v", val)
h.Fields[key+"."+keyNested] = val
}
default:
h.Fields[key] = fmt.Sprintf("%v", v)
Expand Down
45 changes: 45 additions & 0 deletions json_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,51 @@ func TestJSONHandler_UnmarshalJSON_ParsesCustomMultiNestedFields(t *testing.T) {
}
}

// TestJsonHandler_TryHandle_LargeNumbers verifies that integers too
// large for exact float64 representation survive parsing intact, and
// that floats keep their %g rendering.
func TestJsonHandler_TryHandle_LargeNumbers(t *testing.T) {
	handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
	event := new(typesv1.StructuredLogEvent)
	line := []byte(`{"storage":{"session.id":1730187806608637000, "some": {"float": 1.2345}}}`)
	if ok := handler.TryHandle(line, event); !ok {
		t.Fatalf("failed to handle log")
	}
	require.Equal(t, "1730187806608637000", handler.Fields["storage.session.id"])
	require.Equal(t, "1.2345", handler.Fields["storage.some.float"])
}

// TestJsonHandler_TryHandle_FlattendArrayFields verifies that an array
// of objects is flattened into "key.index.field" entries, with string
// values quoted as the object-flattening path does.
func TestJsonHandler_TryHandle_FlattendArrayFields(t *testing.T) {
	h := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
	ev := new(typesv1.StructuredLogEvent)
	line := []byte(`{"peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}],"storage":{"session.id":1730187806608637000, "some": {"float": 1.2345}}}`)
	if !h.TryHandle(line, ev) {
		t.Fatalf("failed to handle log")
	}
	expected := []struct {
		key, val string
	}{
		{"peers.0.ID", "\"10.244.0.126:8083\""},
		{"peers.0.URI", "\"10.244.0.126:8083\""},
		{"peers.1.ID", "\"10.244.0.206:8083\""},
		{"peers.1.URI", "\"10.244.0.206:8083\""},
		{"peers.2.ID", "\"10.244.1.150:8083\""},
		{"peers.2.URI", "\"10.244.1.150:8083\""},
	}
	for _, tc := range expected {
		require.Equal(t, tc.val, h.Fields[tc.key])
	}
}

// TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray verifies
// recursive flattening of arrays nested inside arrays: indices chain
// with dots, array strings stay unquoted, and object values stay quoted.
func TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray(t *testing.T) {
	h := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
	ev := new(typesv1.StructuredLogEvent)
	line := []byte(`{"peers":[[1,2,3.14],[4,50.55,[6,7]],["hello","world"],{"foo":"bar"}]}`)
	if !h.TryHandle(line, ev) {
		t.Fatalf("failed to handle log")
	}
	expected := []struct {
		key, val string
	}{
		{"peers.0.0", "1"},
		{"peers.0.1", "2"},
		{"peers.0.2", "3.14"},
		{"peers.1.0", "4"},
		{"peers.1.1", "50.55"},
		{"peers.1.2.0", "6"},
		{"peers.1.2.1", "7"},
		{"peers.2.0", "hello"},
		{"peers.2.1", "world"},
		{"peers.3.foo", "\"bar\""},
	}
	for _, tc := range expected {
		require.Equal(t, tc.val, h.Fields[tc.key])
	}
}

func TestParseAsctimeFields(t *testing.T) {
tests := []struct {
name string
Expand Down
17 changes: 17 additions & 0 deletions optimize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package humanlog

// moveToFront moves the element at index i to the front of s,
// shifting s[0:i] one position right. The move is done in place,
// so the returned slice shares s's backing array. i must be a
// valid index into s; i == 0 is a no-op.
func moveToFront[El any](i int, s []El) []El {
	if i == 0 {
		return s
	}
	el := s[i]
	// Use the built-in copy (a single memmove) instead of an
	// element-by-element shift loop.
	copy(s[1:i+1], s[:i])
	s[0] = el
	return s
}

const dynamicReordering = true
47 changes: 38 additions & 9 deletions scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ func Scan(ctx context.Context, src io.Reader, sink sink.Sink, opts *HandlerOptio
data := new(typesv1.StructuredLogEvent)
ev.Structured = data

handlers := []func([]byte, *typesv1.StructuredLogEvent) bool{
jsonEntry.TryHandle,
logfmtEntry.TryHandle,
func(lineData []byte, data *typesv1.StructuredLogEvent) bool {
return tryDockerComposePrefix(lineData, data, &jsonEntry)
},
func(lineData []byte, data *typesv1.StructuredLogEvent) bool {
return tryDockerComposePrefix(lineData, data, &logfmtEntry)
},
func(lineData []byte, data *typesv1.StructuredLogEvent) bool {
return tryZapDevPrefix(lineData, data, &jsonEntry)
},
}

skipNextScan := false
for {
if !in.Scan() {
Expand Down Expand Up @@ -65,21 +79,36 @@ func Scan(ctx context.Context, src io.Reader, sink sink.Sink, opts *HandlerOptio

// remove that pesky syslog crap
lineData = bytes.TrimPrefix(lineData, []byte("@cee: "))
switch {

case jsonEntry.TryHandle(lineData, data):
handled := false
handled_line:
for i, tryHandler := range handlers {
if tryHandler(lineData, data) {
if dynamicReordering {
handlers = moveToFront(i, handlers)
}
handled = true
break handled_line
}
}
if !handled {
ev.Structured = nil
}
// switch {

case logfmtEntry.TryHandle(lineData, data):
// case jsonEntry.TryHandle(lineData, data):

case tryDockerComposePrefix(lineData, data, &jsonEntry):
// case logfmtEntry.TryHandle(lineData, data):

case tryDockerComposePrefix(lineData, data, &logfmtEntry):
// case tryDockerComposePrefix(lineData, data, &jsonEntry):

case tryZapDevPrefix(lineData, data, &jsonEntry):
// case tryDockerComposePrefix(lineData, data, &logfmtEntry):

default:
ev.Structured = nil
}
// case tryZapDevPrefix(lineData, data, &jsonEntry):

// default:

// }
if err := sink.Receive(ctx, ev); err != nil {
return err
}
Expand Down
Loading

0 comments on commit cfb5e32

Please sign in to comment.