Skip to content

Commit

Permalink
IGNITE-22530 Add atomic write to caches file
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgarrish committed Nov 13, 2024
1 parent 7d30f0e commit 0218957
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -46,6 +44,8 @@
import org.apache.ignite.metric.MetricRegistry;
import org.apache.ignite.resources.LoggerResource;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;

/**
Expand Down Expand Up @@ -81,6 +81,9 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** File with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_FILE = "caches";

/** Temporary file with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";

/** CDC directory path. */
private Path cdcDir;

Expand Down Expand Up @@ -193,7 +196,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {

/**
* Finds match between cache name and user's regex templates.
* If match found, adds this cache's id to id's list and saves cache name to file.
* If match is found, adds this cache's id to id's list and saves cache name to file.
*
* @param cacheName Cache name.
*/
Expand All @@ -204,7 +207,11 @@ private void matchWithRegexTemplates(String cacheName) {
cachesIds.add(cacheId);

try {
saveCache(cacheName);
List<String> caches = loadCaches();

caches.add(cacheName);

save(caches);
}
catch (IOException e) {
throw new IgniteException(e);
Expand All @@ -216,18 +223,28 @@ private void matchWithRegexTemplates(String cacheName) {
}

/**
* Writes cache name to file
* Writes caches list to file
*
* @param cacheName Cache name.
* @param caches Caches list.
*/
private void saveCache(String cacheName) throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
private void save(List<String> caches) throws IOException {
if (cdcDir == null) {
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);

String cn = cacheName + '\n';
StringBuilder cacheList = new StringBuilder();

Files.write(savedCachesPath, cn.getBytes(), StandardOpenOption.APPEND);
for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());

Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
}

/**
Expand All @@ -236,19 +253,19 @@ private void saveCache(String cacheName) throws IOException {
* @return List of saved caches names.
*/
private List<String> loadCaches() throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);
if (cdcDir == null) {
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}
if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);

return Files.readAllLines(savedCachesPath);
if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}
return Collections.emptyList();

return Files.readAllLines(savedCachesPath);
}

/**
Expand Down Expand Up @@ -305,17 +322,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {

caches.remove(name);

Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

StringBuilder cacheList = new StringBuilder();

for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(savedCachesPath, cacheList.toString().getBytes());
save(caches);

if (log.isInfoEnabled())
log.info("Cache has been removed from replication [cacheName=" + name + ']');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -66,6 +64,8 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
Expand Down Expand Up @@ -163,6 +163,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** File with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_FILE = "caches";

/** Temporary file with saved names of caches added by cache masks. */
private static final String SAVED_CACHES_TMP_FILE = "caches_tmp";

/** CDC directory path. */
private Path cdcDir;

Expand Down Expand Up @@ -304,17 +307,7 @@ private void deleteRegexpCacheIfPresent(Integer cacheId) {

caches.remove(name);

Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

StringBuilder cacheList = new StringBuilder();

for (String cache : caches) {
cacheList.append(cache);

cacheList.append('\n');
}

Files.write(savedCachesPath, cacheList.toString().getBytes());
save(caches);

if (log.isInfoEnabled())
log.info("Cache has been removed from replication [cacheName=" + name + ']');
Expand Down Expand Up @@ -481,19 +474,19 @@ private void prepareRegexFilters() {
* @return List of saved caches names.
*/
private List<String> loadCaches() throws IOException {
if (cdcDir != null) {
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);
if (cdcDir == null) {
throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);

if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}
if (Files.notExists(savedCachesPath)) {
Files.createFile(savedCachesPath);

return Files.readAllLines(savedCachesPath);
if (log.isInfoEnabled())
log.info("Cache list created: " + savedCachesPath);
}
return Collections.emptyList();

return Files.readAllLines(savedCachesPath);
}

/**
Expand All @@ -514,7 +507,7 @@ private boolean matchesFilters(String cacheName) {

/**
* Finds match between cache name and user's regex templates.
* If match found, adds this cache's id to id's list and saves cache name to file.
* If match is found, adds this cache's id to id's list and saves cache name to file.
*
* @param cacheName Cache name.
*/
Expand All @@ -525,7 +518,11 @@ private void matchWithRegexTemplates(String cacheName) {
cachesIds.add(cacheId);

try {
saveCache(cacheName);
List<String> caches = loadCaches();

caches.add(cacheName);

save(caches);
}
catch (IOException e) {
throw new IgniteException(e);
Expand All @@ -537,18 +534,28 @@ private void matchWithRegexTemplates(String cacheName) {
}

/**
* Writes cache name to file.
* Writes caches list to file
*
* @param cacheName Cache name.
* @param caches Caches list.
*/
private void saveCache(String cacheName) throws IOException {
if (cdcDir != null) {
Path savedCaches = cdcDir.resolve(SAVED_CACHES_FILE);
private void save(List<String> caches) throws IOException {
if (cdcDir == null) {
throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null");
}
Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE);
Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE);

StringBuilder cacheList = new StringBuilder();

String cn = cacheName + '\n';
for (String cache : caches) {
cacheList.append(cache);

Files.write(savedCaches, cn.getBytes(), StandardOpenOption.APPEND);
cacheList.append('\n');
}

Files.write(tmpSavedCachesPath, cacheList.toString().getBytes());

Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
for (IgniteEx ex : srcCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplates,
excludeTemplates, "ignite-src-to-kafka-" + idx));
}

for (int i = 0; i < destCluster.length; i++) {
Expand Down Expand Up @@ -149,15 +150,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
for (IgniteEx ex : srcCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
excludeTemplates, "ignite-src-to-kafka-" + idx));
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE,
includeTemplates, excludeTemplates, "ignite-src-to-kafka-" + idx));
}

for (IgniteEx ex : destCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, includeTemplates,
excludeTemplates, "ignite-dest-to-kafka-" + idx));
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE,
includeTemplates, excludeTemplates, "ignite-dest-to-kafka-" + idx));
}

futs.add(kafkaToIgnite(
Expand Down

0 comments on commit 0218957

Please sign in to comment.