Skip to content

Commit

Permalink
making partition state index read only with no copy. (#20699)
Browse files Browse the repository at this point in the history
1. making partition state index read-only with no copy.
2. reducing the heap allocation in partition state b-tree.

Approved by: @XuPeng-SH
  • Loading branch information
gouhongshen authored Dec 11, 2024
1 parent f1acf1d commit 1c91772
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 35 deletions.
22 changes: 11 additions & 11 deletions pkg/vm/engine/disttae/logtailreplay/blocks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (p *PartitionState) newTombstoneObjectsIter(
snapshot types.TS,
onlyVisible bool) (ObjectsIter, error) {

iter := p.tombstoneObjectDTSIndex.Copy().Iter()
iter := p.tombstoneObjectDTSIndex.Iter()
if onlyVisible {
pivot := ObjectEntry{
ObjectInfo{
Expand All @@ -99,7 +99,7 @@ func (p *PartitionState) newTombstoneObjectsIter(
if !iter.Prev() && p.tombstoneObjectDTSIndex.Len() > 0 {
// reset iter only when seeked to the first item
iter.Release()
iter = p.tombstoneObjectDTSIndex.Copy().Iter()
iter = p.tombstoneObjectDTSIndex.Iter()
}
}

Expand All @@ -115,7 +115,7 @@ func (p *PartitionState) newDataObjectIter(
snapshot types.TS,
onlyVisible bool) (ObjectsIter, error) {

iter := p.dataObjectsNameIndex.Copy().Iter()
iter := p.dataObjectsNameIndex.Iter()
ret := &objectsIter{
onlyVisible: onlyVisible,
ts: snapshot,
Expand Down Expand Up @@ -165,7 +165,7 @@ func (p *PartitionState) HasTombstoneChanged(from, to types.TS) (exist bool) {
if p.tombstoneObjectDTSIndex.Len() == 0 {
return false
}
iter := p.tombstoneObjectDTSIndex.Copy().Iter()
iter := p.tombstoneObjectDTSIndex.Iter()
defer iter.Release()

// Created after from
Expand Down Expand Up @@ -195,7 +195,7 @@ func (p *PartitionState) GetChangedObjsBetween(
inserted = make(map[objectio.ObjectNameShort]struct{})
deleted = make(map[objectio.ObjectNameShort]struct{})

iter := p.dataObjectTSIndex.Copy().Iter()
iter := p.dataObjectTSIndex.Iter()
defer iter.Release()

for ok := iter.Seek(ObjectIndexByTSEntry{
Expand Down Expand Up @@ -223,7 +223,7 @@ func (p *PartitionState) GetChangedObjsBetween(
}

func (p *PartitionState) BlockPersisted(blockID *types.Blockid) bool {
iter := p.dataObjectsNameIndex.Copy().Iter()
iter := p.dataObjectsNameIndex.Iter()
defer iter.Release()

pivot := ObjectEntry{}
Expand All @@ -241,7 +241,7 @@ func (p *PartitionState) CollectObjectsBetween(
start, end types.TS,
) (insertList, deletedList []objectio.ObjectStats) {

iter := p.dataObjectTSIndex.Copy().Iter()
iter := p.dataObjectTSIndex.Iter()
defer iter.Release()

if !iter.Seek(ObjectIndexByTSEntry{
Expand All @@ -250,7 +250,7 @@ func (p *PartitionState) CollectObjectsBetween(
return
}

nameIdx := p.dataObjectsNameIndex.Copy()
nameIdx := p.dataObjectsNameIndex

for ok := true; ok; ok = iter.Next() {
entry := iter.Item()
Expand Down Expand Up @@ -303,9 +303,9 @@ func (p *PartitionState) CheckIfObjectDeletedBeforeTS(

var tree *btree.BTreeG[ObjectEntry]
if isTombstone {
tree = p.tombstoneObjectsNameIndex.Copy()
tree = p.tombstoneObjectsNameIndex
} else {
tree = p.dataObjectsNameIndex.Copy()
tree = p.dataObjectsNameIndex
}

var stats objectio.ObjectStats
Expand All @@ -324,7 +324,7 @@ func (p *PartitionState) CheckIfObjectDeletedBeforeTS(
}

func (p *PartitionState) GetObject(name objectio.ObjectNameShort) (ObjectInfo, bool) {
iter := p.dataObjectsNameIndex.Copy().Iter()
iter := p.dataObjectsNameIndex.Iter()
defer iter.Release()

pivot := ObjectEntry{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/vm/engine/disttae/logtailreplay/change_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,17 +513,17 @@ func NewBaseHandler(state *PartitionState, changesHandle *ChangeHandler, start,
}
var iter btree.IterG[ObjectEntry]
if tombstone {
iter = state.tombstoneObjectsNameIndex.Copy().Iter()
iter = state.tombstoneObjectsNameIndex.Iter()
} else {
iter = state.dataObjectsNameIndex.Copy().Iter()
iter = state.dataObjectsNameIndex.Iter()
}
defer iter.Release()
if tombstone {
dataIter := state.dataObjectsNameIndex.Copy().Iter()
dataIter := state.dataObjectsNameIndex.Iter()
p.fillInSkipTS(dataIter, start, end)
dataIter.Release()
}
rowIter := state.rows.Copy().Iter()
rowIter := state.rows.Iter()
defer rowIter.Release()
p.inMemoryHandle = p.newBatchHandleWithRowIterator(ctx, rowIter, start, end, tombstone, mp)
aobj, cnObj := p.getObjectEntries(iter, start, end)
Expand Down
27 changes: 13 additions & 14 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,17 +587,15 @@ func (p *PartitionState) Copy() *PartitionState {
rows: p.rows.Copy(),
dataObjectsNameIndex: p.dataObjectsNameIndex.Copy(),
tombstoneObjectsNameIndex: p.tombstoneObjectsNameIndex.Copy(),
//blockDeltas: p.blockDeltas.Copy(),
rowPrimaryKeyIndex: p.rowPrimaryKeyIndex.Copy(),
inMemTombstoneRowIdIndex: p.inMemTombstoneRowIdIndex.Copy(),
noData: p.noData,
//dirtyBlocks: p.dirtyBlocks.Copy(),
dataObjectTSIndex: p.dataObjectTSIndex.Copy(),
tombstoneObjectDTSIndex: p.tombstoneObjectDTSIndex.Copy(),
shared: p.shared,
lastFlushTimestamp: p.lastFlushTimestamp,
start: p.start,
end: p.end,
rowPrimaryKeyIndex: p.rowPrimaryKeyIndex.Copy(),
inMemTombstoneRowIdIndex: p.inMemTombstoneRowIdIndex.Copy(),
noData: p.noData,
dataObjectTSIndex: p.dataObjectTSIndex.Copy(),
tombstoneObjectDTSIndex: p.tombstoneObjectDTSIndex.Copy(),
shared: p.shared,
lastFlushTimestamp: p.lastFlushTimestamp,
start: p.start,
end: p.end,
}
if len(p.checkpoints) > 0 {
state.checkpoints = make([]string, len(p.checkpoints))
Expand Down Expand Up @@ -642,7 +640,8 @@ func NewPartitionState(
tid uint64,
) *PartitionState {
opts := btree.Options{
Degree: 64,
Degree: 32, // may good for heap alloc
NoLocks: true,
}
ps := &PartitionState{
service: service,
Expand Down Expand Up @@ -795,7 +794,7 @@ func (p *PartitionState) PKExistInMemBetween(
to types.TS,
keys [][]byte,
) (bool, bool) {
iter := p.rowPrimaryKeyIndex.Copy().Iter()
iter := p.rowPrimaryKeyIndex.Iter()
pivot := RowEntry{
Time: types.BuildTS(math.MaxInt64, math.MaxUint32),
}
Expand Down Expand Up @@ -952,7 +951,7 @@ func (p *PartitionState) ScanRows(
}

func (p *PartitionState) CheckRowIdDeletedInMem(ts types.TS, rowId types.Rowid) bool {
iter := p.rows.Copy().Iter()
iter := p.rows.Iter()
defer iter.Release()

if !iter.Seek(RowEntry{
Expand Down
12 changes: 6 additions & 6 deletions pkg/vm/engine/disttae/logtailreplay/rows_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (p *primaryKeyDelIter) Next() bool {
}

func (p *PartitionState) NewRowsIter(ts types.TS, blockID *types.Blockid, iterDeleted bool) *rowsIter {
iter := p.rows.Copy().Iter()
iter := p.rows.Iter()
ret := &rowsIter{
ts: ts,
iter: iter,
Expand All @@ -606,13 +606,13 @@ func (p *PartitionState) NewPrimaryKeyIter(
ts types.TS,
spec PrimaryKeyMatchSpec,
) *primaryKeyIter {
index := p.rowPrimaryKeyIndex.Copy()
index := p.rowPrimaryKeyIndex
return &primaryKeyIter{
ts: ts,
spec: spec,
iter: index.Iter(),
primaryIndex: index,
rows: p.rows.Copy(),
rows: p.rows,
}
}

Expand All @@ -621,15 +621,15 @@ func (p *PartitionState) NewPrimaryKeyDelIter(
spec PrimaryKeyMatchSpec,
bid *types.Blockid,
) *primaryKeyDelIter {
index := p.rowPrimaryKeyIndex.Copy()
index := p.rowPrimaryKeyIndex
delIter := &primaryKeyDelIter{
primaryKeyIter: primaryKeyIter{
ts: *ts,
spec: spec,
primaryIndex: index,
iter: index.Iter(),
rows: p.rows.Copy(),
tombstoneRowIdIdx: p.inMemTombstoneRowIdIndex.Copy(),
rows: p.rows,
tombstoneRowIdIdx: p.inMemTombstoneRowIdIndex,
},
bid: *bid,
}
Expand Down

0 comments on commit 1c91772

Please sign in to comment.