Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22530 CDC: Add regex filters for cache names #286

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@

package org.apache.ignite.cdc;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand All @@ -35,6 +44,8 @@
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;

/**
Expand Down Expand Up @@ -67,12 +78,33 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** */
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";

/** File with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_FILE = "caches";

/** Temporary file with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";

/** CDC directory path. */
private Path cdcDir;

/** Handle only primary entry flag. */
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;

/** Cache names. */
private Set<String> caches;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Compiled include regex patterns for cache names. */
private Set<Pattern> includeFilters;

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Compiled exclude regex patterns for cache names. */
private Set<Pattern> excludeFilters;

/** Cache IDs. */
protected Set<Integer> cachesIds;

Expand All @@ -99,14 +131,28 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
protected IgniteLogger log;

/** {@inheritDoc} */
@Override public void start(MetricRegistry reg) {
@Override public void start(MetricRegistry reg, Path cdcDir) {
A.notEmpty(caches, "caches");

this.cdcDir = cdcDir;

cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());

prepareRegexFilters();

try {
loadCaches().stream()
.filter(this::matchesFilters)
.map(CU::cacheId)
.forEach(cachesIds::add);
}
catch (IOException e) {
throw new IgniteException(e);
}

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
Expand Down Expand Up @@ -144,15 +190,147 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** {@inheritDoc} */
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
cacheEvents.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
matchWithRegexTemplates(e.configuration().getName());
});
}

/**
* Finds match between cache name and user's regex templates.
* If match is found, adds this cache's id to id's list and saves cache name to file.
*
* @param cacheName Cache name.
*/
private void matchWithRegexTemplates(String cacheName) {
int cacheId = CU.cacheId(cacheName);

if (!cachesIds.contains(cacheId) && matchesFilters(cacheName)) {
cachesIds.add(cacheId);

try {
List<String> caches = loadCaches();

caches.add(cacheName);

save(caches);
}
catch (IOException e) {
throw new IgniteException(e);
}

if (log.isInfoEnabled())
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
}
}

/**
* Writes caches list to file
*
* @param caches Caches list.
*/
private void save(List<String> caches) throws IOException {
if (cdcDir == null) {
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);

StringBuilder cacheList = new StringBuilder();

for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());

Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
}

/**
* Loads saved caches from file.
*
* @return List of saved caches names.
*/
private List<String> loadCaches() throws IOException {
if (cdcDir == null) {
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);

if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}

return Files.readAllLines(savedCachesPath);
}

/**
* Compiles regex patterns from user templates.
*
* @throws PatternSyntaxException If the template's syntax is invalid
*/
private void prepareRegexFilters() {
includeFilters = includeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());

excludeFilters = excludeTemplates.stream()
.map(Pattern::compile)
.collect(Collectors.toSet());
}

/**
* Matches cache name with compiled regex patterns.
*
* @param cacheName Cache name.
* @return True if cache name match include patterns and don't match exclude patterns.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeFilters.stream()
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeFilters.stream()
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}

/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(e -> {
// Just skip. Handle of cache events not supported.
});
caches.forEachRemaining(this::deleteRegexpCacheIfPresent);
}

/**
* Removes cache added by regexp from cache list, if this cache is present in file, to prevent file size overflow.
*
* @param cacheId Cache id.
*/
private void deleteRegexpCacheIfPresent(Integer cacheId) {
try {
List<String> caches = loadCaches();

Optional<String> cacheName = caches.stream()
.filter(name -> CU.cacheId(name) == cacheId)
.findAny();

if (cacheName.isPresent()) {
String name = cacheName.get();

caches.remove(name);

save(caches);

if (log.isInfoEnabled())
log.info("Cache has been removed from replication [cacheName=" + name + ']');
}
}
catch (IOException e) {
throw new IgniteException(e);
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -238,6 +416,30 @@ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
return this;
}

/**
* Sets include regex patterns that participate in CDC.
*
* @param includeTemplates Include regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;

return this;
}

/**
* Sets exclude regex patterns that participate in CDC.
*
* @param excludeTemplates Exclude regex templates
* @return {@code this} for chaining.
*/
public AbstractIgniteCdcStreamer setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;

return this;
}

/**
* Sets maximum batch size that will be applied to destination cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.cdc;

import java.nio.file.Path;

import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
Expand Down Expand Up @@ -61,8 +63,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
private volatile boolean alive = true;

/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
super.start(mreg);
@Override public void start(MetricRegistry mreg, Path cdcDir) {
super.start(mreg, cdcDir);

if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.ignite.cdc.conflictresolve;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteEx;
Expand Down Expand Up @@ -65,6 +68,12 @@ public class CacheVersionConflictResolverPluginProvider<C extends PluginConfigur
/** Custom conflict resolver. */
private CacheVersionConflictResolver resolver;

/** Include regex templates for cache names. */
private Set<String> includeTemplates = new HashSet<>();

/** Exclude regex templates for cache names. */
private Set<String> excludeTemplates = new HashSet<>();

/** Log. */
private IgniteLogger log;

Expand Down Expand Up @@ -98,7 +107,7 @@ public CacheVersionConflictResolverPluginProvider() {
@Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
String cacheName = ctx.igniteCacheConfiguration().getName();

if (caches.contains(cacheName)) {
if (caches.contains(cacheName) || matchesFilters(cacheName)) {
log.info("ConflictResolver provider set for cache [cacheName=" + cacheName + ']');

return provider;
Expand Down Expand Up @@ -144,6 +153,16 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
this.resolver = resolver;
}

/** @param includeTemplates Include regex templates */
public void setIncludeTemplates(Set<String> includeTemplates) {
this.includeTemplates = includeTemplates;
}

/** @param excludeTemplates Exclude regex templates */
public void setExcludeTemplates(Set<String> excludeTemplates) {
this.excludeTemplates = excludeTemplates;
}

/** {@inheritDoc} */
@Override public void start(PluginContext ctx) {
((IgniteEx)ctx.grid()).context().cache().context().versions().dataCenterId(clusterId);
Expand Down Expand Up @@ -178,4 +197,21 @@ public void setConflictResolver(CacheVersionConflictResolver resolver) {
@Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) {
return null;
}

/**
* Match cache name with regex patterns.
*
* @param cacheName Cache name.
*/
private boolean matchesFilters(String cacheName) {
boolean matchesInclude = includeTemplates.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(cacheName).matches());

boolean notMatchesExclude = excludeTemplates.stream()
.map(Pattern::compile)
.noneMatch(pattern -> pattern.matcher(cacheName).matches());

return matchesInclude && notMatchesExclude;
}
}
Loading