Skip to content

Commit

Permalink
Bigtable: update google proto files and allow configuration of max_me…
Browse files Browse the repository at this point in the history
…ssage_size (solana-labs#34740)

* Update proto files with tonic-build v0.9.2

* Manually ignore invalid doc-tests

* Add new ReadRowsRequest fields

* Add LedgerStorageConfig::max_message_size and default value

* Add BigtableConnection::max_message_size and use on client creation

* Add max_message_size to RpcBigtableConfig and make const pub

* Add solana-validator cli arg
  • Loading branch information
Tyera authored Jan 11, 2024
1 parent 13b8d02 commit 166be29
Show file tree
Hide file tree
Showing 11 changed files with 1,777 additions and 116 deletions.
1 change: 1 addition & 0 deletions ledger-tool/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ async fn get_bigtable(
credential_type: CredentialType::Filepath(Some(args.crediential_path.unwrap())),
instance_name: args.instance_name,
app_profile_id: args.app_profile_id,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
},
)
.await
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub struct RpcBigtableConfig {
pub bigtable_instance_name: String,
pub bigtable_app_profile_id: String,
pub timeout: Option<Duration>,
pub max_message_size: usize,
}

impl Default for RpcBigtableConfig {
Expand All @@ -181,6 +182,7 @@ impl Default for RpcBigtableConfig {
bigtable_instance_name,
bigtable_app_profile_id,
timeout: None,
max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ impl JsonRpcService {
ref bigtable_instance_name,
ref bigtable_app_profile_id,
timeout,
max_message_size,
}) = config.rpc_bigtable_config
{
let bigtable_config = solana_storage_bigtable::LedgerStorageConfig {
Expand All @@ -414,6 +415,7 @@ impl JsonRpcService {
credential_type: CredentialType::Filepath(None),
instance_name: bigtable_instance_name.clone(),
app_profile_id: bigtable_app_profile_id.clone(),
max_message_size,
};
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new_with_config(
Expand Down
1,047 changes: 972 additions & 75 deletions storage-bigtable/proto/google.api.rs

Large diffs are not rendered by default.

788 changes: 751 additions & 37 deletions storage-bigtable/proto/google.bigtable.v2.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions storage-bigtable/proto/google.rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Status {
/// The status code, which should be an enum value of \[google.rpc.Code][google.rpc.Code\].
/// The status code, which should be an enum value of
/// \[google.rpc.Code][google.rpc.Code\].
#[prost(int32, tag = "1")]
pub code: i32,
/// A developer-facing error message, which should be in English. Any
/// user-facing error message should be localized and sent in the
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized by the client.
/// \[google.rpc.Status.details][google.rpc.Status.details\] field, or localized
/// by the client.
#[prost(string, tag = "2")]
pub message: ::prost::alloc::string::String,
/// A list of messages that carry the error details. There is a common set of
Expand Down
27 changes: 25 additions & 2 deletions storage-bigtable/src/bigtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub struct BigTableConnection {
table_prefix: String,
app_profile_id: String,
timeout: Option<Duration>,
max_message_size: usize,
}

impl BigTableConnection {
Expand All @@ -141,11 +142,18 @@ impl BigTableConnection {
read_only: bool,
timeout: Option<Duration>,
credential_type: CredentialType,
max_message_size: usize,
) -> Result<Self> {
match std::env::var("BIGTABLE_EMULATOR_HOST") {
Ok(endpoint) => {
info!("Connecting to bigtable emulator at {}", endpoint);
Self::new_for_emulator(instance_name, app_profile_id, &endpoint, timeout)
Self::new_for_emulator(
instance_name,
app_profile_id,
&endpoint,
timeout,
max_message_size,
)
}

Err(_) => {
Expand Down Expand Up @@ -210,6 +218,7 @@ impl BigTableConnection {
table_prefix,
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}
}
Expand All @@ -220,6 +229,7 @@ impl BigTableConnection {
app_profile_id: &str,
endpoint: &str,
timeout: Option<Duration>,
max_message_size: usize,
) -> Result<Self> {
Ok(Self {
access_token: None,
Expand All @@ -229,6 +239,7 @@ impl BigTableConnection {
table_prefix: format!("projects/emulator/instances/{instance_name}/tables/"),
app_profile_id: app_profile_id.to_string(),
timeout,
max_message_size,
})
}

Expand All @@ -254,7 +265,9 @@ impl BigTableConnection {
}
Ok(req)
},
);
)
.max_decoding_message_size(self.max_message_size)
.max_encoding_message_size(self.max_message_size);
BigTable {
access_token: self.access_token.clone(),
client,
Expand Down Expand Up @@ -469,6 +482,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
],
})),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand All @@ -494,6 +509,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
filter: Some(RowFilter {
filter: Some(row_filter::Filter::StripValueTransformer(true)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -545,6 +562,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -577,6 +596,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down Expand Up @@ -610,6 +631,8 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> BigTable<F> {
// Only return the latest version of each cell
filter: Some(row_filter::Filter::CellsPerColumnLimitFilter(1)),
}),
request_stats_view: 0,
reversed: false,
})
.await?
.into_inner();
Expand Down
6 changes: 6 additions & 0 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {

pub const DEFAULT_INSTANCE_NAME: &str = "solana-ledger";
pub const DEFAULT_APP_PROFILE_ID: &str = "default";
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB

#[derive(Debug)]
pub enum CredentialType {
Expand All @@ -395,6 +396,7 @@ pub struct LedgerStorageConfig {
pub credential_type: CredentialType,
pub instance_name: String,
pub app_profile_id: String,
pub max_message_size: usize,
}

impl Default for LedgerStorageConfig {
Expand All @@ -405,6 +407,7 @@ impl Default for LedgerStorageConfig {
credential_type: CredentialType::Filepath(None),
instance_name: DEFAULT_INSTANCE_NAME.to_string(),
app_profile_id: DEFAULT_APP_PROFILE_ID.to_string(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
Expand Down Expand Up @@ -471,6 +474,7 @@ impl LedgerStorage {
app_profile_id,
endpoint,
timeout,
LedgerStorageConfig::default().max_message_size,
)?,
stats,
})
Expand All @@ -484,13 +488,15 @@ impl LedgerStorage {
instance_name,
app_profile_id,
credential_type,
max_message_size,
} = config;
let connection = bigtable::BigTableConnection::new(
instance_name.as_str(),
app_profile_id.as_str(),
read_only,
timeout,
credential_type,
max_message_size,
)
.await?;
Ok(Self { stats, connection })
Expand Down
1 change: 1 addition & 0 deletions validator/src/bin/solana-test-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ fn main() {
String
),
timeout: None,
..RpcBigtableConfig::default()
})
} else {
None
Expand Down
12 changes: 12 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_bigtable_app_profile_id)
.help("Bigtable application profile id to use in requests")
)
.arg(
Arg::with_name("rpc_bigtable_max_message_size")
.long("rpc-bigtable-max-message-size")
.value_name("BYTES")
.validator(is_parsable::<usize>)
.takes_value(true)
.default_value(&default_args.rpc_bigtable_max_message_size)
.help("Max encoding and decoding message size used in Bigtable Grpc client"),
)
.arg(
Arg::with_name("rpc_pubsub_worker_threads")
.long("rpc-pubsub-worker-threads")
Expand Down Expand Up @@ -1925,6 +1934,7 @@ pub struct DefaultArgs {
pub rpc_bigtable_timeout: String,
pub rpc_bigtable_instance_name: String,
pub rpc_bigtable_app_profile_id: String,
pub rpc_bigtable_max_message_size: String,
pub rpc_max_request_body_size: String,
pub rpc_pubsub_worker_threads: String,

Expand Down Expand Up @@ -2010,6 +2020,8 @@ impl DefaultArgs {
rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(),
rpc_bigtable_app_profile_id: solana_storage_bigtable::DEFAULT_APP_PROFILE_ID
.to_string(),
rpc_bigtable_max_message_size: solana_storage_bigtable::DEFAULT_MAX_MESSAGE_SIZE
.to_string(),
rpc_pubsub_worker_threads: "4".to_string(),
accountsdb_repl_threads: num_cpus::get().to_string(),
maximum_full_snapshot_archives_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN
Expand Down
1 change: 1 addition & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ pub fn main() {
timeout: value_t!(matches, "rpc_bigtable_timeout", u64)
.ok()
.map(Duration::from_secs),
max_message_size: value_t_or_exit!(matches, "rpc_bigtable_max_message_size", usize),
})
} else {
None
Expand Down

0 comments on commit 166be29

Please sign in to comment.