diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java index a2f830f2..1fd6cba2 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -283,9 +284,46 @@ private boolean matchesFilters(String cacheName) { /** {@inheritDoc} */ @Override public void onCacheDestroy(Iterator caches) { - caches.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. - }); + caches.forEachRemaining(this::deleteRegexpCacheIfPresent); + } + + /** + * Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow. + * + * @param cacheId Cache id. + */ + private void deleteRegexpCacheIfPresent(Integer cacheId) { + try { + List caches = loadCaches(); + + Optional cacheName = caches.stream() + .filter(name -> CU.cacheId(name) == cacheId) + .findAny(); + + if (cacheName.isPresent()) { + String name = cacheName.get(); + + caches.remove(name); + + Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE); + + StringBuilder cacheList = new StringBuilder(); + + for (String cache : caches) { + cacheList.append(cache); + + cacheList.append('\n'); + } + + Files.write(savedCachesPath, cacheList.toString().getBytes()); + + if (log.isInfoEnabled()) + log.info("Cache has been removed from replication [cacheName=" + name + ']'); + } + } + catch (IOException e) { + throw new IgniteException(e); + } } /** {@inheritDoc} */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java index 9d60cbe0..8baf5a0b 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -282,9 +283,46 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** {@inheritDoc} */ @Override public void onCacheDestroy(Iterator caches) { - caches.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. - }); + caches.forEachRemaining(this::deleteRegexpCacheIfPresent); + } + + /** + * Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow. + * + * @param cacheId Cache id. + */ + private void deleteRegexpCacheIfPresent(Integer cacheId) { + try { + List caches = loadCaches(); + + Optional cacheName = caches.stream() + .filter(name -> CU.cacheId(name) == cacheId) + .findAny(); + + if (cacheName.isPresent()) { + String name = cacheName.get(); + + caches.remove(name); + + Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE); + + StringBuilder cacheList = new StringBuilder(); + + for (String cache : caches) { + cacheList.append(cache); + + cacheList.append('\n'); + } + + Files.write(savedCachesPath, cacheList.toString().getBytes()); + + if (log.isInfoEnabled()) + log.info("Cache has been removed from replication [cacheName=" + name + ']'); + } + } + catch (IOException e) { + throw new IgniteException(e); + } } /** Send marker(meta need to be updated) record to each partition of events topic. */