From cb172a3f4d3dbfdcc1021f334d613c41d5446a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B9=E6=97=AD?= <22351293@zju.edu.cn> Date: Thu, 31 Oct 2024 14:30:10 +0800 Subject: [PATCH] =?UTF-8?q?Resolved=20"Datakit=20/v1/write/:Category=20def?= =?UTF-8?q?late=20=E8=A7=A3=E5=8E=8B=E7=BC=A9=E5=85=BC=E5=AE=B9=20zlib=20?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E7=9A=84=E7=BC=96=E7=A0=81"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/datakit/zip.go | 9 ++++ internal/httpapi/api_write_test.go | 77 ++++++++++++++++++++++++++---- 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/internal/datakit/zip.go b/internal/datakit/zip.go index 8bcad9a0e3..c941014994 100644 --- a/internal/datakit/zip.go +++ b/internal/datakit/zip.go @@ -185,6 +185,15 @@ func putDeflateWriter(w *deflateWriter) { } func getDeflateReader(data []byte) *deflateReader { + // A "deflate" Content-Encoding body may be a raw DEFLATE stream (RFC 1951) or a ZLIB-wrapped one (RFC 1950), see https://datatracker.ietf.org/doc/html/rfc7230#section-4.2 + // The difference is that ZLIB adds a 2-byte header before the DEFLATE data and a 4-byte checksum after it + if len(data) > 2 && + (data[0]&0xF) == 0x8 && // Low 4-bits must be 8 + (data[0]&0x80) == 0 && // High-bit must be clear + (((int(data[0])<<8)+int(data[1]))%31) == 0 { // The 16-bit header must be a multiple of 31, e.g. 0x789C + iStartOffset := 2 + data = data[iStartOffset:] + } if x := deflateReaderPool.Get(); x == nil { reader := bytes.NewReader(data) z := flate.NewReader(reader) diff --git a/internal/httpapi/api_write_test.go b/internal/httpapi/api_write_test.go index 185e425268..91d9304390 100644 --- a/internal/httpapi/api_write_test.go +++ b/internal/httpapi/api_write_test.go @@ -9,6 +9,7 @@ import ( "bytes" "compress/flate" "compress/gzip" + "compress/zlib" "encoding/json" "fmt" "io" @@ -625,6 +626,45 @@ func TestAPIWrite(t *testing.T) { }, }, + { + name: `write-json-with-precision-zlib`, + method: "POST", + url: "/v1/write/metric?echo_json=1&precision=s", + body: []byte(`[{"measurement":"abc", 
"tags": {"t1": "xxx"}, "fields":{"f1": 1.0}, "time":123}]`), + contentType: "application/json", + encodeType: "zlib", + expectStatusCode: 200, + expectBody: []*point.JSONPoint{ + { + Measurement: "abc", + Tags: map[string]string{ + "t1": "xxx", + }, + Fields: map[string]interface{}{ + "f1": 1.0, + }, + Time: 123000000000, + }, + }, + }, + + { + name: "rum-zlib", + method: "POST", + url: "/v1/write/rum", + body: []byte( + "action,sdk_data_id=0.11163.4.5263024948114bc8b0baf950d5a7e2ce,action_id=ad4d88e8b7b24df6b27e719ba7d6e6d3," + + "action_type=launch_cold,action_name=app\\ cold\\ start,session_id=5d99f85983d6483195e855ef7c03b65c," + + "os=Android,application_uuid=04367135-3958-4263-bed0-0aa74e676b88,os_version=12,sdk_name=df_android_rum_sdk,env=prod,version=1.4,device_uuid=9be3739219f8f06b," + + "sdk_package_track=1.3.3,os_version_major=12,sdk_package_native=1.1.1,sdk_package_agent=1.6.3-beta02,service=df_rum_android,sdk_version=1.6.3-beta02," + + "model=Android\\ SDK\\ built\\ for\\ arm64,screen_size=1080*1794,session_type=user,arch=aarch64,app_id=appid_fed4868e0c1c446fb87741cfcaa1c9ba," + + "device=Android duration=112816750i,action_error_count=0i,action_long_task_count=0i,action_resource_count=0i 1730106947448307500", + ), + contentType: "application/line-protocol", + encodeType: "zlib", + expectStatusCode: 200, + }, + { name: `write-json-with-precision-br`, method: "POST", @@ -1019,9 +1059,10 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`), // encode body if tc.encodeType != "" { tc.body, err = encodeBody(tc.body, tc.encodeType) - if err != nil { - _ = fmt.Errorf("encodeRequest err: %w", err) + if _, exists := encoders[tc.encodeType]; !exists { + assert.Error(t, err) } + t.Logf("encodeBody: %q", tc.body) } httpRequest, err := http.NewRequest("POST", url, bytes.NewBuffer(tc.body)) if err != nil { @@ -1031,6 +1072,9 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`), httpRequest.Header.Set("Content-Type", tc.contentType) // 
Content-Encoding if tc.encodeType != "" { + if tc.encodeType == "zlib" { + tc.encodeType = "deflate" + } httpRequest.Header.Set("Content-Encoding", tc.encodeType) } @@ -1091,14 +1135,15 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`), } } -func encodeBody(body []byte, encodeType string) ([]byte, error) { - encoders := map[string]func([]byte) ([]byte, error){ - "gzip": dkzip.GZip, - "deflate": dkzip.DeflateZip, - "br": dkzip.BrotliZip, - "zstd": dkzip.ZstdZip, - } +var encoders = map[string]func([]byte) ([]byte, error){ + "gzip": dkzip.GZip, + "deflate": dkzip.DeflateZip, + "zlib": encodeZlib, + "br": dkzip.BrotliZip, + "zstd": dkzip.ZstdZip, +} +func encodeBody(body []byte, encodeType string) ([]byte, error) { if encodeFunc, exists := encoders[encodeType]; exists { return encodeFunc(body) } @@ -1106,6 +1151,20 @@ func encodeBody(body []byte, encodeType string) ([]byte, error) { return body, fmt.Errorf("unsupported encoding: %s", encodeType) } +func encodeZlib(body []byte) ([]byte, error) { + var buf bytes.Buffer + w := zlib.NewWriter(&buf) + _, err := w.Write(body) + if err != nil { + return nil, err + } + err = w.Close() + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + func decodeBodyNoPool(body []byte, contentEncoding string) ([]byte, error) { var ( requestBody = bytes.NewReader(body)