Skip to content

Commit

Permalink
IGNITE-22530 CDC: Add regex filters for cache names
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Nadyktov authored and lordgarrish committed Nov 13, 2024
1 parent be0abc7 commit c862d18
Show file tree
Hide file tree
Showing 16 changed files with 910 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,19 @@

package org.apache.ignite.cdc;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand Down Expand Up @@ -67,12 +77,30 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** */
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";

/** File with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_FILE = "caches";

/** CDC directory path. */
private Path cdcDir;

/** Handle only primary entry flag. */
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;

/** Cache names. */
private Set<String> caches;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Compiled include regex patterns for cache names. */
private Set<Pattern> includeFilters;

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Compiled exclude regex patterns for cache names. */
private Set<Pattern> excludeFilters;

/** Cache IDs. */
protected Set<Integer> cachesIds;

Expand All @@ -99,14 +127,28 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
protected IgniteLogger log;

/** {@inheritDoc} */
@Override public void start(MetricRegistry reg) {
@Override public void start(MetricRegistry reg, Path cdcDir) {
A.notEmpty(caches, "caches");

this.cdcDir = cdcDir;

cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());

prepareRegexFilters();

try {
loadCaches().stream()
.filter(this::matchesFilters)
.map(CU::cacheId)
.forEach(cachesIds::add);
}
catch (IOException e) {
throw new IgniteException(e);
}

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
Expand Down Expand Up @@ -144,10 +186,101 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** {@inheritDoc} */
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
cacheEvents.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
matchWithRegexTemplates(e.configuration().getName());
});
}

/**
* Finds match between cache name and user's regex templates.
* If match found, adds this cache's id to id's list and saves cache name to file.
*
* @param cacheName Cache name.
*/
private void matchWithRegexTemplates(String cacheName) {
int cacheId = CU.cacheId(cacheName);

if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
cachesIds.add(cacheId);

try {
saveCache(cacheName);
}
catch (IOException e) {
throw new IgniteException(e);
}

if (log.isInfoEnabled())
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
}
}

/**
* Writes cache name to file
*
* @param cacheName Cache name.
*/
private void saveCache(String cacheName) throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

String cn = cacheName + '\n';

Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
}
}

/**
* Loads saved caches from file.
*
* @return List of saved caches names.
*/
private List<String> loadCaches() throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);

if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}

return Files.readAllLines(savedCachesPath);
}
return Collections.emptyList();
}

/**
* Compiles regex patterns from user templates.
*
* @throws PatternSyntaxException If the template's syntax is invalid
*/
private void prepareRegexFilters() {
includeFilters = includeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());

excludeFilters = excludeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());
}

/**
* Matches cache name with compiled regex patterns.
*
* @param cacheName Cache name.
* @return True if cache name match include patterns and don't match exclude patterns.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeFilters.stream()
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeFilters.stream()
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}

/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(e -> {
Expand Down Expand Up @@ -238,6 +371,30 @@ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
return this;
}

/**
* Sets include regex patterns that participate in CDC.
*
* @param includeTemplates Include regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;

return this;
}

/**
* Sets exclude regex patterns that participate in CDC.
*
* @param excludeTemplates Exclude regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;

return this;
}

/**
* Sets maximum batch size that will be applied to destination cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.cdc;

import java.nio.file.Path;

import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
Expand Down Expand Up @@ -61,8 +63,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
private volatile boolean alive = true;

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
super.start(mreg);
@Override public void start(MetricRegistry mreg, Path cdcDir) {
super.start(mreg, cdcDir);

if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.ignite.cdc.conflictresolve;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
Expand Down Expand Up @@ -65,6 +68,12 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
/** Custom conflict resolver. */
private CacheVersionConflictResolver resolver;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Log. */
private IgniteLogger log;

Expand Down Expand Up @@ -98,7 +107,7 @@ public CacheVersionConflictResolverPluginProvider() {
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
String cacheName = ctx.igniteCacheConfiguration().getName();

if (caches.contains(cacheName)) {
if (caches.contains(cacheName) || matchesFilters(cacheName)) {
log.info("ConflictResolver provider set for cache [cacheName=" + cacheName + ']');

return provider;
Expand Down Expand Up @@ -144,6 +153,16 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
this.resolver = resolver;
}

/** @param includeTemplates Include regex templates */
public void setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;
}

/** @param excludeTemplates Exclude regex templates */
public void setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;
}

/** {@inheritDoc} */
@Override public void start(PluginContext ctx) {
((IgniteEx)ctx.grid()).context().cache().context().versions().dataCenterId(clusterId);
Expand Down Expand Up @@ -178,4 +197,21 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
@Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
return null;
}

/**
* Match cache name with regex patterns.
*
* @param cacheName Cache name.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeTemplates.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeTemplates.stream()
.map(Pattern::compile)
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ protected void runAppliers() {
caches,
metaUpdr,
stopped,
metrics
metrics,
this
);

addAndStart("applier-thread-" + cntr++, applier);
Expand Down Expand Up @@ -252,6 +253,13 @@ private <T extends AutoCloseable & Runnable> void addAndStart(String threadName,
/** Checks that configured caches exist in a destination cluster. */
protected abstract void checkCaches(Collection<String> caches);

/**
* Get cache names from client.
*
* @return Cache names.
* */
protected abstract Collection<String> getCaches();

/** */
private void ackAsciiLogo(IgniteLogger log) {
String ver = "ver. " + ACK_VER_STR;
Expand Down
Loading

0 comments on commit c862d18

Please sign in to comment.