From 969abf3da274052a843032704a590efb94fce6af Mon Sep 17 00:00:00 2001 From: davidby-influx <72418212+davidby-influx@users.noreply.github.com> Date: Fri, 8 Dec 2023 15:46:11 -0800 Subject: [PATCH 1/3] fix: avoid SIGBUS when reading non-std series segment files (#24509) Some series files which are smaller than the standard sizes cause SIGBUS in influx_inspect and influxd, because entry iteration walks onto mapped memory not backed by the the file. Avoid walking off the end of the file while iterating series entries in oddly sized files. closes https://github.com/influxdata/influxdb/issues/24508 Co-authored-by: Geoffrey Wossum --- .../verify/seriesfile/verify.go | 5 +- tsdb/series_file.go | 4 +- tsdb/series_partition.go | 2 +- tsdb/series_segment.go | 30 +++++-- tsdb/series_segment_test.go | 85 +++++++++++-------- 5 files changed, 78 insertions(+), 48 deletions(-) diff --git a/cmd/influx_inspect/verify/seriesfile/verify.go b/cmd/influx_inspect/verify/seriesfile/verify.go index d9af26327d9..3e9610a1df9 100644 --- a/cmd/influx_inspect/verify/seriesfile/verify.go +++ b/cmd/influx_inspect/verify/seriesfile/verify.go @@ -184,7 +184,7 @@ func (v Verify) VerifySegment(segmentPath string, ids map[uint64]IDData) (valid v.Logger = v.Logger.With(zap.String("segment", segmentName)) v.Logger.Info("Verifying segment") - // Open up the segment and grab it's data. + // Open up the segment and grab its data. segmentID, err := tsdb.ParseSeriesSegmentFilename(segmentName) if err != nil { return false, err @@ -195,7 +195,8 @@ func (v Verify) VerifySegment(segmentPath string, ids map[uint64]IDData) (valid return false, nil } defer segment.Close() - buf := newBuffer(segment.Data()) + // Only walk the file as it exists, not the whole mapping which may be bigger than the file. + buf := newBuffer(segment.Data()[:segment.Size()]) defer func() { if rec := recover(); rec != nil { diff --git a/tsdb/series_file.go b/tsdb/series_file.go index d663825de11..8838e588d80 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -366,9 +366,9 @@ func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte { } // ReadSeriesKey returns the series key from the beginning of the buffer. -func ReadSeriesKey(data []byte) (key, remainder []byte) { +func ReadSeriesKey(data []byte) (key []byte) { sz, n := binary.Uvarint(data) - return data[:int(sz)+n], data[int(sz)+n:] + return data[:int(sz)+n] } func ReadSeriesKeyLen(data []byte) (sz int, remainder []byte) { diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 9d2796dc593..a202781f446 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -517,7 +517,7 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { continue } - key, _ := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) + key := ReadSeriesKey(segment.Slice(pos + SeriesEntryHeaderSize)) return key } diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index d0a90d01f31..4dc5a4913b0 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -92,6 +92,12 @@ func CreateSeriesSegment(id uint16, path string) (*SeriesSegment, error) { // Open memory maps the data file at the file's path. func (s *SeriesSegment) Open() error { if err := func() (err error) { + st, err := os.Stat(s.path) + if err != nil { + return fmt.Errorf("cannot stat %s: %w", s.path, err) + } + s.size = uint32(st.Size()) + // Memory map file data. 
if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { return err @@ -120,14 +126,16 @@ func (s *SeriesSegment) Path() string { return s.path } // InitForWrite initializes a write handle for the segment. // This is only used for the last segment in the series file. func (s *SeriesSegment) InitForWrite() (err error) { - // Only calculcate segment data size if writing. - for s.size = uint32(SeriesSegmentHeaderSize); s.size < uint32(len(s.data)); { - flag, _, _, sz := ReadSeriesEntry(s.data[s.size:]) + // Only recalculate segment data size if writing. + var size uint32 + for size = uint32(SeriesSegmentHeaderSize); size < s.size; { + flag, _, _, sz := ReadSeriesEntry(s.data[size:s.size]) if !IsValidSeriesEntryFlag(flag) { break } - s.size += uint32(sz) + size += uint32(sz) } + s.size = size // Open file handler for writing & seek to end of data. if s.file, err = os.OpenFile(s.path, os.O_WRONLY|os.O_CREATE, 0666); err != nil { @@ -243,8 +251,8 @@ func (s *SeriesSegment) MaxSeriesID() uint64 { // ForEachEntry executes fn for every entry in the segment. func (s *SeriesSegment) ForEachEntry(fn func(flag uint8, id uint64, offset int64, key []byte) error) error { - for pos := uint32(SeriesSegmentHeaderSize); pos < uint32(len(s.data)); { - flag, id, key, sz := ReadSeriesEntry(s.data[pos:]) + for pos := uint32(SeriesSegmentHeaderSize); pos < s.size; { + flag, id, key, sz := ReadSeriesEntry(s.data[pos:s.size]) if !IsValidSeriesEntryFlag(flag) { break } @@ -337,7 +345,7 @@ func ReadSeriesKeyFromSegments(a []*SeriesSegment, offset int64) []byte { return nil } buf := segment.Slice(pos) - key, _ := ReadSeriesKey(buf) + key := ReadSeriesKey(buf) return key } @@ -416,16 +424,22 @@ func (hdr *SeriesSegmentHeader) WriteTo(w io.Writer) (n int64, err error) { } func ReadSeriesEntry(data []byte) (flag uint8, id uint64, key []byte, sz int64) { + if len(data) <= 0 { + return 0, 0, nil, 1 + } // If flag byte is zero then no more entries exist. flag, data = uint8(data[0]), data[1:] if !IsValidSeriesEntryFlag(flag) { return 0, 0, nil, 1 } + if len(data) < 8 { + return 0, 0, nil, 1 + } id, data = binary.BigEndian.Uint64(data), data[8:] switch flag { case SeriesEntryInsertFlag: - key, _ = ReadSeriesKey(data) + key = ReadSeriesKey(data) } return flag, id, key, int64(SeriesEntryHeaderSize + len(key)) } diff --git a/tsdb/series_segment_test.go b/tsdb/series_segment_test.go index a98eab06ddf..d9f59a69d4b 100644 --- a/tsdb/series_segment_test.go +++ b/tsdb/series_segment_test.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "path/filepath" + "strconv" "testing" "github.com/google/go-cmp/cmp" @@ -142,45 +143,59 @@ func TestSeriesSegmentHeader(t *testing.T) { } func TestSeriesSegment_PartialWrite(t *testing.T) { - dir, cleanup := MustTempDir() - defer cleanup() + for extraSegs := uint64(2000); extraSegs < 4000; extraSegs++ { + func() { + dir, cleanup := MustTempDir() + defer cleanup() - // Create a new initial segment (4mb) and initialize for writing. - segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) - if err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } - defer segment.Close() + // Create a new initial segment (4mb) and initialize for writing. + segment, err := tsdb.CreateSeriesSegment(0, filepath.Join(dir, "0000")) + if err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } + defer segment.Close() - // Write two entries. 
- if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { - t.Fatal(err) - } else if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { - t.Fatal(err) - } - sz := segment.Size() - entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + // Write two entries. + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1, tsdb.AppendSeriesKey(nil, []byte("A"), nil))); err != nil { + t.Fatal(err) + } - // Close segment. - if err := segment.Close(); err != nil { - t.Fatal(err) - } + // Adding intermediary segments in between "A" and "B" is to try and induce a SIGBUS + // when the file truncation backs over a page. + for i := uint64(0); i < extraSegs; i++ { + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 1+i, tsdb.AppendSeriesKey(nil, []byte(strconv.Itoa(int(i))), nil))); err != nil { + t.Fatal(err) + } + } - // Truncate at each point and reopen. - for i := entrySize; i > 0; i-- { - if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { - t.Fatal(err) - } - segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) - if err := segment.Open(); err != nil { - t.Fatal(err) - } else if err := segment.InitForWrite(); err != nil { - t.Fatal(err) - } else if err := segment.Close(); err != nil { - t.Fatal(err) - } + if _, err := segment.WriteLogEntry(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))); err != nil { + t.Fatal(err) + } + sz := segment.Size() + entrySize := len(tsdb.AppendSeriesEntry(nil, tsdb.SeriesEntryInsertFlag, 2+extraSegs, tsdb.AppendSeriesKey(nil, []byte("B"), nil))) + + // Close segment. + if err := segment.Close(); err != nil { + t.Fatal(err) + } + + // Truncate at each point and reopen. + for i := entrySize; i > 0; i-- { + if err := os.Truncate(filepath.Join(dir, "0000"), sz-int64(entrySize-i)); err != nil { + t.Fatal(err) + } + segment := tsdb.NewSeriesSegment(0, filepath.Join(dir, "0000")) + if err := segment.Open(); err != nil { + t.Fatal(err) + } else if err := segment.InitForWrite(); err != nil { + t.Fatal(err) + } else if err := segment.Close(); err != nil { + t.Fatal(err) + } + } + }() } } From 7bd3f89d181fdb789e87f9380961f42d7790de5c Mon Sep 17 00:00:00 2001 From: Geoffrey Wossum Date: Thu, 4 Jan 2024 11:59:28 -0600 Subject: [PATCH 2/3] fix: prevent retention service creating orphaned shard files (#24530) * fix: prevent retention service creating orphaned shard files Under certain circumstances, the retention service can fail to delete shards from the store in a timely manner. When the shard groups are pruned based on age, this leaves orphaned shard files on the disk. The retention service will then not attempt to remove the obsolete shard files because the meta store does not know about them. This can cause excessive disk space usage for some users. This corrects that by requiring shards files be deleted before they can be removed from the meta store. 
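As an illustration only (not code from this patch), a minimal Go sketch of the ordering the retention service now follows; the helper name dropLocalShard and the store interface are hypothetical, while DeleteShard, DropShardMetaRef, and PruneShardGroups correspond to the hooks wired up in services/retention below:

    package retentionsketch

    // store abstracts the one TSDBStore method this sketch needs.
    type store interface {
        DeleteShard(id uint64) error
    }

    // dropLocalShard (hypothetical) shows the required order: delete the
    // shard's files from the local store first, and only if that succeeds
    // drop the meta-store reference (for OSS, DropShardMetaRef wraps
    // MetaClient.DropShard). Returning true asks the caller to retry on the
    // next check interval instead of pruning a group whose files may still
    // be on disk.
    func dropLocalShard(s store, dropMetaRef func(id uint64, owners []uint64) error, id uint64, owners []uint64) bool {
        if err := s.DeleteShard(id); err != nil {
            return true // files remain; keep the meta reference and retry
        }
        if err := dropMetaRef(id, owners); err != nil {
            return true
        }
        return false // group can be pruned once it has no shards left
    }

Only after shard files and their meta references are gone does PruneShardGroups remove the emptied, expired shard groups.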
fixes: #24529 --- cmd/influxd/run/server.go | 1 + go.mod | 20 +- go.sum | 39 +-- services/meta/client.go | 15 +- services/meta/client_test.go | 2 + services/meta/data.go | 27 +- services/retention/helpers/test_helpers.go | 55 +++ services/retention/service.go | 226 +++++++++---- services/retention/service_test.go | 375 +++++++++++++++++---- toml/toml.go | 3 +- 10 files changed, 583 insertions(+), 180 deletions(-) create mode 100644 services/retention/helpers/test_helpers.go diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 805f5ecc6e7..e8b747b4659 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -290,6 +290,7 @@ func (s *Server) appendRetentionPolicyService(c retention.Config) { srv := retention.NewService(c) srv.MetaClient = s.MetaClient srv.TSDBStore = s.TSDBStore + srv.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient) s.Services = append(s.Services, srv) } diff --git a/go.mod b/go.mod index 4aff3ad56b1..fe48ff52165 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/golang-jwt/jwt v3.2.1+incompatible github.com/golang/mock v1.5.0 github.com/golang/snappy v0.0.4 - github.com/google/go-cmp v0.5.7 + github.com/google/go-cmp v0.5.8 github.com/influxdata/flux v0.194.4 github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 @@ -43,13 +43,14 @@ require ( github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 - golang.org/x/crypto v0.14.0 - golang.org/x/sync v0.4.0 - golang.org/x/sys v0.13.0 - golang.org/x/term v0.13.0 - golang.org/x/text v0.13.0 + golang.org/x/crypto v0.16.0 + golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 + golang.org/x/sync v0.5.0 + golang.org/x/sys v0.15.0 + golang.org/x/term v0.15.0 + golang.org/x/text v0.14.0 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba - golang.org/x/tools v0.14.0 + golang.org/x/tools v0.16.0 google.golang.org/grpc v1.44.0 google.golang.org/protobuf v1.28.1 ) @@ -148,10 +149,9 @@ require ( github.com/willf/bitset v1.1.9 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.7.0 // indirect - golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect - golang.org/x/mod v0.13.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.19.0 // indirect golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect gonum.org/v1/gonum v0.11.0 // indirect diff --git a/go.sum b/go.sum index 154dfbc32a6..fc0ee0c7b3b 100644 --- a/go.sum +++ b/go.sum @@ -494,8 +494,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= -github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= 
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -1044,8 +1044,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1061,8 +1061,9 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI= golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps= +golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4= +golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1101,8 +1102,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1162,8 +1163,8 @@ golang.org/x/net 
v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1190,8 +1191,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1278,13 +1279,13 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod 
h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1295,8 +1296,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1383,8 +1384,8 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= +golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/services/meta/client.go b/services/meta/client.go index 4497b54e5c7..b337a6b8a17 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -678,24 +678,11 @@ func (c *Client) TruncateShardGroups(t time.Time) error { // PruneShardGroups remove deleted shard groups from the data store. 
func (c *Client) PruneShardGroups() error { - var changed bool expiration := time.Now().Add(ShardGroupDeletedExpiration) c.mu.Lock() defer c.mu.Unlock() data := c.cacheData.Clone() - for i, d := range data.Databases { - for j, rp := range d.RetentionPolicies { - var remainingShardGroups []ShardGroupInfo - for _, sgi := range rp.ShardGroups { - if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) { - remainingShardGroups = append(remainingShardGroups, sgi) - continue - } - changed = true - } - data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups - } - } + changed := data.PruneShardGroups(expiration) if changed { return c.commit(data) } diff --git a/services/meta/client_test.go b/services/meta/client_test.go index 109e3b210ae..6cf72b42cec 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -1075,7 +1075,9 @@ func TestMetaClient_PruneShardGroups(t *testing.T) { data := c.Data() data.Databases[1].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration + data.Databases[1].RetentionPolicies[0].ShardGroups[0].Shards = nil data.Databases[1].RetentionPolicies[0].ShardGroups[1].DeletedAt = expiration + data.Databases[1].RetentionPolicies[0].ShardGroups[1].Shards = nil if err := c.SetData(&data); err != nil { t.Fatal(err) diff --git a/services/meta/data.go b/services/meta/data.go index 99df2089602..d5cc4a19a31 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -292,8 +292,11 @@ func (data *Data) DropShard(id uint64) { data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...) if len(shards) == 1 { - // We just deleted the last shard in the shard group. - data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now() + // We just deleted the last shard in the shard group, but make sure we don't overwrite the timestamp if it + // was already deleted. + if !data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Deleted() { + data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now() + } } return } @@ -447,6 +450,26 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { return ErrShardGroupNotFound } +// PruneShardGroups removes any shards deleted before expiration and that have no remaining owners. +// Returns true if data is modified. +func (data *Data) PruneShardGroups(expiration time.Time) bool { + var changed bool + for i, d := range data.Databases { + for j, rp := range d.RetentionPolicies { + var remainingShardGroups []ShardGroupInfo + for _, sgi := range rp.ShardGroups { + if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) || len(sgi.Shards) > 0 { + remainingShardGroups = append(remainingShardGroups, sgi) + continue + } + changed = true + } + data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups + } + } + return changed +} + // CreateContinuousQuery adds a named continuous query to a database. 
func (data *Data) CreateContinuousQuery(database, name, query string) error { di := data.Database(database) diff --git a/services/retention/helpers/test_helpers.go b/services/retention/helpers/test_helpers.go new file mode 100644 index 00000000000..b20a6559143 --- /dev/null +++ b/services/retention/helpers/test_helpers.go @@ -0,0 +1,55 @@ +package helpers + +import ( + "fmt" + "time" + + "golang.org/x/exp/slices" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/services/meta" +) + +// DataDeleteShardGroup deletes the shard group specified by database, policy, and id from targetData. +// It does this by setting the shard group's DeletedAt time to now. We have to reimplement DeleteShardGroup +// instead of using data's so that the DeletedAt time will be deterministic. We are also not testing +// the functionality of DeleteShardGroup. We are testing if DeleteShardGroup gets called correctly. +func DataDeleteShardGroup(targetData *meta.Data, now time.Time, database, policy string, id uint64) error { + rpi, err := targetData.RetentionPolicy(database, policy) + + if err != nil { + return err + } else if rpi == nil { + return influxdb.ErrRetentionPolicyNotFound(policy) + } + + // Find shard group by ID and set its deletion timestamp. + for i := range rpi.ShardGroups { + if rpi.ShardGroups[i].ID == id { + rpi.ShardGroups[i].DeletedAt = now + return nil + } + } + + return meta.ErrShardGroupNotFound +} + +// DataNukeShardGroup unconditionally removes the shard group identified by targetDB, targetRP, and targetID +// from targetData. There's no meta.Data method to directly remove a shard group, only to mark it deleted and +// then prune it. We can't use the functionality we're testing to generate the expected result. +func DataNukeShardGroup(targetData *meta.Data, targetDB, targetRP string, targetID uint64) error { + rpi, err := targetData.RetentionPolicy(targetDB, targetRP) + if err != nil { + return err + } else if rpi == nil { + return fmt.Errorf("no retention policy found for %q, %q, %d", targetDB, targetRP, targetID) + } + isTargetShardGroup := func(sgi meta.ShardGroupInfo) bool { + return sgi.ID == targetID + } + if !slices.ContainsFunc(rpi.ShardGroups, isTargetShardGroup) { + return fmt.Errorf("shard not found for %q, %q, %d", targetDB, targetRP, targetID) + } + rpi.ShardGroups = slices.DeleteFunc(rpi.ShardGroups, isTargetShardGroup) + return nil +} diff --git a/services/retention/service.go b/services/retention/service.go index 842157f986f..9bc8b56ad0c 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -2,29 +2,46 @@ package retention // import "github.com/influxdata/influxdb/services/retention" import ( + "errors" + "fmt" "sync" "time" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" ) +type MetaClient interface { + Databases() []meta.DatabaseInfo + DeleteShardGroup(database, policy string, id uint64) error + DropShard(id uint64) error + PruneShardGroups() error +} + // Service represents the retention policy enforcement service. type Service struct { - MetaClient interface { - Databases() []meta.DatabaseInfo - DeleteShardGroup(database, policy string, id uint64) error - PruneShardGroups() error - } + MetaClient TSDBStore interface { ShardIDs() []uint64 DeleteShard(shardID uint64) error } + // DropShardRef is a function that takes a shard ID and removes the + // "reference" to it in the meta data. 
For OSS, this would be a DropShard + // operation. For Enterprise, this would be a RemoveShardOwner operation. + // Also provided is owners, the list of node IDs of the shard owners + // according to the meta store. For OSS, owners will always be empty. + // Enterprise can use owners to optimize out calls to RemoveShardOwner + // if the current node doesn't actually own the shardID. This prevents + // a lot of unnecessary RPC calls. + DropShardMetaRef func(shardID uint64, owners []uint64) error + config Config - wg sync.WaitGroup - done chan struct{} + + wg sync.WaitGroup + done chan struct{} logger *zap.Logger } @@ -37,12 +54,23 @@ func NewService(c Config) *Service { } } +// OSSDropShardMetaRef creates a closure appropriate for OSS to use as DropShardMetaRef. +func OSSDropShardMetaRef(mc MetaClient) func(uint64, []uint64) error { + return func(shardID uint64, owners []uint64) error { + return mc.DropShard(shardID) + } +} + // Open starts retention policy enforcement. func (s *Service) Open() error { if !s.config.Enabled || s.done != nil { return nil } + if s.DropShardMetaRef == nil { + return fmt.Errorf("invalid nil for retention service DropShardMetaRef") + } + s.logger.Info("Starting retention policy enforcement service", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) s.done = make(chan struct{}) @@ -80,83 +108,139 @@ func (s *Service) run() { return case <-ticker.C: - log, logEnd := logger.NewOperation(s.logger, "Retention policy deletion check", "retention_delete_check") + s.DeletionCheck() + } + } +} - type deletionInfo struct { - db string - rp string +func (s *Service) DeletionCheck() { + log, logEnd := logger.NewOperation(s.logger, "Retention policy deletion check", "retention_delete_check") + defer logEnd() + + type deletionInfo struct { + db string + rp string + owners []uint64 + } + newDeletionInfo := func(db, rp string, si meta.ShardInfo) deletionInfo { + owners := make([]uint64, len(si.Owners)) + for i, o := range si.Owners { + owners[i] = o.NodeID + } + return deletionInfo{db: db, rp: rp, owners: owners} + } + deletedShardIDs := make(map[uint64]deletionInfo) + + dropShardMetaRef := func(id uint64, info deletionInfo) error { + if err := s.DropShardMetaRef(id, info.owners); err != nil { + log.Error("Failed to drop shard meta reference", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp), + zap.Error(err)) + return err + } + return nil + } + + // Mark down if an error occurred during this function so we can inform the + // user that we will try again on the next interval. + // Without the message, they may see the error message and assume they + // have to do it manually. + var retryNeeded bool + dbs := s.MetaClient.Databases() + for _, d := range dbs { + for _, r := range d.RetentionPolicies { + // Build list of already deleted shards. + for _, g := range r.DeletedShardGroups() { + for _, sh := range g.Shards { + deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh) + } } - deletedShardIDs := make(map[uint64]deletionInfo) - - // Mark down if an error occurred during this function so we can inform the - // user that we will try again on the next interval. - // Without the message, they may see the error message and assume they - // have to do it manually. - var retryNeeded bool - dbs := s.MetaClient.Databases() - for _, d := range dbs { - for _, r := range d.RetentionPolicies { - // Build list of already deleted shards. 
- for _, g := range r.DeletedShardGroups() { - for _, sh := range g.Shards { - deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} - } - } - - // Determine all shards that have expired and need to be deleted. - for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { - if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - log.Info("Failed to delete shard group", - logger.Database(d.Name), - logger.ShardGroup(g.ID), - logger.RetentionPolicy(r.Name), - zap.Error(err)) - retryNeeded = true - continue - } - - log.Info("Deleted shard group", - logger.Database(d.Name), - logger.ShardGroup(g.ID), - logger.RetentionPolicy(r.Name)) - - // Store all the shard IDs that may possibly need to be removed locally. - for _, sh := range g.Shards { - deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} - } - } + + // Determine all shards that have expired and need to be deleted. + for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { + if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { + log.Info("Failed to delete shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name), + zap.Error(err)) + retryNeeded = true + continue + } + + log.Info("Deleted shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name)) + + // Store all the shard IDs that may possibly need to be removed locally. + for _, sh := range g.Shards { + deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh) } } + } + } - // Remove shards if we store them locally - for _, id := range s.TSDBStore.ShardIDs() { - if info, ok := deletedShardIDs[id]; ok { - if err := s.TSDBStore.DeleteShard(id); err != nil { - log.Info("Failed to delete shard", - logger.Database(info.db), - logger.Shard(id), - logger.RetentionPolicy(info.rp), - zap.Error(err)) - retryNeeded = true - continue - } - log.Info("Deleted shard", + // Remove shards if we store them locally + for _, id := range s.TSDBStore.ShardIDs() { + if info, ok := deletedShardIDs[id]; ok { + delete(deletedShardIDs, id) + log.Info("Attempting deletion of shard from store", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp)) + if err := s.TSDBStore.DeleteShard(id); err != nil { + log.Error("Failed to delete shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp), + zap.Error(err)) + if errors.Is(err, tsdb.ErrShardNotFound) { + // At first you wouldn't think this could happen, we're iterating over shards + // in the store. However, if this has been a very long running operation the + // shard could have been dropped from the store while we were working on other shards. + log.Warn("Shard does not exist in store, continuing retention removal", logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp)) + } else { + retryNeeded = true + continue } } - - if err := s.MetaClient.PruneShardGroups(); err != nil { - log.Info("Problem pruning shard groups", zap.Error(err)) + log.Info("Deleted shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp)) + if err := dropShardMetaRef(id, info); err != nil { + // removeShardMetaReference already logged the error. 
retryNeeded = true + continue } + } + } - if retryNeeded { - log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) - } - - logEnd() + // Check for expired phantom shards that exist in the metadata but not in the store. + for id, info := range deletedShardIDs { + log.Error("Expired phantom shard detected during retention check, removing from metadata", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp)) + if err := dropShardMetaRef(id, info); err != nil { + // removeShardMetaReference already logged the error. + retryNeeded = true + continue } } + + if err := s.MetaClient.PruneShardGroups(); err != nil { + log.Info("Problem pruning shard groups", zap.Error(err)) + retryNeeded = true + } + + if retryNeeded { + log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) + } } diff --git a/services/retention/service_test.go b/services/retention/service_test.go index f118215d381..db98ce4feb2 100644 --- a/services/retention/service_test.go +++ b/services/retention/service_test.go @@ -3,16 +3,22 @@ package retention_test import ( "bytes" "fmt" + "math" "reflect" "sync" "testing" "time" + "golang.org/x/exp/maps" + "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/retention" + "github.com/influxdata/influxdb/services/retention/helpers" "github.com/influxdata/influxdb/toml" + "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" ) func TestService_OpenDisabled(t *testing.T) { @@ -57,6 +63,201 @@ func TestService_OpenClose(t *testing.T) { } } +func TestRetention_DeletionCheck(t *testing.T) { + cfg := retention.Config{ + Enabled: true, + + // This test runs DeletionCheck manually for the test cases. It is about checking + // the results of DeletionCheck, not if it is run properly on the timer. + // Set a long check interval so the deletion check won't run on its own during the test. + CheckInterval: toml.Duration(time.Hour * 24), + } + + now := time.Now().UTC() + shardDuration := time.Hour * 24 * 14 + shardGroupDuration := time.Hour * 24 + foreverShard := uint64(1003) // a shard that can't be deleted + phantomShard := uint64(1006) + dataUT := &meta.Data{ + Users: []meta.UserInfo{}, + Databases: []meta.DatabaseInfo{ + { + Name: "servers", + DefaultRetentionPolicy: "autogen", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Name: "autogen", + ReplicaN: 2, + Duration: shardDuration, + ShardGroupDuration: shardGroupDuration, + ShardGroups: []meta.ShardGroupInfo{ + // Shard group 1 is deleted and expired group with a single shard. + { + ID: 1, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration), + Shards: []meta.ShardInfo{ + { + ID: 101, + }, + }, + }, + // Shard group 2 is deleted and expired with no shards. + // Note a shard group with no shards should not exist anyway. 
+ { + ID: 2, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration), + }, + // Shard group 3 is deleted and expired, but its shard can not be deleted. + { + ID: 3, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration), + Shards: []meta.ShardInfo{ + { + ID: foreverShard, + }, + }, + }, + // Shard group 4 is deleted, but not expired with a single shard. + { + ID: 4, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration), + DeletedAt: now.Truncate(time.Hour * 24), + Shards: []meta.ShardInfo{ + { + ID: 104, + }, + }, + }, + // Shard group 5 is active and should not be touched. + { + ID: 5, + StartTime: now.Truncate(time.Hour * 24).Add(0 * shardGroupDuration), + EndTime: now.Truncate(time.Hour * 24).Add(1 * shardGroupDuration), + Shards: []meta.ShardInfo{ + { + ID: 105, + }, + }, + }, + // Shard group 6 is a deleted and expired shard group with a phantom shard that doesn't exist in the store. + { + ID: 6, + StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration), + DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration), + Shards: []meta.ShardInfo{ + { + ID: phantomShard, + }, + }, + }, + }, + }, + }, + }, + }, + } + expData := dataUT.Clone() + + databasesFn := func() []meta.DatabaseInfo { + return dataUT.Databases + } + deleteShardGroupFn := func(database, policy string, id uint64) error { + return helpers.DataDeleteShardGroup(dataUT, now, database, policy, id) + } + dropShardFn := func(id uint64) error { + dataUT.DropShard(id) + return nil + } + pruneShardGroupsFn := func() error { + // PruneShardGroups is the core functionality we are testing. We must use meta.Data's version. + dataUT.PruneShardGroups(now.Add(meta.ShardGroupDeletedExpiration)) + return nil + } + mc := &internal.MetaClientMock{ + DatabasesFn: databasesFn, + DeleteShardGroupFn: deleteShardGroupFn, + DropShardFn: dropShardFn, + PruneShardGroupsFn: pruneShardGroupsFn, + } + + collectShards := func(d *meta.Data) map[uint64]struct{} { + s := map[uint64]struct{}{} + for _, db := range d.Databases { + for _, rp := range db.RetentionPolicies { + for _, sg := range rp.ShardGroups { + for _, sh := range sg.Shards { + s[sh.ID] = struct{}{} + } + } + } + } + return s + } + + // All these shards are yours except phantomShard. Attempt no deletion there. 
+ shards := collectShards(dataUT) + delete(shards, phantomShard) + + shardIDs := func() []uint64 { + return maps.Keys(shards) + } + deleteShard := func(shardID uint64) error { + if _, ok := shards[shardID]; !ok { + return tsdb.ErrShardNotFound + } + if shardID == foreverShard { + return fmt.Errorf("unknown error deleting shard files for shard %d", shardID) + } + delete(shards, shardID) + return nil + } + store := &internal.TSDBStoreMock{ + DeleteShardFn: deleteShard, + ShardIDsFn: shardIDs, + } + + s := retention.NewService(cfg) + s.MetaClient = mc + s.TSDBStore = store + s.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient) + require.NoError(t, s.Open()) + s.DeletionCheck() + + // Adjust expData to make it look like we expect. + require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 1)) + require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 2)) + expData.DropShard(104) + require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 6)) + + require.Equal(t, expData, dataUT) + require.Equal(t, collectShards(expData), shards) + + // Check that multiple duplicate calls to DeletionCheck don't make further changes. + // This is mostly for our friend foreverShard. + for i := 0; i < 10; i++ { + s.DeletionCheck() + require.Equal(t, expData, dataUT) + require.Equal(t, collectShards(expData), shards) + } + + // Our heroic support team hos fixed the issue with foreverShard. + foreverShard = math.MaxUint64 + s.DeletionCheck() + require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 3)) + require.Equal(t, expData, dataUT) + require.Equal(t, collectShards(expData), shards) + + require.NoError(t, s.Close()) +} + func TestService_CheckShards(t *testing.T) { now := time.Now() // Account for any time difference that could cause some of the logic in @@ -68,43 +269,44 @@ func TestService_CheckShards(t *testing.T) { time.Sleep(100 * time.Millisecond) } - data := []meta.DatabaseInfo{ - { - Name: "db0", - - DefaultRetentionPolicy: "rp0", - RetentionPolicies: []meta.RetentionPolicyInfo{ - { - Name: "rp0", - ReplicaN: 1, - Duration: time.Hour, - ShardGroupDuration: time.Hour, - ShardGroups: []meta.ShardGroupInfo{ - { - ID: 1, - StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour), - EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour), - Shards: []meta.ShardInfo{ - {ID: 2}, - {ID: 3}, + data := meta.Data{ + Databases: []meta.DatabaseInfo{ + { + Name: "db0", + DefaultRetentionPolicy: "rp0", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Name: "rp0", + ReplicaN: 1, + Duration: time.Hour, + ShardGroupDuration: time.Hour, + ShardGroups: []meta.ShardGroupInfo{ + { + ID: 1, + StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour), + EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour), + Shards: []meta.ShardInfo{ + {ID: 2}, + {ID: 3}, + }, }, - }, - { - ID: 4, - StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour), - EndTime: now.Truncate(time.Hour), - Shards: []meta.ShardInfo{ - {ID: 5}, - {ID: 6}, + { + ID: 4, + StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour), + EndTime: now.Truncate(time.Hour), + Shards: []meta.ShardInfo{ + {ID: 5}, + {ID: 6}, + }, }, - }, - { - ID: 7, - StartTime: now.Truncate(time.Hour), - EndTime: now.Truncate(time.Hour).Add(time.Hour), - Shards: []meta.ShardInfo{ - {ID: 8}, - {ID: 9}, + { + ID: 7, + StartTime: now.Truncate(time.Hour), + EndTime: now.Truncate(time.Hour).Add(time.Hour), + Shards: []meta.ShardInfo{ + {ID: 8}, + {ID: 9}, + }, }, }, }, @@ -117,13 +319,13 @@ func 
TestService_CheckShards(t *testing.T) { config.CheckInterval = toml.Duration(10 * time.Millisecond) s := NewService(config) s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo { - return data + return data.Databases } done := make(chan struct{}) deletedShardGroups := make(map[string]struct{}) s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error { - for _, dbi := range data { + for _, dbi := range data.Databases { if dbi.Name == database { for _, rpi := range dbi.RetentionPolicies { if rpi.Name == policy { @@ -148,6 +350,25 @@ func TestService_CheckShards(t *testing.T) { return nil } + dropShardDone := make(chan struct{}) + droppedShards := make(map[uint64]struct{}) + s.MetaClient.DropShardFn = func(id uint64) error { + data.DropShard(id) + if _, ok := droppedShards[id]; ok { + t.Errorf("duplicate DropShard") + } + droppedShards[id] = struct{}{} + if got, want := droppedShards, map[uint64]struct{}{ + 2: struct{}{}, + 3: struct{}{}, + }; reflect.DeepEqual(got, want) { + close(dropShardDone) + } else if len(got) > len(want) { + t.Errorf("dropped too many shards") + } + return nil + } + pruned := false closing := make(chan struct{}) s.MetaClient.PruneShardGroupsFn = func() error { @@ -162,11 +383,21 @@ func TestService_CheckShards(t *testing.T) { return nil } + activeShards := map[uint64]struct{}{ + 2: struct{}{}, + 3: struct{}{}, + 5: struct{}{}, + 6: struct{}{}, + } deletedShards := make(map[uint64]struct{}) s.TSDBStore.ShardIDsFn = func() []uint64 { - return []uint64{2, 3, 5, 6} + return maps.Keys(activeShards) } s.TSDBStore.DeleteShardFn = func(shardID uint64) error { + if _, ok := activeShards[shardID]; !ok { + return tsdb.ErrShardNotFound + } + delete(activeShards, shardID) deletedShards[shardID] = struct{}{} return nil } @@ -182,6 +413,14 @@ func TestService_CheckShards(t *testing.T) { timer := time.NewTimer(100 * time.Millisecond) select { + case <-dropShardDone: + timer.Stop() + case <-timer.C: + t.Errorf("timeout waiting for shard to be dropped") + } + + timer = time.NewTimer(100 * time.Millisecond) + select { case <-done: timer.Stop() case <-timer.C: @@ -239,30 +478,32 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) var mu sync.Mutex shards := []uint64{3, 5, 8, 9, 11, 12} localShards := []uint64{3, 5, 8, 9, 11, 12} - databases := []meta.DatabaseInfo{ - { - Name: "db0", - RetentionPolicies: []meta.RetentionPolicyInfo{ - { - Name: "autogen", - Duration: 24 * time.Hour, - ShardGroupDuration: 24 * time.Hour, - ShardGroups: []meta.ShardGroupInfo{ - { - ID: 1, - StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC), - EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC), - Shards: []meta.ShardInfo{ - {ID: 3}, {ID: 9}, + data := meta.Data{ + Databases: []meta.DatabaseInfo{ + { + Name: "db0", + RetentionPolicies: []meta.RetentionPolicyInfo{ + { + Name: "autogen", + Duration: 24 * time.Hour, + ShardGroupDuration: 24 * time.Hour, + ShardGroups: []meta.ShardGroupInfo{ + { + ID: 1, + StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC), + EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC), + Shards: []meta.ShardInfo{ + {ID: 3}, {ID: 9}, + }, }, - }, - { - ID: 2, - StartTime: time.Now().Add(-1 * time.Hour), - EndTime: time.Now(), - DeletedAt: time.Now(), - Shards: []meta.ShardInfo{ - {ID: 11}, {ID: 12}, + { + ID: 2, + StartTime: time.Now().Add(-1 * time.Hour), + EndTime: time.Now(), + DeletedAt: time.Now(), + Shards: []meta.ShardInfo{ + {ID: 11}, {ID: 12}, + }, }, }, }, @@ -281,7 +522,7 @@ func testService_8819_repro(t 
*testing.T) (*Service, chan error, chan struct{}) s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo { mu.Lock() defer mu.Unlock() - return databases + return data.Databases } s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error { @@ -305,11 +546,18 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) } } shards = newShards - databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC() + data.Databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC() mu.Unlock() return nil } + s.MetaClient.DropShardFn = func(shardID uint64) error { + mu.Lock() + defer mu.Unlock() + data.DropShard(shardID) + return nil + } + s.MetaClient.PruneShardGroupsFn = func() error { // When this is called all shards that have been deleted from the meta // store (expired) should also have been deleted from disk. @@ -393,5 +641,6 @@ func NewService(c retention.Config) *Service { s.Service.MetaClient = s.MetaClient s.Service.TSDBStore = s.TSDBStore + s.Service.DropShardMetaRef = retention.OSSDropShardMetaRef(s.Service.MetaClient) return s } diff --git a/toml/toml.go b/toml/toml.go index 645069cd306..c12faa5d5de 100644 --- a/toml/toml.go +++ b/toml/toml.go @@ -117,8 +117,9 @@ func (m *FileMode) UnmarshalText(text []byte) error { func (m FileMode) MarshalText() (text []byte, err error) { if m != 0 { return []byte(fmt.Sprintf("%04o", m)), nil + } else { + return []byte(""), nil } - return nil, nil } type Group int From 7dec23b4119f51c69cdabd82d73cdeedbf38045a Mon Sep 17 00:00:00 2001 From: Jack <56563911+jdockerty@users.noreply.github.com> Date: Tue, 9 Jan 2024 21:16:03 +0000 Subject: [PATCH 3/3] docs: update contributing.md (#24561) --- CONTRIBUTING.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a98179bc4b2..7937d9fa5c3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -69,7 +69,7 @@ second to sign our CLA, which can be found Installing Go ------------- -InfluxDB requires Go 1.15. +InfluxDB requires Go 1.20. At InfluxDB we find gvm, a Go version manager, useful for installing Go. For instructions on how to install it see [the gvm page on github](https://github.com/moovweb/gvm). @@ -77,8 +77,12 @@ on how to install it see [the gvm page on github](https://github.com/moovweb/gvm After installing gvm you can install and set the default go version by running the following: - gvm install go1.15 - gvm use go1.15 --default + # Retrieve the version in use from the go.mod file. + INFLUXDB_GO_VERSION=$(go mod edit -json | jq -r .Go) + + # Use gvm to install the correct version + gvm install go${INFLUXDB_GO_VERSION} + gvm use go${INFLUXDB_GO_VERSION} --default Revision Control Systems -------------