Skip to content

Commit

Permalink
Merge branch '2368-iss-bodydecompress' into 'dev'
Browse files Browse the repository at this point in the history
resolove "Datakit /v1/write/:Category 支持数据压缩"

See merge request cloudcare-tools/datakit!3208
  • Loading branch information
谭彪 committed Oct 18, 2024
2 parents d4419ee + e0887fc commit 4b5bcf3
Show file tree
Hide file tree
Showing 72 changed files with 257,378 additions and 18 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
github.com/itchyny/timefmt-go v0.1.5 // indirect
github.com/jessevdk/go-flags v1.5.0
github.com/kardianos/service v1.2.1
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/compress v1.17.9
github.com/mssola/user_agent v0.6.0 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
github.com/openzipkin/zipkin-go v0.2.2
Expand Down Expand Up @@ -177,7 +177,7 @@ require (
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-json v0.10.2
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/gogo/googleapis v1.4.0 // indirect
github.com/gogo/status v1.0.3 // indirect
Expand Down Expand Up @@ -390,6 +390,8 @@ require (
github.com/valyala/fastjson v1.6.3
)

require github.com/andybalholm/brotli v1.0.4

replace (
github.com/c-bata/go-prompt => github.com/coanor/go-prompt v0.2.6
github.com/google/gopacket => github.com/GuanceCloud/gopacket v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 h1:WFwa9pqou0Nb4Ddf
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43/go.mod h1:tJPYQG4mnMeUtQvQKNkbsFrnmZOg59Qnf8CcctFv5v4=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
github.com/andybalholm/brotli v1.0.2/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down
277 changes: 277 additions & 0 deletions internal/datakit/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ package datakit

import (
"bytes"
"compress/flate"
"compress/gzip"
"fmt"
"io"
"sync"

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"
)

var gzReaderPool,
Expand Down Expand Up @@ -144,3 +148,276 @@ func UnGZip(data []byte) ([]byte, error) {

return raw, zr.z.Close()
}

var deflateReaderPool,
deflateWriterPool sync.Pool

type deflateWriter struct {
buf *bytes.Buffer
z *flate.Writer
}

type deflateReader struct {
reader *bytes.Reader
z io.ReadCloser
}

func getDeflateWriter() *deflateWriter {
if x := deflateWriterPool.Get(); x == nil {
buf := new(bytes.Buffer)
z, _ := flate.NewWriter(buf, flate.DefaultCompression)
return &deflateWriter{
buf: buf,
z: z,
}
} else {
return x.(*deflateWriter)
}
}

func putDeflateWriter(w *deflateWriter) {
if w == nil {
return
}
w.buf.Reset()
w.z.Reset(w.buf)
deflateWriterPool.Put(w)
}

func getDeflateReader(data []byte) *deflateReader {
if x := deflateReaderPool.Get(); x == nil {
reader := bytes.NewReader(data)
z := flate.NewReader(reader)
return &deflateReader{
z: z,
reader: reader,
}
} else {
r := x.(*deflateReader)
r.reader.Reset(data)
if err := r.z.(flate.Resetter).Reset(r.reader, nil); err != nil {
return nil
}
return r
}
}

func putDeflateReader(r *deflateReader) {
if r == nil {
return
}
deflateReaderPool.Put(r)
}

func DeflateZip(data []byte) ([]byte, error) {
zw := getDeflateWriter()
defer putDeflateWriter(zw)

if _, err := zw.z.Write(data); err != nil {
return nil, err
}

if err := zw.z.Flush(); err != nil {
return nil, err
}

if err := zw.z.Close(); err != nil {
return nil, err
}

return zw.buf.Bytes(), nil
}

func UnDeflateZip(data []byte) ([]byte, error) {
zr := getDeflateReader(data)
defer putDeflateReader(zr)

raw, err := io.ReadAll(zr.z)
if err != nil {
return nil, err
}

return raw, zr.z.Close()
}

var brReaderPool,
brWriterPool sync.Pool

type brotliWriter struct {
buf *bytes.Buffer
z *brotli.Writer
}

type brotliReader struct {
reader *bytes.Reader
z *brotli.Reader
}

func getBrotliWriter() *brotliWriter {
if x := brWriterPool.Get(); x == nil {
buf := new(bytes.Buffer)
z := brotli.NewWriter(buf)
return &brotliWriter{
buf: buf,
z: z,
}
} else {
return x.(*brotliWriter)
}
}

func putBrotliWriter(w *brotliWriter) {
if w == nil {
return
}
w.buf.Reset()
w.z.Reset(w.buf)
brWriterPool.Put(w)
}

func getBrotliReader(data []byte) *brotliReader {
if x := brReaderPool.Get(); x == nil {
reader := bytes.NewReader(data)
z := brotli.NewReader(reader)
return &brotliReader{
z: z,
reader: reader,
}
} else {
r := x.(*brotliReader)
r.reader.Reset(data)
if err := r.z.Reset(r.reader); err != nil {
return nil
}
return r
}
}

func putBrotliReader(r *brotliReader) {
if r == nil {
return
}
brReaderPool.Put(r)
}

func BrotliZip(data []byte) ([]byte, error) {
zw := getBrotliWriter()
defer putBrotliWriter(zw)

if _, err := zw.z.Write(data); err != nil {
return nil, err
}

if err := zw.z.Flush(); err != nil {
return nil, err
}

if err := zw.z.Close(); err != nil {
return nil, err
}

return zw.buf.Bytes(), nil
}

func UnBrotliZip(data []byte) ([]byte, error) {
zr := getBrotliReader(data)
defer putBrotliReader(zr)

raw, err := io.ReadAll(zr.z)
if err != nil {
return nil, err
}

return raw, nil
}

var zstdReaderPool,
zstdWriterPool sync.Pool

type zstdWriter struct {
buf *bytes.Buffer
z *zstd.Encoder
}

type zstdReader struct {
reader *bytes.Reader
z *zstd.Decoder
}

func getZstdWriter() *zstdWriter {
if x := zstdWriterPool.Get(); x == nil {
buf := new(bytes.Buffer)
z, _ := zstd.NewWriter(buf)
return &zstdWriter{
buf: buf,
z: z,
}
} else {
return x.(*zstdWriter)
}
}

func putZstdWriter(w *zstdWriter) {
if w == nil {
return
}
w.buf.Reset()
w.z.Reset(w.buf)
zstdWriterPool.Put(w)
}

func getZstdReader(data []byte) *zstdReader {
if x := zstdReaderPool.Get(); x == nil {
reader := bytes.NewReader(data)
z, _ := zstd.NewReader(reader)
return &zstdReader{
z: z,
reader: reader,
}
} else {
r := x.(*zstdReader)
r.reader.Reset(data)
if err := r.z.Reset(r.reader); err != nil {
return nil
}
return r
}
}

func putZstdReader(r *zstdReader) {
if r == nil {
return
}
zstdReaderPool.Put(r)
}

func ZstdZip(data []byte) ([]byte, error) {
zw := getZstdWriter()
defer putZstdWriter(zw)

if _, err := zw.z.Write(data); err != nil {
return nil, err
}

if err := zw.z.Flush(); err != nil {
return nil, err
}

if err := zw.z.Close(); err != nil {
return nil, err
}

return zw.buf.Bytes(), nil
}

func UnZstdZip(data []byte) ([]byte, error) {
zr := getZstdReader(data)
defer putZstdReader(zr)

raw, err := io.ReadAll(zr.z)
if err != nil {
return nil, err
}

return raw, nil
}
Loading

0 comments on commit 4b5bcf3

Please sign in to comment.