Memory store leader election - REAP-2 #1533

Merged
merged 8 commits into from
Dec 18, 2024

Conversation

adejanovski
Contributor

@adejanovski adejanovski commented Dec 12, 2024

Fixes #1519

The memory storage backend didn't implement any of the IDistributedStorage leader election methods.
This was ok until we started using leader election for segment scheduling, which doesn't require running in distributed mode.
As a consequence, Reaper using the mem store would schedule one new segment at each poll, even if the replicas were already busy processing another segment.

This PR introduces a class which manages locks on replicas for segments and moves the required methods from IDistributedStorage to IStorage so they can be implemented in the memory storage implementation.

@adejanovski adejanovski changed the title Memory store leader election Memory store leader election - REAP-2 Dec 12, 2024
@Miles-Garnsey
Contributor

@adejanovski this is very tasty but your CI is failing at the moment, and can you remind me how to manually test this :)

Contributor

@Miles-Garnsey Miles-Garnsey left a comment

I've put a bunch of feedback as a first review. I'll likely want to consider this more - this isn't an easy PR so nice work on getting something together. It feels like it's very close.

I have a bunch of comments around naming since I had a bit of a tricky time understanding some of what was going on at first. These are always nits, but given the functionality is complex I'd love to see some of them straightened out for future readers.

I also think we could handle the locking we're doing much better, and suggest we revisit that now given some of the use cases we want to put this into. For a first cut this is great, but I think a bit of improvement there might do us a world of good. Improvements might include:

  1. Better segmentation of what needs to be locked, since the global lock you're using isn't scalable.
  2. Being more discerning about where you're locking. You probably only need to lock on writes, but you're often acquiring the lock before reads at entry into the method.

Again, this is kind of a minor tweak but I think it'll avoid headaches down the road.

try {
long currentTime = System.currentTimeMillis();
// Check if any replica is already locked by another runId
for (String replica : replicas) {
Contributor

Nit: I think the pattern in this codebase is to use the streams API instead of for loops where possible?

Contributor Author

fair point, I'm refactoring this to use streams.
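
For readers following the thread, here is a minimal sketch of the two equivalent forms being discussed. The class name `ReplicaLockCheck`, the `key` helper, and the trimmed-down `LockInfo` are assumptions made for illustration, not the code in this PR; only the loop header and the shape of the stream predicate come from the snippets quoted in this review.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class ReplicaLockCheck {
  // Hypothetical stand-in for the PR's LockInfo, reduced to the fields used here.
  static final class LockInfo {
    final UUID runId;
    final long expirationTime;

    LockInfo(UUID runId, long expirationTime) {
      this.runId = runId;
      this.expirationTime = expirationTime;
    }
  }

  // Hypothetical key format; the PR's getReplicaLockKey may differ.
  static String key(String replica, UUID runId) {
    return replica + ":" + runId;
  }

  // Loop form: stop at the first replica holding an unexpired lock for this run.
  static boolean anyLockedWithLoop(
      Map<String, LockInfo> locks, Set<String> replicas, UUID runId, long now) {
    for (String replica : replicas) {
      LockInfo info = locks.get(key(replica, runId));
      if (info != null && info.expirationTime > now && info.runId.equals(runId)) {
        return true;
      }
    }
    return false;
  }

  // Stream form: same check, short-circuiting through anyMatch.
  static boolean anyLockedWithStream(
      Map<String, LockInfo> locks, Set<String> replicas, UUID runId, long now) {
    return replicas.stream()
        .map(replica -> locks.get(key(replica, runId)))
        .anyMatch(info -> info != null
            && info.expirationTime > now && info.runId.equals(runId));
  }

  public static void main(String[] args) {
    UUID runId = UUID.randomUUID();
    Map<String, LockInfo> locks = new ConcurrentHashMap<>();
    locks.put(key("node1", runId), new LockInfo(runId, 2_000L));
    Set<String> replicas = Set.of("node1", "node2");
    // Both calls print true: node1 holds a lock that expires after now = 1000.
    System.out.println(anyLockedWithLoop(locks, replicas, runId, 1_000L));
    System.out.println(anyLockedWithStream(locks, replicas, runId, 1_000L));
  }
}
```

Both forms short-circuit on the first match, so the choice is mostly one of codebase convention.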

Contributor

I think you might have missed this one in your latest commit?

Contributor Author

We're good now, no more loop.

Contributor

Looks like there is still a loop here: `for (String replica : replicas) {`. Was this something you wanted to change?


private final ConcurrentHashMap<String, LockInfo> replicaLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, Set<UUID>> repairRunToSegmentLocks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, ReentrantLock> runIdLocks = new ConcurrentHashMap<>();
Contributor

Issue: I thought you said you weren't going to divide the locks up by runID? 😅

Contributor Author

yeah, I tried but then rolled back. Not far enough it seems :)

Contributor

Did you want to roll this back? I thought we were reverting to using a global lock.

@Miles-Garnsey
Contributor

Miles-Garnsey commented Dec 16, 2024

I'm starting to struggle with the length of this review, so let's propose a sketch to do just one method without external locks:

  public boolean lockRunningRepairsForNodes(UUID runId, UUID segmentId, Set<String> replicas) {
      // For each replica, check if a lock is held by another RepairRun.
      for (String replica : replicas) {
        replicaLocks.compute(getReplicaLockKey(replica, runId), (k,lockInfo) -> {
          long currentTime = System.currentTimeMillis();
          if (lockInfo != null && lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId)) { // lockInfo.runId.equals(runId) looks wrong to me?
            return lockInfo; // Replica is locked by another runId and not expired, return existing value.
          }
          long expirationTime = currentTime + (ttlSeconds * 1000);
          // Lock the replicas for the given runId and segmentId
          LockInfo newLockInfo = new LockInfo(runId, expirationTime);
          newLockInfo.segmentLocks.add(segmentId);
          return newLockInfo;
        });
      }
      return true;
  }

This also requires an update:

private static class LockInfo {
  UUID runId;
  long expirationTime;
  Set<UUID> segmentLocks;

  LockInfo(UUID runId, long expirationTime) {
    this.runId = runId;
    this.expirationTime = expirationTime;
    this.segmentLocks = new HashSet<>();
  }
}

What this does:

  1. Uses the ConcurrentHashMap's internal locking (which I think should be per key) instead of a global lock.
  2. Moves the segmentLocks into LockInfo which makes clear that they exist within a given RunID (I hope I've understood this correctly, otherwise, I'll have broken things).
  3. Eliminates all external locking requirements. This will just iterate through the replicaLocks independently and update them if possible. It might be good to have a logic branch here for the case lockInfo.expirationTime < currentTime so that this can be removed without waiting for the TTL cleanup to happen, but maybe that can be handled with a simple retry after the other threadpool cleans the entry up.
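
The per-key idea in the points above can be illustrated with a small, self-contained sketch. It assumes a single replica per call and reports whether the lock was actually taken, which the sketch in the comment does not do since it always returns true. `PerKeyLockSketch`, `tryLockReplica`, the key format, and the explicit `now` parameter are hypothetical names chosen for this example. It relies only on the documented guarantee that `ConcurrentHashMap.compute` runs atomically for a given key.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class PerKeyLockSketch {
  // Hypothetical lock record; field names follow the discussion above.
  static final class LockInfo {
    final UUID runId;
    final UUID segmentId;
    final long expirationTime;

    LockInfo(UUID runId, UUID segmentId, long expirationTime) {
      this.runId = runId;
      this.segmentId = segmentId;
      this.expirationTime = expirationTime;
    }
  }

  private final ConcurrentHashMap<String, LockInfo> replicaLocks = new ConcurrentHashMap<>();

  // Tries to lock one replica for one run. The time is passed in to keep the sketch testable.
  boolean tryLockReplica(String replica, UUID runId, UUID segmentId, long now, long ttlMillis) {
    AtomicBoolean acquired = new AtomicBoolean(false);
    // Assumed key format: replica plus run id, so locks are scoped to a repair run.
    replicaLocks.compute(replica + ":" + runId, (key, existing) -> {
      if (existing != null && existing.expirationTime > now) {
        return existing; // still held and not expired: keep the current entry
      }
      acquired.set(true); // absent or expired: take over the entry
      return new LockInfo(runId, segmentId, now + ttlMillis);
    });
    return acquired.get();
  }

  public static void main(String[] args) {
    PerKeyLockSketch locks = new PerKeyLockSketch();
    UUID run = UUID.randomUUID();
    System.out.println(locks.tryLockReplica("node1", run, UUID.randomUUID(), 0L, 1_000L));
    System.out.println(locks.tryLockReplica("node1", run, UUID.randomUUID(), 500L, 1_000L));
  }
}
```

This covers one map entry at a time only; it does not by itself lock several replicas atomically.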

@Miles-Garnsey
Contributor

Miles-Garnsey commented Dec 16, 2024

From discussions with Alex earlier:

  1. The proposal I have above where we move Set<UUID> segmentLocks inside LockInfo won't work because segments are not unique to replicas.
  2. This means that the same segment can be running on multiple replicas (in fact, in RF 3 it will be running on all three), and all replicas involved in a particular segment need to be locked.
  3. We discussed the possibility of keying a map by segment with replica ID as the value instead, but Alex pointed out that even then, you may be able to lock one replica, but if you fail to lock all three then you need to roll back any locked ones before proceeding.
  4. Because the Hashmap's mutex can't be used to lock all three replicas at the same time, the unlocking process on one replica then might also fail, which could lead to deadlocks.
  5. Effectively, what we'd need is a way to do a lookup for the three replicas the segment is supposed to run on, then lock all three atomically. I don't think any Map implementation gives us the ability to lock multiple entries at the same time, which is the critical issue here.

I feel like there's some concurrent bimap that might resolve this whole state, but given that I've been reminded that we will currently only be managing one cluster per instance in the proposed use case we have, we can probably defer this more complicated implementation for later.
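
As a rough illustration of the all-or-nothing requirement described above, the following sketch takes a single global lock, checks every replica first, and only then writes all entries, so no rollback is needed. This is an assumption about the general shape of a global-lock approach, not the code merged in this PR; `AllOrNothingLockSketch`, `lockAll`, and the key format are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;

class AllOrNothingLockSketch {
  private final ReentrantLock globalLock = new ReentrantLock();
  // Maps an assumed "replica:runId" key to the lock's expiration time in milliseconds.
  private final Map<String, Long> expirationByKey = new HashMap<>();

  // Locks every replica of a segment for the given run, or none of them.
  boolean lockAll(UUID runId, Set<String> replicas, long now, long ttlMillis) {
    globalLock.lock();
    try {
      boolean anyHeld = replicas.stream()
          .map(replica -> expirationByKey.get(replica + ":" + runId))
          .anyMatch(expiration -> expiration != null && expiration > now);
      if (anyHeld) {
        return false; // nothing was written, so there is nothing to roll back
      }
      replicas.forEach(replica -> expirationByKey.put(replica + ":" + runId, now + ttlMillis));
      return true;
    } finally {
      globalLock.unlock();
    }
  }

  public static void main(String[] args) {
    AllOrNothingLockSketch locks = new AllOrNothingLockSketch();
    UUID run = UUID.randomUUID();
    System.out.println(locks.lockAll(run, Set.of("a", "b", "c"), 0L, 1_000L));
    System.out.println(locks.lockAll(run, Set.of("c", "d"), 10L, 1_000L));
  }
}
```

The trade-off is the one raised earlier in the review: every caller serializes on one lock, which is simple and correct but does not scale across many repair runs.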

Contributor

@Miles-Garnsey Miles-Garnsey left a comment

I've added more comments because some of the updates you mentioned in your replies don't appear to be reflected in the code. You may have rolled some back accidentally while rolling back other changes (especially the use of a map of locks).

I'm approving anyway since the tests pass and I don't want you to be blocked from merging pending my review. Whether you make the rest of the changes we've discussed is up to you, since I think they are all non-blocking and this is functionally correct AFAICT.

.map(replica -> replicaLocks.get(getReplicaLockKey(replica, runId)))
.anyMatch(lockInfo -> lockInfo != null
&& lockInfo.expirationTime > currentTime && lockInfo.runId.equals(runId));

Contributor

Issue: This looks wrong, you're saying to return false only if the value of this entry has runId equal to the runId you're trying to lock. Don't you want to return false regardless of who holds the lock?

Contributor Author

no, we allow concurrency on replicas across repair runs. It's within a repair run that we disallow it.
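
To make that rule concrete, here is a small sketch of how including the run id in the lock key yields exactly this behaviour. The key format and the names `RunScopedKeySketch`, `replicaLockKey`, and `tryLock` are assumptions for illustration (the PR's `getReplicaLockKey` is not shown in this thread), and TTL handling is left out to keep it short.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class RunScopedKeySketch {
  // Maps "replica:runId" to the segment currently holding that replica for that run.
  private final ConcurrentHashMap<String, UUID> holders = new ConcurrentHashMap<>();

  // Hypothetical key format: the run id is part of the key.
  static String replicaLockKey(String replica, UUID runId) {
    return replica + ":" + runId;
  }

  // Succeeds only if no segment of the same run already holds this replica.
  boolean tryLock(String replica, UUID runId, UUID segmentId) {
    return holders.putIfAbsent(replicaLockKey(replica, runId), segmentId) == null;
  }

  public static void main(String[] args) {
    RunScopedKeySketch locks = new RunScopedKeySketch();
    UUID runA = UUID.randomUUID();
    UUID runB = UUID.randomUUID();
    System.out.println(locks.tryLock("node1", runA, UUID.randomUUID())); // first segment of run A
    System.out.println(locks.tryLock("node1", runA, UUID.randomUUID())); // second segment of run A
    System.out.println(locks.tryLock("node1", runB, UUID.randomUUID())); // segment of run B
  }
}
```

Because the two runs map to different keys, a segment from run B is not blocked by run A's lock on the same node, while a second segment from run A is.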


@adejanovski adejanovski merged commit 664236b into master Dec 18, 2024
24 checks passed
adejanovski added a commit that referenced this pull request Jan 6, 2025
Successfully merging this pull request may close these issues.

The memory store allows multiple segments to run per node for the same repair
2 participants