feat(v2): background compaction cleanup #3694

Merged: 38 commits into main from feat/compaction-background-cleanup on Nov 25, 2024

Conversation

@kolesnikovae (Collaborator) commented Nov 15, 2024

The change moves storage cleanup to the compaction worker service and alters the compaction orchestration: now we explicitly replicate state updates. The change is still being tested; however, the PR is ready for review.

Please refer to the README for details.

I'm going to improve the way the compaction strategy is configured and add compaction metrics (to the compaction planner, scheduler, and worker) before merging the PR.

@kolesnikovae kolesnikovae force-pushed the feat/compaction-background-cleanup branch from 49ff50d to 000525c Compare November 18, 2024 08:56
# Conflicts:
#	api/gen/proto/go/metastore/v1/compactor.pb.go
#	api/gen/proto/go/metastore/v1/index.pb.go
#	api/gen/proto/go/metastore/v1/index_vtproto.pb.go
#	api/gen/proto/go/metastore/v1/metastorev1connect/index.connect.go
#	api/gen/proto/go/metastore/v1/types.pb.go
#	api/gen/proto/go/metastore/v1/types_vtproto.pb.go
#	api/metastore/v1/index.proto
#	api/metastore/v1/types.proto
#	pkg/experiment/metastore/cleaner_raft_handler.go
#	pkg/experiment/metastore/cleaner_service.go
#	pkg/experiment/metastore/client/methods.go
#	pkg/experiment/metastore/compaction_planner.go
#	pkg/experiment/metastore/compaction_raft_handler.go
#	pkg/experiment/metastore/compaction_service.go
#	pkg/experiment/metastore/fsm/fsm.go
#	pkg/experiment/metastore/index/index.go
#	pkg/experiment/metastore/index/store.go
#	pkg/experiment/metastore/index_service.go
#	pkg/experiment/metastore/markers/deletion_markers.go
#	pkg/experiment/metastore/metastore.go
#	pkg/experiment/metastore/raftnode/node.go
@kolesnikovae kolesnikovae marked this pull request as ready for review November 19, 2024 11:13
@kolesnikovae kolesnikovae requested a review from a team as a code owner November 19, 2024 11:13
@kolesnikovae kolesnikovae requested a review from a team as a code owner November 20, 2024 10:28
@kolesnikovae (Collaborator, Author):

NB: in the latest optimization I broke block cleanup – fixing it now

@aleks-p (Contributor) left a comment:

I like the idea of replicating the state of the compaction plan explicitly; it should dramatically reduce the likelihood of the state being inconsistent between replicas. It also solves the state issues when rolling out code changes to replicas.

I am not entirely sold on the implementation itself. I think we can proceed with merging this, but I would try to see if we can simplify a few things in a future iteration. My main concern is that this will be harder to maintain. We introduce many new concepts (a scheduler, planner, etc.), multiple layers of queues and many types representing different representations of compaction jobs. I fear that some parts (e.g., the compaction queue) will be hard to reason about when the knowledge about the internal workings is not as fresh as now.

@aleks-p (Contributor):

All of these types seem to be used internally; can they be moved out of the api folder?

Comment on lines 40 to 41
workers int
free atomic.Int32
@aleks-p (Contributor):

The type is called Worker, and it is not immediately obvious what these two fields are for. Are there other names that could communicate that more clearly?

@kolesnikovae (Collaborator, Author):

Renamed to threads and capacity, respectively.
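
For illustration, a minimal sketch of the renamed fields (the names follow this thread; the struct shape and comments are assumptions rather than the actual code in the PR):

import "sync/atomic"

// Worker executes compaction jobs.
type Worker struct {
  threads  int          // configured number of concurrent compaction threads
  capacity atomic.Int32 // currently free job slots, updated as jobs are taken and completed
}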

ctx context.Context
cancel context.CancelFunc
*metastorev1.CompactionJob
source []*metastorev1.BlockMeta
@aleks-p (Contributor):

We already have sourceBlocks; maybe we can call this resolvedBlocks, sourceMetas, or similar?

@kolesnikovae (Collaborator, Author):

It's an interesting read on the topic. I agree with the author and strive to follow the principles outlined in the article.

The version I decided to go with:

compacted, err := block.Compact(ctx, job.blocks)

@aleks-p (Contributor):

Not sure how this is relevant; I was referring to the "source" field in the struct, which is now called blocks.

My point is that source or blocks are named too similarly to sourceBlocks in *metastorev1.CompactionJob. If someone is working with a compactionJob object, they have "sourceBlocks" and "blocks" to decide between. Both represent source blocks; one of them (blocks) is initialized later than the other (resolved via a metastore client call), which is why I suggested using a name with a qualifier.

@kolesnikovae (Collaborator, Author) commented Nov 22, 2024:

I strongly recommend this semi-official style guide and this piece of wisdom for further reading, in addition to the link I already shared.

Yes, I understand your point, but I can't agree.

You are right that source might not be the best name, but for a slightly different reason. It's not possible to confuse or misuse job.SourceBlocks with job.source (now job.blocks) since they have different types and visibility levels. I also believe the purpose of the members is very clear and specific in the context.

I just think blocks is the clearest and most concise option here – perfect qualities for a variable name.

block.Compact(ctx, job.blocks)         // I settled on this version.
block.Compact(ctx, job.source)         // Works well but might be ambiguous for a reader who has no idea what a compaction job is and what the job source is.
block.Compact(ctx, job.sourceBlocks)   // Quite good, but matches the input job.SourceBlocks and can be shortened without any clarity loss: any block we give at input is the source.
block.Compact(ctx, job.resolvedBlocks) // Irrelevant details. Besides, this is the only place in the whole codebase where we mention "resolved block".
block.Compact(ctx, job.sourceMetas)    // Looks like we're compacting metas. There's no reason to include the type name in the variable name.

go func() {
defer w.wg.Done()
w.jobsLoop(ctx)
level.Info(w.logger).Log("msg", "compaction worker thead started")
@aleks-p (Contributor):

nit, typo in "thread"

Comment on lines +107 to +108
for created := 0; created < capacity; created++ {
plan, err := planner.CreateJob()
@aleks-p (Contributor):

Not sure I understand why we try to create this number (capacity) of jobs. Is it just a relatively low number to avoid having a large response here?

@kolesnikovae (Collaborator, Author):

There are a number of reasons:

  • We do want to have small raft messages (where the planned job will end up).
  • We don't want to create jobs ahead of time. This allows altering the planner, scheduler, and workers' configs at any time, with almost instant effect.

The rate I want our system to maintain is at least 1GB/s, which is roughly 256 segments (500ms) per second, or around 1M metadata entries and 50K compaction jobs per hour.

Consider the case when no workers are available (e.g., due to infrastructure issues, misconfiguration, or bugs), or when workers do not have enough capacity to handle all the jobs.

Consider also the case when the metastore is unavailable for a period (e.g., due to infrastructure issues, misconfiguration, or bugs), and our DLQ accumulates a substantial number of entries to process (e.g., 1M, which is a reasonable number). While we will pace the processing, many new blocks (and thus jobs) are to be added over a short period of time.

If there is no limit on the job queue size (which is TODO, BTW), and no limit on how many jobs we can create at once, ingestion could be blocked for a long period of time when the metastore or workers come back online (or added). However, if we produce no more jobs than we can actually handle, this will not happen: the service will adapt to the capacity of the worker fleet, and all jobs will eventually be created and scheduled.

In addition, the underlying implementation of the block queue can handle millions of entries without major issues. While the job queue is simpler, it may cause performance issues if not handled carefully – something we may want to address in the future.
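
To make the adaptive behavior concrete, here is a rough sketch of the capacity-bounded planning step (the planner interface and job type are simplified stand-ins; only the loop shape follows the snippet under review):

// jobPlan and jobPlanner are simplified stand-ins for the real types.
type jobPlan struct{ name string }

type jobPlanner interface {
  // CreateJob returns the next job plan, or nil when nothing is ready.
  CreateJob() (*jobPlan, error)
}

// planNewJobs creates at most capacity new jobs per state update, so the
// plan never grows faster than the worker fleet can consume it; unused
// capacity is simply left for the next request.
func planNewJobs(planner jobPlanner, capacity int) ([]*jobPlan, error) {
  plans := make([]*jobPlan, 0, capacity)
  for created := 0; created < capacity; created++ {
    plan, err := planner.CreateJob()
    if err != nil {
      return plans, err
    }
    if plan == nil {
      break // nothing ready to compact right now
    }
    plans = append(plans, plan)
  }
  return plans, nil
}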

@aleks-p (Contributor):

I was referring to the choice of capacity as the limit, not saying that we should not have a limit. We already assign jobs above so the capacity might be "spent" already.

@kolesnikovae (Collaborator, Author) commented Nov 22, 2024:

The answer is here:

However, if we produce no more jobs than we can actually handle, this will not happen: the service will adapt to the capacity of the worker fleet, and all jobs will eventually be created and scheduled.

I think I need to expand on this and add it to the documentation. The challenge is that we don't know the capacity of our worker fleet in advance, and we have no control over them – they can appear and disappear at any time. Therefore, we need an adaptive approach if we want to keep our queue short and workers busy.

I considered a few options:

  1. We produce as many new jobs as were assigned in this update. This deadlocks: when the scheduler queue is empty, no workers are assigned any jobs at all, so no new jobs are produced, so there is still nothing to assign – even though there might be jobs ready for scheduling. Loop.

  2. We produce capacity minus assigned new jobs: we never utilize capacity fully. I'd call this strategy "greedy worker" – the intuition here is that everyone only cares about itself, but in the end, everyone starves.

  3. We produce capacity new jobs: we effectively create jobs for another compaction worker instance. Essentially, we have evidence that this number of jobs will eventually be handled.

  4. We ensure that the queue holds at least max_worker_capacity assignable jobs. That works, but it is trickier to implement and requires a parameter, which would have to be dynamic.

There's an unaddressed risk, however: if all the workers just abandon the jobs they take, the queue will inflate. This is why we need a hard cap and a displacement policy, regardless of the scheduling principles.
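
Purely as an illustration of the hard cap and displacement policy mentioned above (not part of this PR; the type and the oldest-first eviction rule are assumptions):

// boundedJobQueue keeps at most capMax jobs; when full, pushing a new job
// displaces the oldest one so that abandoned jobs cannot inflate the queue
// without bound.
type boundedJobQueue struct {
  capMax int
  jobs   []string // job names, oldest first
}

// push appends a job and reports which job, if any, was displaced.
func (q *boundedJobQueue) push(job string) (displaced string, evicted bool) {
  if q.capMax > 0 && len(q.jobs) >= q.capMax {
    displaced, evicted = q.jobs[0], true
    q.jobs = q.jobs[1:]
  }
  q.jobs = append(q.jobs, job)
  return displaced, evicted
}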

@kolesnikovae (Collaborator, Author):

Thank you for the review, Aleks!

I am not entirely sold on the implementation itself. I think we can proceed with merging this, but I would try to see if we can simplify a few things in a future iteration.

I'd like to request more specific, actionable feedback. If you have tangible suggestions for simplification or areas where you see unnecessary complexity, I'm happy to discuss and explore adjustments. I suspect that one of us may be missing some nuances of the system's operation and failure modes. Let's discuss it next time we meet.

My main concern is that this will be harder to maintain. We introduce many new concepts (a scheduler, planner, etc.),

The current design introduces the following components:

  • The compactor accepts blocks for compaction and owns the compaction queue.
  • The planner is responsible for creating job plans (defining the compaction jobs).
  • The scheduler oversees job priorities, assignments, and status transitions.

Each of the components has a well-defined set of responsibilities. This separation of concerns is intentional to ensure the system remains maintainable as it evolves. If you believe the earlier version (and a couple of pieces here and there) was simpler and preferable, I'm open to a discussion.
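
As a rough sketch, the split could be expressed as interfaces along these lines (the method signatures are assumptions and do not match the PR exactly; the metastorev1 types are the generated protobuf types referenced elsewhere in this conversation, with the import path assumed from the file list above):

import (
  metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
)

// Compactor accepts blocks for compaction and owns the compaction queue.
type Compactor interface {
  AddBlock(md *metastorev1.BlockMeta)
}

// Planner creates job plans, i.e. defines the compaction jobs.
type Planner interface {
  CreateJob() (*metastorev1.CompactionJob, error)
}

// Scheduler oversees job priorities, assignments, and status transitions.
type Scheduler interface {
  AssignJob() (*metastorev1.CompactionJob, error)
  UpdateJob(job *metastorev1.CompactionJob) error
}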

I fear that some parts (e.g., the compaction queue) will be hard to reason about when the knowledge about the internal workings is not as fresh as now.

The data structures involved – priority queues and linked lists – are standard and should be familiar to most developers. However, if specific parts of the codebase seem unclear, I'd be happy to add documentation or comments.

and many types representing different representations of compaction jobs

I assume you're referring to:

message CompactionPlanUpdate {
  repeated NewCompactionJob new_jobs = 1;
  repeated AssignedCompactionJob assigned_jobs = 2;
  repeated UpdatedCompactionJob updated_jobs = 3;
  repeated CompletedCompactionJob completed_jobs = 4;
}

A job has different attributes depending on its status (e.g., UpdatedCompactionJob never includes the job plan unlike AssignedCompactionJob, while CompletedCompactionJob includes the job results). Strict typing will protect against mistakes. If you have concerns about this approach, let's discuss alternatives that still maintain clarity and safety.

@aleks-p (Contributor) commented Nov 21, 2024:

I'd like to request more specific, actionable feedback. If you have tangible suggestions for simplification or areas where you see unnecessary complexity, I'm happy to discuss and explore adjustments. I suspect that one of us may be missing some nuances of the system's operation and failure modes. Let's discuss it next time we meet.

Sorry I can't provide more specific feedback right away; I was sharing my initial impression of the changes. Things are a bit clearer after a second look. We are making a large shift from creating immutable jobs when we consume blocks to creating jobs on demand. We are also opening up the possibility of partial completion of "batches", which then requires extra care. As I said in my previous comment and as discussed offline, let's move forward with this and act later if needed.

@kolesnikovae kolesnikovae force-pushed the feat/compaction-background-cleanup branch from 08d9387 to 65562d1 Compare November 24, 2024 13:49
@kolesnikovae kolesnikovae merged commit 00f81b9 into main Nov 25, 2024
29 checks passed
@kolesnikovae kolesnikovae deleted the feat/compaction-background-cleanup branch November 25, 2024 13:46