Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MySQL errors #180

Merged
merged 13 commits into from
Nov 22, 2024
4 changes: 2 additions & 2 deletions src/sql/db_connection_pool/dbconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ pub enum Error {
#[snafu(display("Unable to downcast connection"))]
UnableToDowncastConnection {},

#[snafu(display("Unable to get schema: {source}"))]
#[snafu(display("{source}"))]
UnableToGetSchema { source: GenericError },

#[snafu(display("The field '{field_name}' has an unsupported data type: {data_type}"))]
#[snafu(display("The field '{field_name}' has an unsupported data type: {data_type}.\nReport a bug to request support for this data type: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
#[cfg(feature = "duckdb")]
UnsupportedDataType {
data_type: DataType,
Expand Down
33 changes: 22 additions & 11 deletions src/sql/db_connection_pool/dbconnection/mysqlconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ use super::{AsyncDbConnection, DbConnection};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("{source}"))]
#[snafu(display("{source}\nFor further information, refer to the MySQL manual: https://dev.mysql.com/doc/mysql-errors/9.1/en/error-reference-introduction.html"))]
QueryError { source: mysql_async::Error },

#[snafu(display("Failed to convert query result to Arrow: {source}"))]
#[snafu(display("Failed to convert query result to Arrow: {source}.\nThis was likely caused by a bug in the DataFusion Table Providers code, which you can report here: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
ConversionError { source: arrow_sql_gen::mysql::Error },

#[snafu(display("Unable to get MySQL query result stream"))]
#[snafu(display("Unable to get MySQL query result stream.\nReport a bug to request support: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
QueryResultStreamError {},

#[snafu(display("Unsupported column data type: {data_type}"))]
UnsupportedDataTypeError { data_type: String },
#[snafu(display("The field '{column_name}' has an unsupported data type: {data_type}.\nReport a bug to request support for this data type: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
UnsupportedDataTypeError {
column_name: String,
data_type: String,
},

#[snafu(display("Unable to extract precision and scale from type: {data_type}"))]
#[snafu(display("Unable to extract precision and scale from type: {data_type}.\nThis was likely caused by a bug in the DataFusion Table Providers code, which you can report here: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
UnableToGetDecimalPrecisionAndScale { data_type: String },

#[snafu(display("Field '{field}' is missing"))]
#[snafu(display("Failed to find the field '{field}'.\nThis was likely caused by a bug in the DataFusion Table Providers code, which you can report here: https://github.com/datafusion-contrib/datafusion-table-providers/issues"))]
MissingField { field: String },
}

Expand Down Expand Up @@ -142,6 +145,7 @@ impl<'a> AsyncDbConnection<Conn, &'a (dyn ToValue + Sync)> for MySQLConnection {
) -> Result<SendableRecordBatchStream> {
let params_vec: Vec<_> = params.iter().map(|&p| p.to_value()).collect();
let sql = sql.replace('"', "");

let conn = Arc::clone(&self.conn);

let mut stream = Box::pin(stream! {
Expand Down Expand Up @@ -214,7 +218,7 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
field: "Type".to_string(),
})?;

let column_type = map_str_type_to_column_type(&data_type)?;
let column_type = map_str_type_to_column_type(&column_name, &data_type)?;
let column_is_binary = map_str_type_to_is_binary(&data_type);
let column_is_enum = map_str_type_to_is_enum(&data_type);
let column_use_large_str_or_blob = map_str_type_to_use_large_str_or_blob(&data_type);
Expand All @@ -236,14 +240,17 @@ fn columns_meta_to_schema(columns_meta: Vec<Row>) -> Result<SchemaRef> {
precision,
scale,
)
.context(UnsupportedDataTypeSnafu { data_type })?;
.context(UnsupportedDataTypeSnafu {
column_name: column_name.clone(),
data_type,
})?;

fields.push(Field::new(&column_name, arrow_data_type, true));
}
Ok(Arc::new(Schema::new(fields)))
}

fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
fn map_str_type_to_column_type(column_name: &str, data_type: &str) -> Result<ColumnType> {
let data_type = data_type.to_lowercase();
let column_type = match data_type.as_str() {
_ if data_type.starts_with("decimal") || data_type.starts_with("numeric") => {
Expand Down Expand Up @@ -284,7 +291,11 @@ fn map_str_type_to_column_type(data_type: &str) -> Result<ColumnType> {
_ if data_type.starts_with("char") => ColumnType::MYSQL_TYPE_STRING,
_ if data_type.starts_with("binary") => ColumnType::MYSQL_TYPE_STRING,
_ if data_type.starts_with("geometry") => ColumnType::MYSQL_TYPE_GEOMETRY,
_ => UnsupportedDataTypeSnafu { data_type }.fail()?,
_ => UnsupportedDataTypeSnafu {
column_name,
data_type,
}
.fail()?,
};

Ok(column_type)
Expand Down
12 changes: 7 additions & 5 deletions src/sql/db_connection_pool/mysqlpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("MySQL connection error: {source}"))]
#[snafu(display("MySQL connection error: {source}\nFor further information, refer to the MySQL manual: https://dev.mysql.com/doc/mysql-errors/9.1/en/error-reference-introduction.html"))]
MySQLConnectionError { source: mysql_async::Error },

#[snafu(display("Invalid MySQL connection string: {source} "))]
#[snafu(display(
"Invalid MySQL connection string: {source}\nEnsure the MySQL connection string is valid"
))]
InvalidConnectionString { source: mysql_async::UrlError },

#[snafu(display("Invalid parameter: {parameter_name}"))]
#[snafu(display("Invalid value for parameter {parameter_name}\nEnsure the parameter value is valid for {parameter_name}"))]
InvalidParameterError { parameter_name: String },

#[snafu(display("Invalid root cert path: {path}"))]
#[snafu(display("Invalid root cert path: {path}\nEnsure the root cert path is valid"))]
InvalidRootCertPathError { path: String },

#[snafu(display("Cannot connect to MySQL on {host}:{port}. Ensure that the host and port are correctly configured, and that the host is reachable."))]
Expand All @@ -46,7 +48,7 @@ pub enum Error {
))]
InvalidUsernameOrPassword,

#[snafu(display("{message}"))]
#[snafu(display("{message}\nEnsure that the MySQL database name provided exists"))]
UnknownMySQLDatabase { message: String },
}

Expand Down