Skip to content

Commit

Permalink
Resolved "Datakit /v1/write/:Category deflate 解压缩兼容 zlib 格式的编码"
Browse files Browse the repository at this point in the history
  • Loading branch information
meetzouxu authored and 谭彪 committed Oct 31, 2024
1 parent cf73feb commit cb172a3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 9 deletions.
9 changes: 9 additions & 0 deletions internal/datakit/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ func putDeflateWriter(w *deflateWriter) {
}

func getDeflateReader(data []byte) *deflateReader {
// DEFLATE contains DEFLATE(1951) and ZLIB(RFC 1950) https://datatracker.ietf.org/doc/html/rfc7230#section-4.2
// The difference is ZLIB have an extra 2-byte ZLIB header and the checksum of the last 4 bytes
if len(data) > 2 &&
(data[0]&0xF) == 0x8 && // Low 4-bits must be 8
(data[0]&0x80) == 0 && // High-bit must be clear
(((int(data[0])<<8)+int(data[1]))%31) == 0 { // Validate checksum 0x789C
iStartOffset := 2
data = data[iStartOffset:]
}
if x := deflateReaderPool.Get(); x == nil {
reader := bytes.NewReader(data)
z := flate.NewReader(reader)
Expand Down
77 changes: 68 additions & 9 deletions internal/httpapi/api_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"compress/flate"
"compress/gzip"
"compress/zlib"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -625,6 +626,45 @@ func TestAPIWrite(t *testing.T) {
},
},

{
name: `write-json-with-precision-zlib`,
method: "POST",
url: "/v1/write/metric?echo_json=1&precision=s",
body: []byte(`[{"measurement":"abc", "tags": {"t1": "xxx"}, "fields":{"f1": 1.0}, "time":123}]`),
contentType: "application/json",
encodeType: "zlib",
expectStatusCode: 200,
expectBody: []*point.JSONPoint{
{
Measurement: "abc",
Tags: map[string]string{
"t1": "xxx",
},
Fields: map[string]interface{}{
"f1": 1.0,
},
Time: 123000000000,
},
},
},

{
name: "rum-zlib",
method: "POST",
url: "/v1/write/rum",
body: []byte(
"action,sdk_data_id=0.11163.4.5263024948114bc8b0baf950d5a7e2ce,action_id=ad4d88e8b7b24df6b27e719ba7d6e6d3," +
"action_type=launch_cold,action_name=app\\ cold\\ start,session_id=5d99f85983d6483195e855ef7c03b65c," +
"os=Android,application_uuid=04367135-3958-4263-bed0-0aa74e676b88,os_version=12,sdk_name=df_android_rum_sdk,env=prod,version=1.4,device_uuid=9be3739219f8f06b," +
"sdk_package_track=1.3.3,os_version_major=12,sdk_package_native=1.1.1,sdk_package_agent=1.6.3-beta02,service=df_rum_android,sdk_version=1.6.3-beta02," +
"model=Android\\ SDK\\ built\\ for\\ arm64,screen_size=1080*1794,session_type=user,arch=aarch64,app_id=appid_fed4868e0c1c446fb87741cfcaa1c9ba," +
"device=Android duration=112816750i,action_error_count=0i,action_long_task_count=0i,action_resource_count=0i 1730106947448307500",
),
contentType: "application/line-protocol",
encodeType: "zlib",
expectStatusCode: 200,
},

{
name: `write-json-with-precision-br`,
method: "POST",
Expand Down Expand Up @@ -1019,9 +1059,10 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`),
// encode body
if tc.encodeType != "" {
tc.body, err = encodeBody(tc.body, tc.encodeType)
if err != nil {
_ = fmt.Errorf("encodeRequest err: %w", err)
if _, exists := encoders[tc.encodeType]; !exists {
assert.Error(t, err)
}
t.Logf("encodeBody: %q", tc.body)
}
httpRequest, err := http.NewRequest("POST", url, bytes.NewBuffer(tc.body))
if err != nil {
Expand All @@ -1031,6 +1072,9 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`),
httpRequest.Header.Set("Content-Type", tc.contentType)
// Content-Encoding
if tc.encodeType != "" {
if tc.encodeType == "zlib" {
tc.encodeType = "deflate"
}
httpRequest.Header.Set("Content-Encoding", tc.encodeType)
}

Expand Down Expand Up @@ -1091,21 +1135,36 @@ measurement-2,t1=1,t2=2 f1=1,f2=2,f3.14=3.14 123000000000`),
}
}

func encodeBody(body []byte, encodeType string) ([]byte, error) {
encoders := map[string]func([]byte) ([]byte, error){
"gzip": dkzip.GZip,
"deflate": dkzip.DeflateZip,
"br": dkzip.BrotliZip,
"zstd": dkzip.ZstdZip,
}
var encoders = map[string]func([]byte) ([]byte, error){
"gzip": dkzip.GZip,
"deflate": dkzip.DeflateZip,
"zlib": encodeZlib,
"br": dkzip.BrotliZip,
"zstd": dkzip.ZstdZip,
}

func encodeBody(body []byte, encodeType string) ([]byte, error) {
if encodeFunc, exists := encoders[encodeType]; exists {
return encodeFunc(body)
}

return body, fmt.Errorf("unsupported encoding: %s", encodeType)
}

func encodeZlib(body []byte) ([]byte, error) {
var buf bytes.Buffer
w := zlib.NewWriter(&buf)
_, err := w.Write(body)
if err != nil {
return nil, err
}
err = w.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func decodeBodyNoPool(body []byte, contentEncoding string) ([]byte, error) {
var (
requestBody = bytes.NewReader(body)
Expand Down

0 comments on commit cb172a3

Please sign in to comment.