Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add cassandra file on disk. #64

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -919,6 +921,18 @@ private int read(IntSupplier supplier) {

@Override
public Optional<InputStream> readBinaryData(String nodeId, String name) {
final Optional<InputStream> inputStream = readBinaryDataFromColumns(nodeId, name);
if (inputStream.isPresent() || config.getRootDir() == null) {
return inputStream;
}
final Path resolve = config.getRootDir().resolve(fileSystemName).resolve(nodeId).resolve(name);
if (Files.exists(resolve)) {
return Optional.of(new CassandraDiskFileInputStream(resolve.toFile()));
}
return Optional.empty();
}

private Optional<InputStream> readBinaryDataFromColumns(String nodeId, String name) {
UUID nodeUuid = checkNodeId(nodeId);
Objects.requireNonNull(name);

Expand Down Expand Up @@ -1040,6 +1054,9 @@ public OutputStream writeBinaryData(String nodeId, String name) {
// flush buffer to keep change order
changeBuffer.flush();
pushEvent(new NodeDataUpdated(nodeId, name), APPSTORAGE_NODE_TOPIC);
if (config.getRootDir() != null) {
return new CassandraDiskFileOutputStream(config.getRootDir(), fileSystemName, nodeId, name);
}
return new BinaryDataOutputStream(nodeUuid, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import com.powsybl.commons.config.ModuleConfig;
import com.powsybl.commons.config.PlatformConfig;

import java.nio.file.Path;
import java.util.Objects;

/**
* @author Geoffroy Jamgotchian <geoffroy.jamgotchian at rte-france.com>
*/
Expand All @@ -30,6 +33,8 @@ public class CassandraAppStorageConfig {

private int binaryDataChunkSize;

private FileOnDiskConfig fileOnDiskConfig;

public static CassandraAppStorageConfig load() {
return load(PlatformConfig.defaultConfig());
}
Expand All @@ -40,16 +45,24 @@ public static CassandraAppStorageConfig load(PlatformConfig platformConfig) {
int doubleQueryPartitionSize = DEFAULT_DOUBLE_QUERY_PARTITION_SIZE;
int stringQueryPartitionSize = DEFAULT_STRING_QUERY_PARTITION_SIZE;
int binaryDataChunkSize = DEFAULT_BINARY_DATA_CHUNK_SIZE;
boolean fileOnDisk = false;
FileOnDiskConfig fodConfig = null;
ModuleConfig moduleConfig = platformConfig.getOptionalModuleConfig("cassandra-app-storage").orElse(null);
if (moduleConfig != null) {
flushMaximumChange = moduleConfig.getIntProperty("flush-maximum-change", DEFAULT_FLUSH_MAXIMUM_CHANGE);
flushMaximumSize = moduleConfig.getLongProperty("flush-maximum-size", DEFAULT_FLUSH_MAXIMUM_SIZE);
doubleQueryPartitionSize = moduleConfig.getIntProperty("double-query-partition-size", DEFAULT_DOUBLE_QUERY_PARTITION_SIZE);
stringQueryPartitionSize = moduleConfig.getIntProperty("string-query-partition-size", DEFAULT_STRING_QUERY_PARTITION_SIZE);
binaryDataChunkSize = moduleConfig.getIntProperty("binary-data-chunk-size", DEFAULT_BINARY_DATA_CHUNK_SIZE);
fileOnDisk = moduleConfig.getBooleanProperty("file-on-disk", false);
if (fileOnDisk) {
final ModuleConfig fodModuleConfig = platformConfig.getModuleConfig("cassandra-file-on-disk");
final Path fodRootDir = fodModuleConfig.getPathProperty("root-dir");
fodConfig = new FileOnDiskConfig(fodRootDir);
}
}
return new CassandraAppStorageConfig(flushMaximumChange, flushMaximumSize, doubleQueryPartitionSize,
stringQueryPartitionSize, binaryDataChunkSize);
stringQueryPartitionSize, binaryDataChunkSize, fodConfig);
}

private static int checkFlushMaximumChange(int flushMaximumChange) {
Expand Down Expand Up @@ -82,16 +95,17 @@ private static int checkBinaryDataChunkSize(int binaryDataChunkSize) {

public CassandraAppStorageConfig() {
this(DEFAULT_FLUSH_MAXIMUM_CHANGE, DEFAULT_FLUSH_MAXIMUM_SIZE, DEFAULT_DOUBLE_QUERY_PARTITION_SIZE,
DEFAULT_STRING_QUERY_PARTITION_SIZE, DEFAULT_BINARY_DATA_CHUNK_SIZE);
DEFAULT_STRING_QUERY_PARTITION_SIZE, DEFAULT_BINARY_DATA_CHUNK_SIZE, null);
}

public CassandraAppStorageConfig(int flushMaximumChange, long flushMaximumSize, int doubleQueryPartitionSize,
int stringQueryPartitionSize, int binaryDataChunkSize) {
int stringQueryPartitionSize, int binaryDataChunkSize, FileOnDiskConfig fileOnDiskConfig) {
this.flushMaximumChange = checkFlushMaximumChange(flushMaximumChange);
this.flushMaximumSize = checkFlushMaximumSize(flushMaximumSize);
this.doubleQueryPartitionSize = checkQueryPartitionSize(doubleQueryPartitionSize);
this.stringQueryPartitionSize = checkQueryPartitionSize(stringQueryPartitionSize);
this.binaryDataChunkSize = checkBinaryDataChunkSize(binaryDataChunkSize);
this.fileOnDiskConfig = fileOnDiskConfig;
}

public int getFlushMaximumChange() {
Expand Down Expand Up @@ -138,4 +152,20 @@ public CassandraAppStorageConfig setBinaryDataChunkSize(int binaryDataChunkSize)
this.binaryDataChunkSize = checkBinaryDataChunkSize(binaryDataChunkSize);
return this;
}

static class FileOnDiskConfig {

private final Path rootDir;

FileOnDiskConfig(Path rootDir) {
this.rootDir = Objects.requireNonNull(rootDir);
}
}

public Path getRootDir() {
if (fileOnDiskConfig == null) {
return null;
}
return fileOnDiskConfig.rootDir;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) 2020, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.afs.cassandra;

import java.io.*;

/**
* @author Yichen TANG <yichen.tang at rte-france.com>
*/
public class CassandraDiskFileInputStream extends InputStream {

private final FileInputStream fis;

CassandraDiskFileInputStream(File inputFile) {
try {
fis = new FileInputStream(inputFile);
} catch (FileNotFoundException e) {
throw new UncheckedIOException(e);
}
}

@Override
public int read() throws IOException {
return fis.read();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright (c) 2020, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.afs.cassandra;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
* @author Yichen TANG <yichen.tang at rte-france.com>
*/
class CassandraDiskFileOutputStream extends OutputStream {

private final FileOutputStream fos;

CassandraDiskFileOutputStream(Path rootDir, String fileSystemName, String nodeId, String name) {
final Path resolve = rootDir.resolve(fileSystemName).resolve(nodeId);
try {
Files.createDirectories(resolve);
this.fos = new FileOutputStream(resolve.resolve(name).toFile());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void write(int b) throws IOException {
fos.write(b);
}
}