Skip to content

Commit

Permalink
fix: use a single Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Nov 21, 2023
1 parent 224ecb6 commit 56b2b2c
Showing 1 changed file with 14 additions and 17 deletions.
31 changes: 14 additions & 17 deletions src/main/java/io/kestra/storage/gcs/GcsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@
@Introspected
public class GcsStorage implements StorageInterface {
@Inject
GcsClientFactory factory;
Storage storage;

@Inject
GcsConfig config;

private Storage client() {
return factory.of(config);
}

private BlobId blob(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
Expand Down Expand Up @@ -76,7 +73,7 @@ private void parentTraversalGuard(URI uri) {
@Override
public InputStream get(String tenantId, URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -93,7 +90,7 @@ public InputStream get(String tenantId, URI uri) throws IOException {
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
String prefix = (path.endsWith("/")) ? path : path + "/";
Page<Blob> blobs = this.client().list(config.bucket, Storage.BlobListOption.prefix(prefix),
Page<Blob> blobs = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix),
Storage.BlobListOption.currentDirectory());
List<FileAttributes> list = blobs.streamAll()
.filter(blob -> {
Expand All @@ -113,7 +110,7 @@ public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
@Override
public boolean exists(String tenantId, URI uri) {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));
return blob != null && blob.exists();
} catch (StorageException e) {
return false;
Expand All @@ -123,7 +120,7 @@ public boolean exists(String tenantId, URI uri) {
@Override
public Long size(String tenantId,URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -138,7 +135,7 @@ public Long size(String tenantId,URI uri) throws IOException {
@Override
public Long lastModifiedTime(String tenantId,URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -156,7 +153,7 @@ public FileAttributes getAttributes(String tenantId, URI uri) throws IOException
if (!exists(tenantId, uri)) {
path = path + "/";
}
Blob blob = this.client().get(this.blob(path));
Blob blob = this.storage.get(this.blob(path));
if (blob == null) {
throw new FileNotFoundException("%s not found.".formatted(uri));
}
Expand All @@ -183,7 +180,7 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
.newBuilder(this.blob(tenantId, uri))
.build();

try (WriteChannel writer = this.client().writer(blobInfo)) {
try (WriteChannel writer = this.storage.writer(blobInfo)) {
byte[] buffer = new byte[10_240];

int limit;
Expand All @@ -210,7 +207,7 @@ private void mkdirs(String path) {
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(aggregatedPath.toString()))
.build();
this.client().create(blobInfo);
this.storage.create(blobInfo);
}
}

Expand All @@ -231,7 +228,7 @@ public URI createDirectory(String tenantId, URI uri) {
@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
String path = getPath(tenantId, from);
StorageBatch batch = this.client().batch();
StorageBatch batch = this.storage.batch();

if (getAttributes(tenantId, from).getType() == FileAttributes.FileType.File) {
// move just a file
Expand All @@ -242,7 +239,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
// move directories
String prefix = (!path.endsWith("/")) ? path + "/" : path;

Page<Blob> list = client().list(config.bucket, Storage.BlobListOption.prefix(prefix));
Page<Blob> list = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix));
list.streamAll().forEach(blob -> {
BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length()));
moveFile(blob.getBlobId(), target, batch);
Expand All @@ -253,19 +250,19 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
}

private void moveFile(BlobId source, BlobId target, StorageBatch batch) {
client().copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
this.storage.copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
batch.delete(source);
}

@Override
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
try {
StorageBatch batch = this.client().batch();
StorageBatch batch = this.storage.batch();
Map<URI, StorageBatchResult<Boolean>> results = new HashMap<>();

String prefix = getPath(tenantId, storagePrefix);

Page<Blob> blobs = this.client()
Page<Blob> blobs = this.storage
.list(this.config.getBucket(),
Storage.BlobListOption.prefix(prefix)
);
Expand Down

0 comments on commit 56b2b2c

Please sign in to comment.