Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store sticky pages separately #12198

Merged
merged 10 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 61 additions & 25 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ void TExecutor::Broken() {
void TExecutor::RecreatePageCollectionsCache() noexcept
{
PrivatePageCache = MakeHolder<TPrivatePageCache>();
StickyPagesMemory = 0;

Stats->PacksMetaBytes = 0;

Expand Down Expand Up @@ -611,6 +612,10 @@ void TExecutor::AddSingleCache(const TIntrusivePtr<TPrivatePageCache::TInfo> &in
{
PrivatePageCache->RegisterPageCollection(info);
Send(MakeSharedPageCacheId(), new NSharedCache::TEvAttach(info->PageCollection, SelfId()));

StickyPagesMemory += info->GetStickySize();

Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY] = StickyPagesMemory;
}

void TExecutor::DropCachesOfBundle(const NTable::TPart &part) noexcept
Expand All @@ -633,13 +638,20 @@ void TExecutor::DropCachesOfBundle(const NTable::TPart &part) noexcept

void TExecutor::DropSingleCache(const TLogoBlobID &label) noexcept
{
auto toActivate = PrivatePageCache->ForgetPageCollection(label);
auto pageCollection = PrivatePageCache->GetPageCollection(label);

ui64 stickySize = pageCollection->GetStickySize();
Y_ABORT_UNLESS(StickyPagesMemory >= stickySize);
StickyPagesMemory -= stickySize;

auto toActivate = PrivatePageCache->ForgetPageCollection(pageCollection);
ActivateWaitingTransactions(toActivate);
if (!PrivatePageCache->Info(label))
Send(MakeSharedPageCacheId(), new NSharedCache::TEvInvalidate(label));

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY] = StickyPagesMemory;
}

void TExecutor::TranslateCacheTouchesToSharedCache() {
Expand Down Expand Up @@ -667,6 +679,28 @@ void TExecutor::RequestInMemPagesForDatabase(bool pendingOnly) {
}
}

void TExecutor::StickInMemPages(NSharedCache::TEvResult *msg) {
const auto& scheme = Scheme();
for (auto& pr : scheme.Tables) {
const ui32 tid = pr.first;
auto subset = Database->Subset(tid, NTable::TEpoch::Max(), { } , { });
for (auto &partView : subset->Flatten) {
auto partStore = partView.As<NTable::TPartStore>();
for (auto &pageCollection : partStore->PageCollections) {
// Note: page collection search optimization seems useless
if (pageCollection->PageCollection == msg->Origin) {
ui64 stickySizeBefore = pageCollection->GetStickySize();
for (auto& loaded : msg->Loaded) {
pageCollection->AddSticky(loaded.PageId, loaded.Page);
}
StickyPagesMemory += pageCollection->GetStickySize() - stickySizeBefore;
}
}
}
}
// Note: the next call of ProvideBlock will also fill pages bodies
}

TExecutorCaches TExecutor::CleanupState() {
TExecutorCaches caches;

Expand Down Expand Up @@ -1347,14 +1381,8 @@ void TExecutor::RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartV

if (stickyGroup) {
auto req = partView.As<NTable::TPartStore>()->GetPages(groupIndex);

TPrivatePageCache::TInfo *info = PrivatePageCache->Info(req->PageCollection->Label());
Y_ABORT_UNLESS(info);
for (ui32 pageId : req->Pages)
PrivatePageCache->MarkSticky(pageId, info);

// TODO: only request missing pages
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::CacheSync);
RequestFromSharedCache(req, NBlockIO::EPriority::Bkgr, EPageCollectionRequest::InMemPages);
}
}
}
Expand Down Expand Up @@ -1792,13 +1820,17 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
}

void TExecutor::UnpinTransactionPages(TSeat &seat) {
Y_ABORT_UNLESS(TransactionPagesMemory >= seat.MemoryTouched);
TransactionPagesMemory -= seat.MemoryTouched;

size_t unpinnedPages = 0;
PrivatePageCache->UnpinPages(seat.Pinned, unpinnedPages);
seat.Pinned.clear();
seat.MemoryTouched = 0;

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory;
}

void TExecutor::ReleaseTxData(TSeat &seat, ui64 requested, const TActorContext &ctx)
Expand Down Expand Up @@ -1830,12 +1862,14 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
ui64 prevTouched = seat->MemoryTouched;

PrivatePageCache->PinTouches(seat->Pinned, touchedPages, newPinnedPages, seat->MemoryTouched);
TransactionPagesMemory += seat->MemoryTouched - prevTouched;

ui32 newTouchedPages = newPinnedPages;
ui64 newTouchedBytes = seat->MemoryTouched - prevTouched;
prevTouched = seat->MemoryTouched;

PrivatePageCache->PinToLoad(seat->Pinned, newPinnedPages, seat->MemoryTouched);
TransactionPagesMemory += seat->MemoryTouched - prevTouched;

if (seat->AttachedMemory)
Memory->AttachMemory(*seat);
Expand Down Expand Up @@ -1978,6 +2012,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &

Counters->Simple()[TExecutorCounters::CACHE_PINNED_SET] = PrivatePageCache->GetStats().PinnedSetSize;
Counters->Simple()[TExecutorCounters::CACHE_PINNED_LOAD] = PrivatePageCache->GetStats().PinnedLoadSize;
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED] = TransactionPagesMemory;
}

void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &env,
Expand Down Expand Up @@ -2665,20 +2700,19 @@ void TExecutor::Handle(TEvents::TEvFlushLog::TPtr &ev) {
}

void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
const bool failed = (ev->Get()->Status != NKikimrProto::OK);
NSharedCache::TEvResult *msg = ev->Get();
const bool failed = (msg->Status != NKikimrProto::OK);

if (auto logl = Logger->Log(failed ? ELnLev::Info : ELnLev::Debug)) {
logl
<< NFmt::Do(*this) << " got result " << NFmt::Do(*ev->Get())
<< ", category " << ev->Cookie;
}

switch (EPageCollectionRequest(ev->Cookie)) {
switch (auto requestType = EPageCollectionRequest(ev->Cookie)) {
case EPageCollectionRequest::Cache:
case EPageCollectionRequest::CacheSync:
case EPageCollectionRequest::InMemPages:
{
auto *msg = ev->CastAsLocal<NSharedCache::TEvResult>();

TPrivatePageCache::TInfo *collectionInfo = PrivatePageCache->Info(msg->Origin->Label());
if (!collectionInfo) // collection could be outdated
return;
Expand All @@ -2695,6 +2729,9 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
return Broken();
}

if (requestType == EPageCollectionRequest::InMemPages) {
StickInMemPages(msg);
}
for (auto& loaded : msg->Loaded) {
TPrivatePageCache::TPage::TWaitQueuePtr transactionsToActivate = PrivatePageCache->ProvideBlock(std::move(loaded), collectionInfo);
ActivateWaitingTransactions(transactionsToActivate);
Expand All @@ -2704,8 +2741,6 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {

case EPageCollectionRequest::PendingInit:
{
auto *msg = ev->CastAsLocal<NSharedCache::TEvResult>();

const auto *pageCollection = msg->Origin.Get();
TPendingPartSwitch *foundSwitch = nullptr;
TPendingPartSwitch::TNewBundle *foundBundle = nullptr;
Expand Down Expand Up @@ -2756,6 +2791,7 @@ void TExecutor::Handle(NSharedCache::TEvResult::TPtr &ev) {
return;

default:
Y_DEBUG_ABORT_S("Unexpected request " << ev->Cookie);
break;
}
}
Expand Down Expand Up @@ -3460,8 +3496,8 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

CompactionLogic->UpdateLogUsage(LogicRedo->GrabLogUsage());

const ui64 partSwitchCpuuS = ui64(1000000. * partSwitchCpuTimer.Passed());
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_PARTSWITCH_CPUTIME].IncrementFor(partSwitchCpuuS);
const ui64 partSwitchCpuUs = ui64(1000000. * partSwitchCpuTimer.Passed());
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_PARTSWITCH_CPUTIME].IncrementFor(partSwitchCpuUs);

if (msg->YellowMoveChannels || msg->YellowStopChannels) {
CheckYellow(std::move(msg->YellowMoveChannels), std::move(msg->YellowStopChannels));
Expand Down Expand Up @@ -3490,13 +3526,11 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

void TExecutor::UpdateUsedTabletMemory() {
// Estimate memory usage for internal executor structures:
UsedTabletMemory = 50 << 10; // 50kb
UsedTabletMemory = 50_KB;

// Count the number of bytes kept in private cache (can't be offloaded right now):
if (PrivatePageCache) {
UsedTabletMemory += PrivatePageCache->GetStats().TotalPinnedBody;
UsedTabletMemory += PrivatePageCache->GetStats().PinnedLoadSize;
}
// Count the number of bytes that can't be offloaded right now:
UsedTabletMemory += StickyPagesMemory;
UsedTabletMemory += TransactionPagesMemory;

// Estimate memory used by internal database structures:
auto &counters = Database->Counters();
Expand Down Expand Up @@ -3589,8 +3623,9 @@ void TExecutor::UpdateCounters(const TActorContext &ctx) {
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_SHARED_BODY].Set(stats.TotalSharedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_PINNED_BODY].Set(stats.TotalPinnedBody);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_EXCLUSIVE].Set(stats.TotalExclusive);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(stats.TotalSticky);
}
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_STICKY].Set(StickyPagesMemory);
Counters->Simple()[TExecutorCounters::CACHE_TOTAL_USED].Set(TransactionPagesMemory);

const auto &memory = Memory->Stats();

Expand Down Expand Up @@ -4108,7 +4143,8 @@ void TExecutor::RenderHtmlPage(NMon::TEvRemoteHttpInfo::TPtr &ev) const {
DIV_CLASS("row") {str << "Total bytes in shared cache: " << PrivatePageCache->GetStats().TotalSharedBody; }
DIV_CLASS("row") {str << "Total bytes in local cache: " << PrivatePageCache->GetStats().TotalPinnedBody; }
DIV_CLASS("row") {str << "Total bytes exclusive to local cache: " << PrivatePageCache->GetStats().TotalExclusive; }
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << PrivatePageCache->GetStats().TotalSticky; }
DIV_CLASS("row") {str << "Total bytes marked as sticky: " << StickyPagesMemory; }
DIV_CLASS("row") {str << "Total bytes currently in use: " << TransactionPagesMemory; }

if (GcLogic) {
TAG(TH3) {str << "Gc logic:";}
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ struct TPendingPartSwitch {
enum class EPageCollectionRequest : ui64 {
Undefined = 0,
Cache = 1,
CacheSync,
InMemPages,
PendingInit,
BootLogic,
};
Expand Down Expand Up @@ -465,6 +465,8 @@ class TExecutor
size_t ReadyPartSwitches = 0;

ui64 UsedTabletMemory = 0;
ui64 StickyPagesMemory = 0;
ui64 TransactionPagesMemory = 0;

TActorContext OwnerCtx() const;

Expand Down Expand Up @@ -515,6 +517,7 @@ class TExecutor
void TranslateCacheTouchesToSharedCache();
void RequestInMemPagesForDatabase(bool pendingOnly = false);
void RequestInMemPagesForPartStore(ui32 tableId, const NTable::TPartView &partView, const THashSet<NTable::TTag> &stickyColumns);
void StickInMemPages(NSharedCache::TEvResult *msg);
THashSet<NTable::TTag> GetStickyColumns(ui32 tableId);
void RequestFromSharedCache(TAutoPtr<NPageCollection::TFetch> fetch,
NBlockIO::EPriority way, EPageCollectionRequest requestCategory);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_executor_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace NTabletFlatExecutor {
XX(COMPACTION_READ_IN_FLY, "CompactionReadInFly") \
XX(DB_FLAT_INDEX_BYTES, "DbFlatIndexBytes") \
XX(DB_B_TREE_INDEX_BYTES, "DbBTreeIndexBytes") \
XX(CACHE_TOTAL_USED, "CacheTotalUsed") \

// don't change order!
#define FLAT_EXECUTOR_CUMULATIVE_COUNTERS_MAP(XX) \
Expand Down
Loading
Loading