Skip to content

Commit

Permalink
IGNITE-22530 Add removal of destroyed caches from cacheList file
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Nadyktov authored and lordgarrish committed Nov 13, 2024
1 parent 2517c25 commit 7d30f0e
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
Expand Down Expand Up @@ -283,9 +284,46 @@ private boolean matchesFilters(String cacheName) {

/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
});
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
}

/**
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
*
* @param cacheId Cache id.
*/
private void deleteRegexpCacheIfPresent(Integer cacheId) {
try {
List<String> caches = loadCaches();

Optional<String> cacheName = caches.stream()
.filter(name -> CU.cacheId(name) == cacheId)
.findAny();

if (cacheName.isPresent()) {
String name = cacheName.get();

caches.remove(name);

Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

StringBuilder cacheList = new StringBuilder();

for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(savedCachesPath, cacheList.toString().getBytes());

if (log.isInfoEnabled())
log.info("Cache has been removed from replication [cacheName=" + name + ']');
}
}
catch (IOException e) {
throw new IgniteException(e);
}
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -282,9 +283,46 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {

/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
});
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
}

/**
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
*
* @param cacheId Cache id.
*/
private void deleteRegexpCacheIfPresent(Integer cacheId) {
try {
List<String> caches = loadCaches();

Optional<String> cacheName = caches.stream()
.filter(name -> CU.cacheId(name) == cacheId)
.findAny();

if (cacheName.isPresent()) {
String name = cacheName.get();

caches.remove(name);

Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

StringBuilder cacheList = new StringBuilder();

for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(savedCachesPath, cacheList.toString().getBytes());

if (log.isInfoEnabled())
log.info("Cache has been removed from replication [cacheName=" + name + ']');
}
}
catch (IOException e) {
throw new IgniteException(e);
}
}

/** Send marker(meta need to be updated) record to each partition of events topic. */
Expand Down

0 comments on commit 7d30f0e

Please sign in to comment.