Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into expose_connection
Browse files Browse the repository at this point in the history
  • Loading branch information
KABBOUCHI committed Sep 1, 2023
2 parents 5e02e6e + f0be3b6 commit d05f2c7
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 49 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ensemble/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ rbs = "4.3.3"
sha256 = "1.4.0"
tokio = "1.32.0"
serde = "1.0.183"
tracing = "0.1.37"
fastdate = "0.1.39"
itertools = "0.11.0"
Inflector = "0.11.4"
Expand Down
59 changes: 40 additions & 19 deletions ensemble/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ impl Builder {

let model = M::default();
for relation in self.eager_load {
tracing::trace!(
"Eager loading {relation} relation for {} models",
models.len()
);

let rows = model
.eager_load(&relation, models.iter().collect::<Vec<&M>>().as_slice())
.get_rows()
Expand Down Expand Up @@ -312,16 +317,20 @@ impl Builder {
let mut conn = connection::get().await?;
let values: Vec<(String, Value)> = columns.into().0;

let (sql, bindings) = (
format!(
"INSERT INTO {} ({}) VALUES ({})",
self.table,
values.iter().map(|(column, _)| column).join(", "),
values.iter().map(|_| "?").join(", ")
),
values.into_iter().map(|(_, value)| value).collect(),
);

tracing::debug!(sql = sql.as_str(), bindings = ?bindings, "Executing INSERT SQL query");

let result = conn
.exec(
&format!(
"INSERT INTO {} ({}) VALUES ({})",
self.table,
values.iter().map(|(column, _)| column).join(", "),
values.iter().map(|_| "?").join(", ")
),
values.into_iter().map(|(_, value)| value).collect(),
)
.exec(&sql, bindings)
.await
.map_err(|e| Error::Database(e.to_string()))?;

Expand All @@ -335,27 +344,31 @@ impl Builder {
/// Returns an error if the query fails, or if a connection to the database cannot be established.
pub async fn update<T: Into<Columns> + Send>(self, values: T) -> Result<u64, Error> {
let mut conn = connection::get().await?;
let sql = self.to_sql(QueryType::Update);
let values: Vec<(String, Value)> = values.into().0;

conn.exec(
&format!(
"UPDATE {} SET {} {sql}",
let (sql, bindings) = (
format!(
"UPDATE {} SET {} {}",
self.table,
values
.iter()
.map(|(column, _)| format!("{} = ?", column))
.join(", "),
self.to_sql(QueryType::Update)
),
values
.iter()
.map(|(_, value)| value.clone())
.chain(self.get_bindings())
.collect(),
)
.await
.map_err(|e| Error::Database(e.to_string()))
.map(|r| r.rows_affected)
);

tracing::debug!(sql = sql.as_str(), bindings = ?bindings, "Executing UPDATE SQL query");

conn.exec(&sql, bindings)
.await
.map_err(|e| Error::Database(e.to_string()))
.map(|r| r.rows_affected)
}

/// Delete records from the database. Returns the number of affected rows.
Expand All @@ -365,8 +378,11 @@ impl Builder {
/// Returns an error if the query fails, or if a connection to the database cannot be established.
pub async fn delete(self) -> Result<u64, Error> {
let mut conn = connection::get().await?;
let (sql, bindings) = (self.to_sql(QueryType::Delete), self.get_bindings());

tracing::debug!(sql = sql.as_str(), bindings = ?bindings, "Executing DELETE SQL query");

conn.exec(&self.to_sql(QueryType::Delete), self.get_bindings())
conn.exec(&sql, bindings)
.await
.map_err(|e| Error::Database(e.to_string()))
.map(|r| r.rows_affected)
Expand All @@ -379,8 +395,11 @@ impl Builder {
/// Returns an error if the query fails, or if a connection to the database cannot be established.
pub async fn truncate(self) -> Result<u64, Error> {
let mut conn = connection::get().await?;
let sql = format!("TRUNCATE TABLE {}", self.table);

conn.exec(&format!("TRUNCATE TABLE {}", self.table), vec![])
tracing::debug!(sql = sql.as_str(), "Executing TRUNCATE SQL query");

conn.exec(&sql, vec![])
.await
.map_err(|e| Error::Database(e.to_string()))
.map(|r| r.rows_affected)
Expand All @@ -392,6 +411,8 @@ impl Builder {
let mut conn = connection::get().await?;
let (sql, bindings) = (self.to_sql(QueryType::Select), self.get_bindings());

tracing::debug!(sql = sql.as_str(), bindings = ?bindings, "Executing SELECT SQL query");

let values = conn
.get_values(&sql, bindings)
.await
Expand Down
11 changes: 11 additions & 0 deletions ensemble/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ pub enum SetupError {
pub async fn setup(database_url: &str) -> Result<(), SetupError> {
let rb = RBatis::new();

#[cfg(feature = "mysql")]
tracing::info!(
database_url = database_url,
"Setting up MySQL database pool..."
);
#[cfg(feature = "postgres")]
tracing::info!(
database_url = database_url,
"Setting up PostgreSQL database pool..."
);

#[cfg(feature = "mysql")]
rb.link(MysqlDriver {}, database_url).await?;
#[cfg(feature = "postgres")]
Expand Down
53 changes: 41 additions & 12 deletions ensemble/src/migrations/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ impl Migrator {
.unwrap_or_default()
.saturating_add(1);

tracing::debug!(
batch = batch,
state = ?state,
"Loaded migration state from database."
);

Ok(Self {
state,
batch,
Expand All @@ -40,6 +46,8 @@ impl Migrator {
}

pub fn register(&mut self, name: String, migration: Box<dyn Migration>) {
tracing::trace!("Registered migration [{name}]");

self.migrations.insert(name, migration);
}

Expand Down Expand Up @@ -67,6 +75,7 @@ impl Migrator {
pub async fn run(mut self) -> Result<(), Error> {
for (name, migration) in &self.migrations {
if self.state.iter().any(|m| &m.migration == name) {
tracing::trace!("Skipping migration [{name}], since it's already been run.");
continue;
}

Expand Down Expand Up @@ -106,6 +115,8 @@ impl Migrator {
batch: self.batch,
migration: name.to_string(),
});

tracing::info!("Successfully ran migration [{name}].");
}

Ok(())
Expand All @@ -127,7 +138,7 @@ impl Migrator {
let migration = self
.migrations
.get(&record.migration)
.ok_or(Error::NotFound(record.migration))?;
.ok_or_else(|| Error::NotFound(record.migration.clone()))?;

self.connection
.exec("begin", vec![])
Expand All @@ -150,7 +161,7 @@ impl Migrator {
self.connection
.exec(
"delete from migrations where id = ?",
vec![to_value!(record.id)],
vec![to_value!(&record.id)],
)
.await
.map_err(|e| Error::Database(e.to_string()))?;
Expand All @@ -159,22 +170,21 @@ impl Migrator {
.exec("commit", vec![])
.await
.map_err(|e| Error::Database(e.to_string()))?;

tracing::info!("Successfully rolled back migration [{}].", record.migration);
}

Ok(())
}

async fn get_state(conn: &mut Connection) -> Result<Vec<StoredMigration>, Error> {
conn.exec(
"create table if not exists migrations (
id int unsigned not null auto_increment primary key,
migration varchar(255) not null,
batch int not null
)",
vec![],
)
.await
.map_err(|e| Error::Database(e.to_string()))?;
let sql = migrations_table_query();

tracing::debug!(sql = sql, "Running CREATE TABLE IF NOT EXISTS SQL query");

conn.exec(sql, vec![])
.await
.map_err(|e| Error::Database(e.to_string()))?;

Ok(conn
.get_values("select * from migrations", vec![])
Expand All @@ -192,3 +202,22 @@ pub struct StoredMigration {
batch: u64,
migration: String,
}

fn migrations_table_query() -> &'static str {
#[cfg(feature = "mysql")]
return "create table if not exists migrations (
id int unsigned not null auto_increment primary key,
migration varchar(255) not null,
batch int not null
)";

#[cfg(feature = "postgres")]
return "create table if not exists migrations (
id serial primary key,
migration varchar(255) not null,
batch int not null
)";

#[cfg(all(not(feature = "mysql"), not(feature = "postgres")))]
panic!("either the `mysql` or `postgres` feature must be enabled to use migrations.");
}
18 changes: 16 additions & 2 deletions ensemble/src/migrations/schema/column.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use ensemble_derive::Column;
use itertools::Itertools;
use rbs::Value;
use std::{fmt::Display, sync::mpsc};

use super::Schemable;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Type {
Json,
Uuid,
Expand Down Expand Up @@ -92,7 +93,20 @@ pub struct Column {
impl Column {
/// Specify a "default" value for the column
pub fn default<T: serde::Serialize>(mut self, default: T) -> Self {
self.default = Some(rbs::to_value!(default));
let value = if self.r#type == Type::Json {
Value::String(serde_json::to_string(&default).unwrap())
} else {
rbs::to_value!(default)
};

if let Type::Enum(values) = &self.r#type {
assert!(
values.contains(&value.as_str().unwrap_or_default().to_string()),
"default value must be one of the enum values"
);
}

self.default = Some(value);

self
}
Expand Down
34 changes: 19 additions & 15 deletions ensemble/src/migrations/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,21 @@ impl Schema {
let mut conn_lock = MIGRATE_CONN.try_lock().map_err(|_| Error::Lock)?;
let mut conn = conn_lock.take().ok_or(Error::Lock)?;

conn.exec(
&format!(
"CREATE TABLE {} ({}) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
table_name,
columns
.iter()
.map(Column::to_sql)
.chain(commands.iter().map(Command::to_sql))
.join(", "),
),
vec![],
)
.await
.map_err(|e| Error::Database(e.to_string()))?;
let sql = format!(
"CREATE TABLE {} ({}) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
table_name,
columns
.iter()
.map(Column::to_sql)
.chain(commands.iter().map(Command::to_sql))
.join(", "),
);

tracing::debug!(sql = sql.as_str(), "Running CREATE TABLE SQL query");

conn.exec(&sql, vec![])
.await
.map_err(|e| Error::Database(e.to_string()))?;

conn_lock.replace(conn);
drop(conn_lock);
Expand All @@ -63,8 +64,11 @@ impl Schema {
/// Returns an error if the table cannot be dropped, or if a connection to the database cannot be established.
pub async fn drop(table_name: &str) -> Result<(), Error> {
let mut conn = connection::get().await?;
let (sql, bindings) = (format!("DROP TABLE ?"), vec![to_value!(table_name)]);

tracing::debug!(sql = sql.as_str(), bindings = ?bindings, "Running DROP TABLE SQL query");

conn.exec(&format!("DROP TABLE ?"), vec![to_value!(table_name)])
conn.exec(&sql, bindings)
.await
.map_err(|e| Error::Database(e.to_string()))?;

Expand Down
2 changes: 1 addition & 1 deletion ensemble/src/types/hashed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<T: Sha256Digest> validator::HasLen for &Hashed<T> {
#[cfg(feature = "schema")]
impl<T: Sha256Digest> schemars::JsonSchema for Hashed<T> {
fn schema_name() -> String {
String::from("String")
String::schema_name()
}

fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
Expand Down
Loading

0 comments on commit d05f2c7

Please sign in to comment.