Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TTL support for compressedBucketWrite fn #2930

Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ public boolean update(String path, DataUpdater<T> updater, int options) {
* sync update
*/
public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
return doUpdate(path, updater, options, ZkClient.TTL_NOT_SET);
}

/**
* sync update with ttl
*
* ttl is only used when creating new znode, hence if znode is already created with a ttl, further
* update operations will not update the znode ttl even if ttl is provided in the options
*/
public AccessResult doUpdate(String path, DataUpdater<T> updater, int options, long ttl) {
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally helper functions with this naming convention should be private.

Copy link
Author

@jacoblukose jacoblukose Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked other fns (ex: doCreate, doSet and also the other invocation of doUpdate) and they are having public as the scope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally if the fuc is not used outside class, we would set as private.
Also more importantly, it is a bit odd to allow user to set ttl but not actually update ttl in update. If user provide a new ttl value and the node already exit, this function will return RetCode.OK without actually change anything.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I said earlier there are several instances of doCreate, doSet, doUpdate which are declared public. I followed the similar convention. I am not familiar with the helix code base, this is my first PR. If you can guide me what exact change to make here, happy to correct it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on @xyuanlu.

I can understand TTL for create. But why we need TTL for update? The right operation is to let the user delete the node and recreate with a TTL.

I am not confortable with this feature. Before we have strong justification. I dont think we should let this in.

@jacoblukose It would be good to figure out why you would like to support this.

Copy link
Contributor

@xyuanlu xyuanlu Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this might not be the best approach, but I am OK as long as @junkaixue feels OK. :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not buying this. As I said, when a node is created, we should not update its property and change it from a persistent node to TTL node.

If you need to do this, then it should be the application logic to make that smoothly migrate from Non TTL to TTL. Also, since this is the API support Helix. I dont see any use cases in HELIX to leverage this.

Accessor is Helix internal thing. If you would like to leverage for company specific feature and not necessary help Helix, please fork the repo and implement it internally.

Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xyuanlu thanks for reponding and on the ack. It will be helpful if you can also callout alternative approach here to improve the craft if you thinks thats seen comprised. Happy to correct it.

@junkaixue thanks for responding, would you be kind enough to highlight the code part where the said concern: I am not buying this. As I said, when a node is created, we should not update its property and change it from a persistent node to TTL node. is. doUpdate fn is not going to update ttl for an existing node, only when underlying create is called, it leverage ttl passed if any. And also, if you see, public facing doUpdate doesnt have ttl option exposed. It would be great if you can suggest an alternative on how this need to handled as well.

Also I think there is some misunderstanding, the usecase is not to change znodes created with persistent mode to migrate to use ttl mode. Idea is to just expand the scope of ZkBucketDataAccessor to leverage zk ttl feature when the same is being initialized.

Copy link
Contributor

@xyuanlu xyuanlu Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest I think more time is needed when design the ZK update API to keep consistency. I dont have an solid idea in mind how. But I think the fundamental rule is keep the behavior simple, consistent and not ambiguous.

  • The Update API has a TTL as parameter, but the updated ZNode may or may not have the passed in TTL value, is not a consistent behavior. However, I don't have an idea of how to achieve this, I think this is the reason why we don't have update with TTL in the first place. More effort is needed when design this - if ever needed.

Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @xyuanlu the only reason why doUpdate need a ttl value to be passed in is because the current code is using a doCreate fn inside the doUpdate. If there is no doCreate call made within doUpdate then there is no need to pass in the ttl as well, reason being, ttl can only be enforced during creation of znode.

the related q is, is it a correct behaviour on the existing doUpdate fn code to call a doCreate internally? If not, this should be removed, and application need to handle it by issuing an explicit doCreate call. if its correct, then comes to the challenge on how to pass in ttl value to the internal doCreate fn inside doUpdate

AccessResult result = new AccessResult();
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
Expand Down Expand Up @@ -452,7 +462,7 @@ public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
T newData = updater.update(null);
RetCode rc;
if (newData != null) {
AccessResult res = doCreate(path, newData, options);
AccessResult res = doCreate(path, newData, options, ttl);
result._pathCreated.addAll(res._pathCreated);
rc = res._retCode;
} else {
Expand Down Expand Up @@ -979,11 +989,29 @@ public boolean[] setChildren(List<String> paths, List<T> records, int options) {
return set(paths, records, null, null, options);
}

/**
* async setChildren with TTL
*/
public boolean[] setChildren(List<String> paths, List<T> records, int options, long ttl) {
return set(paths, records, null, null, options, ttl);
}
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved

/**
* async set, give up on error other than NoNode
*/
boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated,
List<Stat> stats, int options) {
return set(paths, records, pathsCreated, stats, options, ZkClient.TTL_NOT_SET);
}

/**
* async set with ttl, give up on error other than NoNode
*
* ttl is only used when creating new znode, hence if znode is already created with a ttl, further
* set operations will not update the znode ttl even if ttl is provided in the options
*/
boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated,
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved
List<Stat> stats, int options, long ttl) {
if (paths == null || paths.size() == 0) {
return new boolean[0];
}
Expand Down Expand Up @@ -1051,7 +1079,7 @@ boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreat
// if failOnNoNode, try create
if (failOnNoNode) {
boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
createCbList = create(paths, records, needCreate, pathsCreated, options);
createCbList = create(paths, records, needCreate, pathsCreated, options, ttl);
for (int i = 0; i < createCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler createCb = createCbList[i];
if (createCb == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {

private final int _bucketSize;
private final long _versionTTLms;
private long _znodeTTLms;
private int _accessOption;
private final ZkSerializer _zkSerializer;
private final RealmAwareZkClient _zkClient;
private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
Expand All @@ -89,25 +92,31 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms) {
this(zkClient, bucketSize, versionTTLms, true);
this(zkClient, bucketSize, versionTTLms, true, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms, long znodeTTLms) {
this(zkClient, bucketSize, versionTTLms, true, znodeTTLms);
}

private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms,
boolean usesExternalZkClient) {
boolean usesExternalZkClient, long znodeTTLms) {
_zkClient = zkClient;
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTLms = versionTTLms;
_usesExternalZkClient = usesExternalZkClient;
_znodeTTLms = znodeTTLms;
_accessOption = getAccessOption(znodeTTLms);
}

/**
Expand Down Expand Up @@ -156,10 +165,9 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
lastWriteVersion++;
return String.valueOf(lastWriteVersion).getBytes();
};

// 1. Increment lastWriteVersion using DataUpdater
ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note(myself): based on how pr discussion concludes, we might need to use explicit create for first time and then update calls from second time onwards.

rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, _accessOption, _znodeTTLms);
if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
throw new HelixException(
String.format("Failed to write the write version at path: %s!", rootPath));
Expand Down Expand Up @@ -206,7 +214,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
buckets.add(binaryMetadata);

// Do an async set to ZK
boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, _accessOption, _znodeTTLms);
// Exception and fail the write if any failed
for (boolean s : success) {
if (!s) {
Expand All @@ -233,7 +241,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
}
};
if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note(myself): I think this update() calls also need to be changed to either create if first time and then update for later updates. Or move to doUpdate fn with ttl support, based on how the PR discussion concludes.

lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
lastSuccessfulWriteVersionUpdater, _accessOption)) {
throw new HelixException(String
.format("Failed to write the last successful write metadata at path: %s!", rootPath));
}
Expand All @@ -251,7 +259,7 @@ public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,

@Override
public void compressedBucketDelete(String path) {
if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
if (!_zkBaseDataAccessor.remove(path, _accessOption)) {
throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
}
synchronized (this) {
Expand All @@ -272,7 +280,7 @@ private HelixProperty compressedBucketRead(String path) {

// 2. Get the metadata map
byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
null, AccessOption.PERSISTENT);
null, _accessOption);
if (binaryMetadata == null) {
throw new ZkNoNodeException(
String.format("Metadata ZNode does not exist for path: %s", path));
Expand Down Expand Up @@ -309,7 +317,7 @@ private HelixProperty compressedBucketRead(String path) {
}

// Async get
List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, _accessOption, true);

// Combine buckets into one byte array
int copyPtr = 0;
Expand Down Expand Up @@ -348,6 +356,14 @@ public void finalize() {
close();
}

int getAccessOption(long znodeTTLms) {
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved
if(znodeTTLms > 0) {
return AccessOption.PERSISTENT_WITH_TTL;
} else {
return AccessOption.PERSISTENT;
}
}

private synchronized void scheduleStaleVersionGC(String rootPath) {
// If GC already scheduled, return early
if (_gcTaskFutureMap.containsKey(rootPath)) {
Expand All @@ -373,7 +389,7 @@ private void deleteStaleVersions(String rootPath) {
String currentVersionStr = getLastSuccessfulWriteVersion(rootPath);

// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, _accessOption);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
Expand All @@ -382,7 +398,7 @@ private void deleteStaleVersions(String rootPath) {
getPathsToDelete(rootPath, filterChildrenNames(children, Long.parseLong(currentVersionStr)));
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
_zkBaseDataAccessor.remove(pathToDelete, _accessOption);
}
}

Expand Down Expand Up @@ -430,7 +446,7 @@ private List<String> getPathsToDelete(String path, List<String> staleVersions) {

private String getLastSuccessfulWriteVersion(String path) {
byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
null, AccessOption.PERSISTENT);
null, _accessOption);
if (binaryVersionToRead == null) {
throw new ZkNoNodeException(
String.format("Last successful write ZNode does not exist for path: %s", path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,47 @@ record = new ZNRecord("msg_1");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

/**
* Test to ensure async setChildren fn with ttl configured works as expected. Test verifies general setChildren functionality
* and also verifies if the znodes created are configured with correct TTL.
*/
@Test
public void testAsyncSetChildrenWithTTL() {
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String root = _rootPath;
long ttl = 4000L;
List<ZNRecord> records = new ArrayList<>();
List<String> paths = new ArrayList<>();
ZkBaseDataAccessor<ZNRecord> accessor = Mockito.spy(new ZkBaseDataAccessor<ZNRecord>(_gZkClient));

for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
// Example path: /TestZkBaseDataAccessor/INSTANCES/host_1/MESSAGES/msg_id
paths.add(PropertyPathBuilder.instanceMessage(root, "host_1", msgId));
ZNRecord newRecord = new ZNRecord(msgId);
newRecord.setSimpleField("key1", "value1");
records.add(newRecord);
}
boolean[] success = accessor.setChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, ttl);
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
Assert.assertTrue(success[i], "Should succeed in set " + msgId);
}

// Verify if all 5 subpaths to be created are configured with TTL
Mockito.verify(accessor, Mockito.times(5)).create(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncDoSet() {
String className = TestHelper.getTestClassName();
Expand Down Expand Up @@ -420,6 +461,51 @@ public ZNRecord update(ZNRecord currentData) {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

/**
* Test to ensure sync doUpdate fn with ttl configured works as expected. Test verifies general doUpdate functionality
* and also verifies if znode created are configured with correct TTL.
*/
@Test
public void testSyncDoUpdateWithTTL() {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

long ttl = 4000L;
String path = String.format("/%s/%s", _rootPath, "msg_0");
ZNRecord record = new ZNRecord("msg_0");
ZkBaseDataAccessor<ZNRecord> accessor = Mockito.spy(new ZkBaseDataAccessor<ZNRecord>(_gZkClient));

AccessResult result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL);
// Fails as ttl is not provided when AccessOption.PERSISTENT_WITH_TTL is used
Assert.assertEquals(result._retCode, RetCode.ERROR);

result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertEquals(result._retCode, RetCode.OK);
ZNRecord getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getId(), "msg_0");
Mockito.verify(accessor, Mockito.times(1)).doCreate(Mockito.anyString(), Mockito.any(ZNRecord.class), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

Mockito.reset(accessor);

record.setSimpleField("key0", "value0");
result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertEquals(result._retCode, RetCode.OK);
getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
Assert.assertNotNull(getRecord.getSimpleField("key0"));
Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
Mockito.verify(accessor, Mockito.times(0)).doCreate(Mockito.anyString(), Mockito.any(ZNRecord.class), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncMultiSet() {
String className = TestHelper.getTestClassName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -59,7 +60,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
private final ZNRecord record = new ZNRecord(NAME_KEY);

private HelixZkClient _zkClient;
private BucketDataAccessor _bucketDataAccessor;
private ZkBucketDataAccessor _bucketDataAccessor;
jacoblukose marked this conversation as resolved.
Show resolved Hide resolved
private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
private BucketDataAccessor _fastGCBucketDataAccessor;

Expand Down Expand Up @@ -219,6 +220,27 @@ public void testGCScheduler() throws IOException, InterruptedException {
System.out.print("Children after GC: " + children);
}

/**
* Test to ensure that the correct AccessOption is returned based on the znodeTTLms value.
*/
@Test
public void testGetAccessOption() {
long ttl = 1000L; // Example of a positive TTL
int result = _bucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT_WITH_TTL, result,
"Expected PERSISTENT_WITH_TTL for positive znodeTTLms");

ttl = 0L; // Example of a zero TTL
result = _bucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT, result,
"Expected PERSISTENT for zero znodeTTLms");

ttl = -100L; // Example of a negative TTL
result = _bucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT, result,
"Expected PERSISTENT for negative znodeTTLms");
}

private HelixProperty createLargeHelixProperty(String name, int numEntries) {
HelixProperty property = new HelixProperty(name);
for (int i = 0; i < numEntries; i++) {
Expand Down
Loading