Skip to content

Commit

Permalink
pageserver: shard splitting (#6379)
Browse files Browse the repository at this point in the history
## Problem

One doesn't know at tenant creation time how large the tenant will grow.
We need to be able to dynamically adjust the shard count at runtime.
This is implemented as "splitting" of shards into smaller child shards,
which cover a subset of the keyspace that the parent covered.

Refer to RFC: #6358

Part of epic: #6278

## Summary of changes

This PR implements the happy path (does not cleanly recover from a crash
mid-split, although won't lose any data), without any optimizations
(e.g. child shards re-download their own copies of layers that the
parent shard already had on local disk)

- Add `/v1/tenant/:tenant_shard_id/shard_split` API to pageserver: this
copies the shard's index to the child shards' paths, instantiates child
`Tenant` object, and tears down parent `Tenant` object.
- Add `splitting` column to `tenant_shards` table. This is written into
an existing migration because we haven't deployed yet, so don't need to
cleanly upgrade.
- Add `/control/v1/tenant/:tenant_id/shard_split` API to
attachment_service,
- Add `test_sharding_split_smoke` test. This covers the happy path:
future PRs will add tests that exercise failure cases.
  • Loading branch information
jcsp authored Feb 8, 2024
1 parent 43eae17 commit af91a28
Showing 19 changed files with 1,088 additions and 24 deletions.
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -100,6 +100,11 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
-c "listen_pg_addr='0.0.0.0:6400'" \
-c "listen_http_addr='0.0.0.0:9898'"

# When running a binary that links with libpq, default to using our most recent postgres version. Binaries
# that want a particular postgres version will select it explicitly: this is just a default.
ENV LD_LIBRARY_PATH /usr/local/v16/lib


VOLUME ["/data"]
USER neon
EXPOSE 6400
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ CREATE TABLE tenant_shards (
generation INTEGER NOT NULL,
generation_pageserver BIGINT NOT NULL,
placement_policy VARCHAR NOT NULL,
splitting SMALLINT NOT NULL,
-- config is JSON encoded, opaque to the database.
config TEXT NOT NULL
);
19 changes: 18 additions & 1 deletion control_plane/attachment_service/src/http.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,8 @@ use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -292,6 +293,19 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}

async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;

json_response(
StatusCode::OK,
service.tenant_shard_split(tenant_id, split_req).await?,
)
}

async fn handle_tenant_shard_migrate(
service: Arc<Service>,
mut req: Request<Body>,
@@ -391,6 +405,9 @@ pub fn make_router(
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(r, handle_tenant_shard_migrate)
})
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
102 changes: 94 additions & 8 deletions control_plane/attachment_service/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use self::split_state::SplitState;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
@@ -363,19 +365,101 @@ impl Persistence {
Ok(())
}

// TODO: when we start shard splitting, we must durably mark the tenant so that
// on restart, we know that we must go through recovery (list shards that exist
// and pick up where we left off and/or revert to parent shards).
// When we start shard splitting, we must durably mark the tenant so that
// on restart, we know that we must go through recovery.
//
// We create the child shards here, so that they will be available for increment_generation calls
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
#[allow(dead_code)]
pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
todo!();
pub(crate) async fn begin_shard_split(
&self,
old_shard_count: ShardCount,
split_tenant_id: TenantId,
parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> DatabaseResult<()> {
// Mark parent shards as splitting
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.0 as i32))
.set((splitting.eq(1),))
.execute(conn)?;
if ShardCount(updated.try_into().map_err(|_| DatabaseError::Logical(format!("Overflow existing shard count {} while splitting", updated)))?) != old_shard_count {
// Perhaps a deletion or another split raced with this attempt to split, mutating
// the parent shards that we intend to split. In this case the split request should fail.
return Err(DatabaseError::Logical(
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {old_shard_count:?})")
));
}

// FIXME: spurious clone to sidestep closure move rules
let parent_to_children = parent_to_children.clone();

// Insert child shards
for (parent_shard_id, children) in parent_to_children {
let mut parent = crate::schema::tenant_shards::table
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(parent_shard_id.shard_count.0 as i32))
.load::<TenantShardPersistence>(conn)?;
let parent = if parent.len() != 1 {
return Err(DatabaseError::Logical(format!(
"Parent shard {parent_shard_id} not found"
)));
} else {
parent.pop().unwrap()
};
for mut shard in children {
// Carry the parent's generation into the child
shard.generation = parent.generation;

debug_assert!(shard.splitting == SplitState::Splitting);
diesel::insert_into(tenant_shards)
.values(shard)
.execute(conn)?;
}
}

Ok(())
})?;

Ok(())
})
.await
}

// TODO: when we finish shard splitting, we must atomically clean up the old shards
// When we finish shard splitting, we must atomically clean up the old shards
// and insert the new shards, and clear the splitting marker.
#[allow(dead_code)]
pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> {
todo!();
pub(crate) async fn complete_shard_split(
&self,
split_tenant_id: TenantId,
old_shard_count: ShardCount,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
// Drop parent shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.0 as i32))
.execute(conn)?;

// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.set((splitting.eq(0),))
.execute(conn)?;
debug_assert!(updated > 0);

Ok(())
})?;

Ok(())
})
.await
}
}

@@ -403,6 +487,8 @@ pub(crate) struct TenantShardPersistence {
#[serde(default)]
pub(crate) placement_policy: String,
#[serde(default)]
pub(crate) splitting: SplitState,
#[serde(default)]
pub(crate) config: String,
}

46 changes: 46 additions & 0 deletions control_plane/attachment_service/src/persistence/split_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use diesel::pg::{Pg, PgValue};
use diesel::{
deserialize::FromSql, deserialize::FromSqlRow, expression::AsExpression, serialize::ToSql,
sql_types::Int2,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)]
#[diesel(sql_type = SplitStateSQLRepr)]
#[derive(Deserialize, Serialize)]
pub enum SplitState {
Idle = 0,
Splitting = 1,
}

impl Default for SplitState {
fn default() -> Self {
Self::Idle
}
}

type SplitStateSQLRepr = Int2;

impl ToSql<SplitStateSQLRepr, Pg> for SplitState {
fn to_sql<'a>(
&'a self,
out: &'a mut diesel::serialize::Output<Pg>,
) -> diesel::serialize::Result {
let raw_value: i16 = *self as i16;
let mut new_out = out.reborrow();
ToSql::<SplitStateSQLRepr, Pg>::to_sql(&raw_value, &mut new_out)
}
}

impl FromSql<SplitStateSQLRepr, Pg> for SplitState {
fn from_sql(pg_value: PgValue) -> diesel::deserialize::Result<Self> {
match FromSql::<SplitStateSQLRepr, Pg>::from_sql(pg_value).map(|v| match v {
0 => Some(Self::Idle),
1 => Some(Self::Splitting),
_ => None,
})? {
Some(v) => Ok(v),
None => Err(format!("Invalid SplitState value, was: {:?}", pg_value.as_bytes()).into()),
}
}
}
1 change: 1 addition & 0 deletions control_plane/attachment_service/src/schema.rs
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ diesel::table! {
generation -> Int4,
generation_pageserver -> Int8,
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,
}
}
Loading

1 comment on commit af91a28

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2475 tests run: 2351 passed, 0 failed, 124 skipped (full report)


Code coverage (full report)

  • functions: 54.3% (11471 of 21106 functions)
  • lines: 81.6% (64431 of 78972 lines)

The comment gets automatically updated with the latest test results
af91a28 at 2024-02-08T17:12:00.826Z :recycle:

Please sign in to comment.