Skip to content

Commit

Permalink
[WIP] Add gcs store
Browse files Browse the repository at this point in the history
  • Loading branch information
SchahinRohani committed Dec 8, 2024
1 parent d75d20d commit 24291e9
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 0 deletions.
19 changes: 19 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub enum StoreSpec {
///
memory(MemorySpec),

/// TODO(SchahinRohani): Add documentation.
experimental_gcs_store(GCSSpec),

/// S3 store will use Amazon's S3 service as a backend to store
/// the files. This configuration can be used to share files
/// across multiple instances.
Expand Down Expand Up @@ -724,6 +727,22 @@ pub struct EvictionPolicy {
pub max_count: u64,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct GCSSpec {
/// Google Cloud Storage region.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub region: String,

/// If you wish to prefix the location on Google cloud storage. If None, no prefix will be used.
#[serde(default)]
pub key_prefix: Option<String>,

/// Bucket name to use as the backend.
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub bucket: String,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(deny_unknown_fields)]
pub struct S3Spec {
Expand Down
2 changes: 2 additions & 0 deletions nativelink-store/src/default_store_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::dedup_store::DedupStore;
use crate::existence_cache_store::ExistenceCacheStore;
use crate::fast_slow_store::FastSlowStore;
use crate::filesystem_store::FilesystemStore;
use crate::gcs_store::GCSStore;
use crate::grpc_store::GrpcStore;
use crate::memory_store::MemoryStore;
use crate::noop_store::NoopStore;
Expand All @@ -51,6 +52,7 @@ pub fn store_factory<'a>(
let store: Arc<dyn StoreDriver> = match backend {
StoreSpec::memory(spec) => MemoryStore::new(spec),
StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?,
StoreSpec::experimental_gcs_store(spec) => GCSStore::new(spec, SystemTime::now).await?,
StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?,
StoreSpec::verify(spec) => VerifyStore::new(
spec,
Expand Down
91 changes: 91 additions & 0 deletions nativelink-store/src/gcs_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
// use googleapis_tonic_google_storage_v2::google::storage::v2::storage_client::StorageClient;
use nativelink_config::stores::GCSSpec;
use nativelink_error::{make_err, Code, Error}; //ResultExt
use nativelink_metric::{
MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
};
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};
// use tonic::transport::Channel;

/// Represents a Google Cloud Platform (GCP) Store.
#[derive(Default)]
pub struct GCSStore {
// spec: GCPSpec,
}

impl MetricsComponent for GCSStore {
fn publish(
&self,
_kind: MetricKind,
_field_metadata: MetricFieldData,
) -> Result<MetricPublishKnownKindData, nativelink_metric::Error> {
Ok(MetricPublishKnownKindData::Component)
}
}

impl GCSStore {
pub async fn new(spec: &GCSSpec, current_time: fn() -> SystemTime) -> Result<Arc<Self>, Error> {
// Print the spec
println!("GCSStore spec: {spec:?}");

// Get and print the current time
let now = current_time();
println!("Current time: {now:?}");

Ok(Arc::new(Self {}))
}
}

#[async_trait]
impl StoreDriver for GCSStore {
async fn has_with_results(
self: Pin<&Self>,
_keys: &[StoreKey<'_>],
_results: &mut [Option<u64>],
) -> Result<(), Error> {
// results.iter_mut().for_each(|r| *r = None);
Ok(())
}

async fn update(
self: Pin<&Self>,
_key: StoreKey<'_>,
mut _reader: DropCloserReadHalf,
_size_info: UploadSizeInfo,
) -> Result<(), Error> {
// reader.drain().await.err_tip(|| "In GCPStore::update")?;
Ok(())
}

async fn get_part(
self: Pin<&Self>,
_key: StoreKey<'_>,
_writer: &mut DropCloserWriteHalf,
_offset: u64,
_length: Option<u64>,
) -> Result<(), Error> {
Err(make_err!(Code::NotFound, "Not found in GCP store"))
}

fn inner_store(&self, _key: Option<StoreKey>) -> &dyn StoreDriver {
self
}

fn as_any<'a>(&'a self) -> &'a (dyn std::any::Any + Sync + Send + 'static) {
self
}

fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Sync + Send + 'static> {
self
}
}

default_health_status_indicator!(GCSStore);
1 change: 1 addition & 0 deletions nativelink-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod default_store_factory;
pub mod existence_cache_store;
pub mod fast_slow_store;
pub mod filesystem_store;
pub mod gcs_store;
pub mod grpc_store;
pub mod memory_store;
pub mod noop_store;
Expand Down

0 comments on commit 24291e9

Please sign in to comment.