Skip to content

Commit

Permalink
Activated garbage-collection for the tries
Browse files Browse the repository at this point in the history
  • Loading branch information
spoto committed May 29, 2024
1 parent 8f00363 commit a6a4a77
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -123,8 +125,9 @@ protected void moveToFinalStoreOf(T transaction) throws NodeException {
var rootAsBI = ByteIterable.fromBytes(getStore().getStateId());
env.executeInTransaction(txn -> setRootBranch(oldStore, rootAsBI, txn));

if (!storesToGC.offer(oldStore))
LOGGER.warning("could not enqueue old store for garbage collection: the queue is full!");
if (!isUsed(oldStore))
if (!storesToGC.offer(oldStore))
LOGGER.warning("could not enqueue old store for garbage collection: the queue is full!");
}
catch (StoreException | ExodusException e) {
throw new NodeException(e);
Expand All @@ -146,6 +149,30 @@ protected void closeResources() throws NodeException, InterruptedException {
}
}

private final ConcurrentMap<CheckableStore<?,?>, Integer> storeUsers = new ConcurrentHashMap<>();

@Override
protected void enter(S store) {
super.enter(store);
storeUsers.putIfAbsent(store, 0);
storeUsers.compute(store, (_store, old) -> old + 1);
}

@Override
protected void exit(S store) {
storeUsers.compute(store, (_store, old) -> old - 1);

if (!isUsed(store))
if (!storesToGC.offer(store))
LOGGER.warning("could not enqueue old store for garbage collection: the queue is full!");

super.exit(store);
}

private boolean isUsed(S store) {
return store == getStore() || storeUsers.getOrDefault(store, 0) > 0;
}

private void setRootBranch(S oldStore, ByteIterable rootAsBI, Transaction txn) {
storeOfNode.put(txn, ROOT, rootAsBI); // we set the root branch
//storeOfNode.put(txn, PAST_STORES, null); // we add the old store to the past stores list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,15 @@ public final Subscription subscribeToEvents(StorageReference creator, BiConsumer

@Override
public final ConsensusConfig<?,?> getConfig() throws NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
return store.getConfig();
}
finally {
exit(store);
}
}

@Override
Expand All @@ -234,12 +240,18 @@ public final TransactionReference getTakamakaCode() throws NodeException {

@Override
public final StorageReference getManifest() throws NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
return store.getManifest().orElseThrow(UninitializedNodeException::new);
}
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
Expand Down Expand Up @@ -267,16 +279,25 @@ public final TransactionResponse getPolledResponse(TransactionReference referenc

@Override
public final TransactionRequest<?> getRequest(TransactionReference reference) throws UnknownReferenceException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
return store.getRequest(Objects.requireNonNull(reference));
}
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
public final TransactionResponse getResponse(TransactionReference reference) throws TransactionRejectedException, UnknownReferenceException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
try {
return store.getResponse(Objects.requireNonNull(reference));
Expand All @@ -292,10 +313,16 @@ public final TransactionResponse getResponse(TransactionReference reference) thr
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
public final ClassTag getClassTag(StorageReference reference) throws UnknownReferenceException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
Objects.requireNonNull(reference);

Expand All @@ -311,21 +338,30 @@ public final ClassTag getClassTag(StorageReference reference) throws UnknownRefe
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
public final Stream<Update> getState(StorageReference reference) throws UnknownReferenceException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
try {
Stream<TransactionReference> history = store.getHistory(Objects.requireNonNull(reference));
var updates = new HashSet<Update>();
CheckRunnable.check(StoreException.class, () -> history.forEachOrdered(UncheckConsumer.uncheck(transaction -> addUpdatesCommitted(reference, transaction, updates))));
CheckRunnable.check(StoreException.class, () -> history.forEachOrdered(UncheckConsumer.uncheck(transaction -> addUpdatesCommitted(store, reference, transaction, updates))));
return updates.stream();
}
catch (StoreException e) {
throw new NodeException(e);
}
}
finally {
exit(store);
}
}

@Override
Expand Down Expand Up @@ -385,6 +421,9 @@ public final Optional<StorageValue> addStaticMethodCallTransaction(StaticMethodC

@Override
public final Optional<StorageValue> runInstanceMethodCallTransaction(InstanceMethodCallTransactionRequest request) throws TransactionRejectedException, TransactionException, CodeExecutionException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
var reference = TransactionReferences.of(hasher.hash(request));
String referenceAsString = reference.toString();
Expand All @@ -396,10 +435,16 @@ public final Optional<StorageValue> runInstanceMethodCallTransaction(InstanceMet
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
public final Optional<StorageValue> runStaticMethodCallTransaction(StaticMethodCallTransactionRequest request) throws TransactionRejectedException, TransactionException, CodeExecutionException, NodeException {
S store = this.store;
enter(store);

try (var scope = mkScope()) {
var reference = TransactionReferences.of(hasher.hash(request));
String referenceAsString = reference.toString();
Expand All @@ -411,6 +456,9 @@ public final Optional<StorageValue> runStaticMethodCallTransaction(StaticMethodC
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

@Override
Expand Down Expand Up @@ -448,9 +496,25 @@ public final MethodFuture postStaticMethodCallTransaction(StaticMethodCallTransa
*/
protected void initWithEmptyStore() throws NodeException {
// the node is starting from scratch: the caches are left empty and the consensus is well-known
this.store = mkStore(executors, config, hasher);
store = mkStore(executors, config, hasher);
}

/**
* Called when this node is executing something that needs the given store.
* It can be used, for instance, to take note that the store cannot be
* garbage-collected from that moment.
*
* @param store the store
*/
protected void enter(S store) {}

/**
* Called when this node finished executing something that needed the given store.
*
* @param store the store
*/
protected void exit(S store) {}

/**
* Initializes the node with the last saved store, hence resuming a node
* from its saved state. The saved store must have been initialized, so that
Expand Down Expand Up @@ -481,12 +545,18 @@ protected void signalRejected(TransactionRequest<?> request, TransactionRejected
}

protected void checkTransaction(TransactionRequest<?> request) throws TransactionRejectedException, NodeException {
S store = this.store;
enter(store);

try {
store.checkTransaction(request);
}
catch (StoreException e) {
throw new NodeException(e);
}
finally {
exit(store);
}
}

protected T beginTransaction(long now) throws NodeException {
Expand Down Expand Up @@ -578,6 +648,9 @@ else if (request instanceof ConstructorCallTransactionRequest cctr)
else
LOGGER.info(reference + ": posting (" + request.getClass().getSimpleName() + ')');

S store = this.store;
enter(store);

try {
store.getResponse(reference);
// if the response is found, then no exception is thrown above and the request was repeated
Expand All @@ -589,11 +662,14 @@ else if (request instanceof ConstructorCallTransactionRequest cctr)
catch (UnknownReferenceException e) {
// this is fine: there was no previous request with the same reference so we register
// its semaphore and post the request for execution
createSemaphore(reference);
createSemaphore(store, reference);
postRequest(request);

return reference;
}
finally {
exit(store);
}
}

private static String trim(String s) {
Expand All @@ -609,7 +685,7 @@ private static String trim(String s) {
* @param updates the set where they must be added
* @throws StoreException
*/
private void addUpdatesCommitted(StorageReference object, TransactionReference referenceInHistory, Set<Update> updates) throws StoreException {
private void addUpdatesCommitted(S store, StorageReference object, TransactionReference referenceInHistory, Set<Update> updates) throws StoreException {
try {
if (store.getResponse(referenceInHistory) instanceof TransactionResponseWithUpdates trwu)
trwu.getUpdates()
Expand All @@ -629,7 +705,7 @@ private void addUpdatesCommitted(StorageReference object, TransactionReference r
* @param reference the reference of the transaction for the request
* @throws TransactionRejectedException
*/
private void createSemaphore(TransactionReference reference) throws TransactionRejectedException {
private void createSemaphore(S store, TransactionReference reference) throws TransactionRejectedException {
if (semaphores.putIfAbsent(reference, new Semaphore(0)) != null)
throw new TransactionRejectedException("Repeated request " + reference, store.getConfig());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,44 @@ protected S addDelta(StoreCache cache, LinkedHashMap<TransactionReference, Trans

try {
return CheckSupplier.check(StoreException.class, TrieException.class, () -> env.computeInTransaction(UncheckFunction.uncheck(txn -> {
System.out.println("added requests: " + addedRequests.entrySet().size());
var trieOfRequests = mkTrieOfRequests(txn);
for (var entry: addedRequests.entrySet())
for (var entry: addedRequests.entrySet()) {
trieOfRequests.incrementReferenceCountOfRoot();
var old = trieOfRequests;
trieOfRequests = trieOfRequests.put(entry.getKey(), entry.getValue());
old.free();
}

System.out.println("added responses: " + addedResponses.entrySet().size());
var trieOfResponses = mkTrieOfResponses(txn);
for (var entry: addedResponses.entrySet())
for (var entry: addedResponses.entrySet()) {
trieOfResponses.incrementReferenceCountOfRoot();
var old = trieOfResponses;
trieOfResponses = trieOfResponses.put(entry.getKey(), entry.getValue());

old.free();
}

System.out.println("added histories: " + addedHistories.entrySet().size());
var trieOfHistories = mkTrieOfHistories(txn);
for (var entry: addedHistories.entrySet())
for (var entry: addedHistories.entrySet()) {
trieOfHistories.incrementReferenceCountOfRoot();
var old = trieOfHistories;
trieOfHistories = trieOfHistories.put(entry.getKey(), Stream.of(entry.getValue()));
old.free();
}

var trieOfInfo = mkTrieOfInfo(txn);
trieOfInfo.incrementReferenceCountOfRoot();
var old = trieOfInfo;
trieOfInfo = trieOfInfo.increaseHeight();
if (addedManifest.isPresent())
old.free();
if (addedManifest.isPresent()) {
trieOfInfo.incrementReferenceCountOfRoot();
old = trieOfInfo;
trieOfInfo = trieOfInfo.setManifest(addedManifest.get());
old.free();
}

// we increment the reference count of the roots of the resulting tries, so that
// they do not get garbage collected until this store is freed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static Update from(UnmarshallingContext context) throws IOException {
return Updates.classTag(StorageValues.referenceWithoutSelectorFrom(context), (ClassType) StorageTypes.from(context), TransactionReferences.from(context));
}
catch (ClassCastException e) {
throw new IOException("Failed unmrshalling a class tag", e);
throw new IOException("Failed unmarshalling a class tag", e);
}
}
case UpdateOfBigIntegerImpl.SELECTOR_BALANCE: return Updates.ofBigInteger(StorageValues.referenceWithoutSelectorFrom(context), FieldSignatures.BALANCE_FIELD, context.readBigInteger());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ private static byte[] compactNibblesIntoBytes(byte[] nibbles, byte evenSelector,
return result;
}

private static long freed;
private static long allocated;

private static byte[] expandBytesIntoNibbles(byte[] bytes, byte evenSelector) {
byte[] nibbles;

Expand Down Expand Up @@ -441,6 +444,8 @@ protected final AbstractNode putInStore() throws TrieException {
}
catch (UnknownKeyException e) {
try {
allocated++;
System.out.printf("%d/%d: %.2f\n", freed, allocated, freed * 100.0 / allocated);
store.put(hash, toByteArray()); // we bind it to its hash in the store
incrementReferenceCountOfDescedants();
return this;
Expand All @@ -461,12 +466,14 @@ protected final AbstractNode putInStore() throws TrieException {
*/
protected void free(byte[] hash) throws TrieException {
AbstractNode replacement = withDecrementedReferenceCount();
//System.out.println(getClass().getSimpleName() + ": " + count + " -> " + replacement.count);
System.out.println(getClass().getSimpleName() + ": " + count + " -> " + replacement.count);

try {
if (replacement.count > 0)
store.put(hash, replacement.toByteArray()); // TODO: can we avoid to unmarshal and marshal again?
else {
freed++;
System.out.printf("%d/%d: %.2f\n", freed, allocated, freed * 100.0 / allocated);
store.remove(hash);
freeDescendants();
}
Expand Down
Loading

0 comments on commit a6a4a77

Please sign in to comment.