From c5fe4106c49abd0632a2b28b6ecb9030bbb3a36a Mon Sep 17 00:00:00 2001 From: yichen88 Date: Fri, 11 Dec 2020 12:25:16 +0100 Subject: [PATCH] Add cassandra file on disk. Signed-off-by: yichen88 --- .../afs/cassandra/CassandraAppStorage.java | 17 +++++++++ .../cassandra/CassandraAppStorageConfig.java | 36 ++++++++++++++++-- .../CassandraDiskFileInputStream.java | 30 +++++++++++++++ .../CassandraDiskFileOutputStream.java | 37 +++++++++++++++++++ 4 files changed, 117 insertions(+), 3 deletions(-) create mode 100644 afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileInputStream.java create mode 100644 afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileOutputStream.java diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java index eb5367e5..e4cda97f 100644 --- a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java +++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorage.java @@ -22,6 +22,8 @@ import java.io.*; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.TimeUnit; @@ -919,6 +921,18 @@ private int read(IntSupplier supplier) { @Override public Optional readBinaryData(String nodeId, String name) { + final Optional inputStream = readBinaryDataFromColumns(nodeId, name); + if (inputStream.isPresent() || config.getRootDir() == null) { + return inputStream; + } + final Path resolve = config.getRootDir().resolve(fileSystemName).resolve(nodeId).resolve(name); + if (Files.exists(resolve)) { + return Optional.of(new CassandraDiskFileInputStream(resolve.toFile())); + } + return Optional.empty(); + } + + private Optional readBinaryDataFromColumns(String nodeId, String name) { UUID nodeUuid = checkNodeId(nodeId); Objects.requireNonNull(name); @@ -1040,6 +1054,9 @@ public OutputStream writeBinaryData(String nodeId, String name) { // flush buffer to keep change order changeBuffer.flush(); pushEvent(new NodeDataUpdated(nodeId, name), APPSTORAGE_NODE_TOPIC); + if (config.getRootDir() != null) { + return new CassandraDiskFileOutputStream(config.getRootDir(), fileSystemName, nodeId, name); + } return new BinaryDataOutputStream(nodeUuid, name); } diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorageConfig.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorageConfig.java index e3ce2056..955d2d67 100644 --- a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorageConfig.java +++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraAppStorageConfig.java @@ -9,6 +9,9 @@ import com.powsybl.commons.config.ModuleConfig; import com.powsybl.commons.config.PlatformConfig; +import java.nio.file.Path; +import java.util.Objects; + /** * @author Geoffroy Jamgotchian */ @@ -30,6 +33,8 @@ public class CassandraAppStorageConfig { private int binaryDataChunkSize; + private FileOnDiskConfig fileOnDiskConfig; + public static CassandraAppStorageConfig load() { return load(PlatformConfig.defaultConfig()); } @@ -40,6 +45,8 @@ public static CassandraAppStorageConfig load(PlatformConfig platformConfig) { int doubleQueryPartitionSize = DEFAULT_DOUBLE_QUERY_PARTITION_SIZE; int stringQueryPartitionSize = DEFAULT_STRING_QUERY_PARTITION_SIZE; int binaryDataChunkSize = DEFAULT_BINARY_DATA_CHUNK_SIZE; + boolean fileOnDisk = false; + FileOnDiskConfig fodConfig = null; ModuleConfig moduleConfig = platformConfig.getOptionalModuleConfig("cassandra-app-storage").orElse(null); if (moduleConfig != null) { flushMaximumChange = moduleConfig.getIntProperty("flush-maximum-change", DEFAULT_FLUSH_MAXIMUM_CHANGE); @@ -47,9 +54,15 @@ public static CassandraAppStorageConfig load(PlatformConfig platformConfig) { doubleQueryPartitionSize = moduleConfig.getIntProperty("double-query-partition-size", DEFAULT_DOUBLE_QUERY_PARTITION_SIZE); stringQueryPartitionSize = moduleConfig.getIntProperty("string-query-partition-size", DEFAULT_STRING_QUERY_PARTITION_SIZE); binaryDataChunkSize = moduleConfig.getIntProperty("binary-data-chunk-size", DEFAULT_BINARY_DATA_CHUNK_SIZE); + fileOnDisk = moduleConfig.getBooleanProperty("file-on-disk", false); + if (fileOnDisk) { + final ModuleConfig fodModuleConfig = platformConfig.getModuleConfig("cassandra-file-on-disk"); + final Path fodRootDir = fodModuleConfig.getPathProperty("root-dir"); + fodConfig = new FileOnDiskConfig(fodRootDir); + } } return new CassandraAppStorageConfig(flushMaximumChange, flushMaximumSize, doubleQueryPartitionSize, - stringQueryPartitionSize, binaryDataChunkSize); + stringQueryPartitionSize, binaryDataChunkSize, fodConfig); } private static int checkFlushMaximumChange(int flushMaximumChange) { @@ -82,16 +95,17 @@ private static int checkBinaryDataChunkSize(int binaryDataChunkSize) { public CassandraAppStorageConfig() { this(DEFAULT_FLUSH_MAXIMUM_CHANGE, DEFAULT_FLUSH_MAXIMUM_SIZE, DEFAULT_DOUBLE_QUERY_PARTITION_SIZE, - DEFAULT_STRING_QUERY_PARTITION_SIZE, DEFAULT_BINARY_DATA_CHUNK_SIZE); + DEFAULT_STRING_QUERY_PARTITION_SIZE, DEFAULT_BINARY_DATA_CHUNK_SIZE, null); } public CassandraAppStorageConfig(int flushMaximumChange, long flushMaximumSize, int doubleQueryPartitionSize, - int stringQueryPartitionSize, int binaryDataChunkSize) { + int stringQueryPartitionSize, int binaryDataChunkSize, FileOnDiskConfig fileOnDiskConfig) { this.flushMaximumChange = checkFlushMaximumChange(flushMaximumChange); this.flushMaximumSize = checkFlushMaximumSize(flushMaximumSize); this.doubleQueryPartitionSize = checkQueryPartitionSize(doubleQueryPartitionSize); this.stringQueryPartitionSize = checkQueryPartitionSize(stringQueryPartitionSize); this.binaryDataChunkSize = checkBinaryDataChunkSize(binaryDataChunkSize); + this.fileOnDiskConfig = fileOnDiskConfig; } public int getFlushMaximumChange() { @@ -138,4 +152,20 @@ public CassandraAppStorageConfig setBinaryDataChunkSize(int binaryDataChunkSize) this.binaryDataChunkSize = checkBinaryDataChunkSize(binaryDataChunkSize); return this; } + + static class FileOnDiskConfig { + + private final Path rootDir; + + FileOnDiskConfig(Path rootDir) { + this.rootDir = Objects.requireNonNull(rootDir); + } + } + + public Path getRootDir() { + if (fileOnDiskConfig == null) { + return null; + } + return fileOnDiskConfig.rootDir; + } } diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileInputStream.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileInputStream.java new file mode 100644 index 00000000..725e4052 --- /dev/null +++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileInputStream.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2020, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.afs.cassandra; + +import java.io.*; + +/** + * @author Yichen TANG + */ +public class CassandraDiskFileInputStream extends InputStream { + + private final FileInputStream fis; + + CassandraDiskFileInputStream(File inputFile) { + try { + fis = new FileInputStream(inputFile); + } catch (FileNotFoundException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int read() throws IOException { + return fis.read(); + } +} diff --git a/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileOutputStream.java b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileOutputStream.java new file mode 100644 index 00000000..e5c22580 --- /dev/null +++ b/afs-cassandra/src/main/java/com/powsybl/afs/cassandra/CassandraDiskFileOutputStream.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2020, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.afs.cassandra; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * @author Yichen TANG + */ +class CassandraDiskFileOutputStream extends OutputStream { + + private final FileOutputStream fos; + + CassandraDiskFileOutputStream(Path rootDir, String fileSystemName, String nodeId, String name) { + final Path resolve = rootDir.resolve(fileSystemName).resolve(nodeId); + try { + Files.createDirectories(resolve); + this.fos = new FileOutputStream(resolve.resolve(name).toFile()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void write(int b) throws IOException { + fos.write(b); + } +}