Skip to content

Commit

Permalink
Merge pull request #3951 from weiznich/feature/copy_in_out
Browse files Browse the repository at this point in the history
Add Postgres COPY FROM/TO support
  • Loading branch information
weiznich authored Apr 10, 2024
2 parents 1affe6c + c3b6dd1 commit 8afe715
Show file tree
Hide file tree
Showing 51 changed files with 2,521 additions and 152 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly-2023-09-21
toolchain: nightly-2024-01-15
- name: Cache cargo registry
uses: Swatinem/rust-cache@v2
with:
Expand All @@ -336,7 +336,7 @@ jobs:
- name: Run compile tests
shell: bash
run: cargo +nightly-2023-09-21 test --manifest-path diesel_compile_tests/Cargo.toml
run: cargo +nightly-2024-01-15 test --manifest-path diesel_compile_tests/Cargo.toml

rustfmt_and_clippy:
name: Check rustfmt style && run clippy
Expand Down Expand Up @@ -485,11 +485,11 @@ jobs:
run: cargo +nightly-2024-04-05 -Z build-std test --manifest-path diesel/Cargo.toml --no-default-features --features "postgres pq-sys pq-sys/bundled pq-src/with-asan" --target x86_64-unknown-linux-gnu

minimal_rust_version:
name: Check Minimal supported rust version (1.70.0)
name: Check Minimal supported rust version (1.75.0)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.70.0
- uses: dtolnay/rust-toolchain@1.75.0
- uses: dtolnay/rust-toolchain@nightly
- uses: taiki-e/install-action@cargo-hack
- uses: taiki-e/install-action@cargo-minimal-versions
Expand All @@ -502,15 +502,15 @@ jobs:
sudo apt-get update
sudo apt-get -y install libsqlite3-dev libpq-dev libmysqlclient-dev
- name: Check diesel_derives
run: cargo +1.70.0 minimal-versions check -p diesel_derives --features "postgres sqlite mysql 32-column-tables 64-column-tables 128-column-tables without-deprecated with-deprecated r2d2"
run: cargo +1.75.0 minimal-versions check -p diesel_derives --features "postgres sqlite mysql 32-column-tables 64-column-tables 128-column-tables without-deprecated with-deprecated r2d2"
- name: Check diesel
run: cargo +1.70.0 minimal-versions check -p diesel --features "postgres mysql sqlite extras"
run: cargo +1.75.0 minimal-versions check -p diesel --features "postgres mysql sqlite extras"
- name: Check diesel_dynamic_schema
run: cargo +1.70.0 minimal-versions check -p diesel-dynamic-schema --all-features
run: cargo +1.75.0 minimal-versions check -p diesel-dynamic-schema --all-features
- name: Check diesel_migrations
run: cargo +1.70.0 minimal-versions check -p diesel_migrations --all-features
run: cargo +1.75.0 minimal-versions check -p diesel_migrations --all-features
- name: Check diesel_cli
run: cargo +1.70.0 minimal-versions check -p diesel_cli --features "default sqlite-bundled"
run: cargo +1.75.0 minimal-versions check -p diesel_cli --features "default sqlite-bundled"

typos:
name: Spell Check with Typos
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ Increasing the minimal supported Rust version will always be coupled at least wi
* Support for libsqlite3-sys 0.28
* Add `sqlite-integer-primary-key-is-bigint` configuration option, usable with SQLite 3.37 or above, allowing to use `BigInt` for `INTEGER PRIMARY KEY` columns in SQLite for tables without the `WITHOUT ROWID` attribute ([SQLite doc](https://www.sqlite.org/lang_createtable.html#rowid)).
* Support for multiple `print_schema` entry in `diesel.toml` (e.g. `[print_schema.user1]`), which allows generating multiple schema.rs files
* Add support for `COPY TO` and `COPY FROM` statements

### Changed

* The minimal officially supported rustc version is now 1.70.0
* The minimal officially supported rustc version is now 1.75.0
* Deprecated `sql_function!` in favour of `define_sql_function!` which provides compatibility with `#[dsl::auto_type]`

## [2.1.0] 2023-05-26
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ members = [
]

[workspace.package]
rust-version = "1.70.0"
rust-version = "1.75.0"

# Config for 'cargo dist'
[workspace.metadata.dist]
Expand Down
2 changes: 1 addition & 1 deletion diesel/src/expression/bound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::sql_types::DieselNumericOps;
#[doc(hidden)] // This is used by the `AsExpression` derive
#[derive(Debug, Clone, Copy, DieselNumericOps)]
pub struct Bound<T, U> {
item: U,
pub(crate) item: U,
_marker: PhantomData<T>,
}

Expand Down
2 changes: 1 addition & 1 deletion diesel/src/insertable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub trait InsertValues<T: Table, DB: Backend>: QueryFragment<DB> {
#[derive(Debug, Copy, Clone, QueryId)]
#[doc(hidden)]
pub struct ColumnInsertValue<Col, Expr> {
expr: Expr,
pub(crate) expr: Expr,
p: PhantomData<Col>,
}

Expand Down
10 changes: 10 additions & 0 deletions diesel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ pub mod dsl {
delete, insert_into, insert_or_ignore_into, replace_into, select, sql_query, update,
};

#[doc(inline)]
#[cfg(feature = "postgres_backend")]
pub use crate::query_builder::functions::{copy_from, copy_to};

#[doc(inline)]
pub use diesel_derives::auto_type;

Expand Down Expand Up @@ -718,6 +722,9 @@ pub mod prelude {
#[cfg(feature = "mysql")]
#[doc(inline)]
pub use crate::mysql::MysqlConnection;
#[doc(inline)]
#[cfg(feature = "postgres_backend")]
pub use crate::pg::query_builder::copy::ExecuteCopyFromDsl;
#[cfg(feature = "postgres")]
#[doc(inline)]
pub use crate::pg::PgConnection;
Expand All @@ -732,6 +739,9 @@ pub use crate::prelude::*;
#[doc(inline)]
pub use crate::query_builder::debug_query;
#[doc(inline)]
#[cfg(feature = "postgres")]
pub use crate::query_builder::functions::{copy_from, copy_to};
#[doc(inline)]
pub use crate::query_builder::functions::{
delete, insert_into, insert_or_ignore_into, replace_into, select, sql_query, update,
};
Expand Down
121 changes: 121 additions & 0 deletions diesel/src/pg/connection/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use core::ffi;
use std::io::BufRead;
use std::io::Read;
use std::io::Write;

use super::raw::RawConnection;
use super::result::PgResult;
use crate::QueryResult;

#[allow(missing_debug_implementations)] // `PgConnection` is not debug
pub(in crate::pg) struct CopyFromSink<'conn> {
conn: &'conn mut RawConnection,
}

impl<'conn> CopyFromSink<'conn> {
pub(super) fn new(conn: &'conn mut RawConnection) -> Self {
Self { conn }
}

pub(super) fn finish(self, err: Option<String>) -> QueryResult<()> {
self.conn.finish_copy_from(err)
}
}

impl<'conn> Write for CopyFromSink<'conn> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.conn
.put_copy_data(buf)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

#[allow(missing_debug_implementations)] // `PgConnection` is not debug
pub struct CopyToBuffer<'conn> {
conn: &'conn mut RawConnection,
ptr: *mut ffi::c_char,
offset: usize,
len: usize,
result: PgResult,
}

impl<'conn> CopyToBuffer<'conn> {
pub(super) fn new(conn: &'conn mut RawConnection, result: PgResult) -> Self {
Self {
conn,
ptr: std::ptr::null_mut(),
offset: 0,
len: 0,
result,
}
}

#[allow(unsafe_code)] // construct a slice from a raw ptr
pub(crate) fn data_slice(&self) -> &[u8] {
if !self.ptr.is_null() && self.offset < self.len {
let slice = unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.len - 1) };
&slice[self.offset..]
} else {
&[]
}
}

pub(crate) fn get_result(&self) -> &PgResult {
&self.result
}
}

impl<'conn> Drop for CopyToBuffer<'conn> {
#[allow(unsafe_code)] // ffi code
fn drop(&mut self) {
if !self.ptr.is_null() {
unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
self.ptr = std::ptr::null_mut();
}
}
}

impl<'conn> Read for CopyToBuffer<'conn> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let data = self.fill_buf()?;
let len = usize::min(buf.len(), data.len());
buf[..len].copy_from_slice(&data[..len]);
self.consume(len);
Ok(len)
}
}

impl<'conn> BufRead for CopyToBuffer<'conn> {
#[allow(unsafe_code)] // ffi code + ptr arithmetic
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
if self.data_slice().is_empty() {
unsafe {
if !self.ptr.is_null() {
pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
self.ptr = std::ptr::null_mut();
}
let len =
pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
match len {
len if len >= 0 => self.len = len as usize + 1,
-1 => self.len = 0,
_ => {
let error = self.conn.last_error_message();
return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
}
}
self.offset = 0;
}
}
Ok(self.data_slice())
}

fn consume(&mut self, amt: usize) {
self.offset = usize::min(self.len, self.offset + amt);
}
}
96 changes: 92 additions & 4 deletions diesel/src/pg/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
pub(super) mod copy;
pub(crate) mod cursor;
mod raw;
mod result;
mod row;
mod stmt;

use self::copy::CopyFromSink;
use self::copy::CopyToBuffer;
use self::cursor::*;
use self::private::ConnectionAndTransactionManager;
use self::raw::{PgTransactionStatus, RawConnection};
use self::result::PgResult;
use self::stmt::Statement;
use crate::connection::instrumentation::DebugQuery;
use crate::connection::instrumentation::Instrumentation;
Expand All @@ -16,6 +18,7 @@ use crate::connection::statement_cache::{MaybeCached, StatementCache};
use crate::connection::*;
use crate::expression::QueryMetadata;
use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
use crate::pg::query_builder::copy::InternalCopyFromQuery;
use crate::pg::{Pg, TransactionBuilder};
use crate::query_builder::bind_collector::RawBytesBindCollector;
use crate::query_builder::*;
Expand All @@ -26,6 +29,12 @@ use std::ffi::CString;
use std::fmt::Debug;
use std::os::raw as libc;

use super::query_builder::copy::CopyFromExpression;
use super::query_builder::copy::CopyTarget;
use super::query_builder::copy::CopyToCommand;

pub(super) use self::result::PgResult;

/// The connection string expected by `PgConnection::establish`
/// should be a PostgreSQL connection string, as documented at
/// <https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-CONNSTRING>
Expand Down Expand Up @@ -393,7 +402,82 @@ impl PgConnection {
TransactionBuilder::new(self)
}

fn with_prepared_query<'conn, T: QueryFragment<Pg> + QueryId, R>(
pub(crate) fn copy_from<S, T>(&mut self, target: S) -> Result<usize, S::Error>
where
S: CopyFromExpression<T>,
{
let query = InternalCopyFromQuery::new(target);
let res = self.with_prepared_query(query, false, |stmt, binds, conn, mut source| {
fn inner_copy_in<S, T>(
stmt: MaybeCached<'_, Statement>,
conn: &mut ConnectionAndTransactionManager,
binds: Vec<Option<Vec<u8>>>,
source: &mut InternalCopyFromQuery<S, T>,
) -> Result<usize, S::Error>
where
S: CopyFromExpression<T>,
{
let _res = stmt.execute(&mut conn.raw_connection, &binds, false)?;
let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
let r = source.target.callback(&mut copy_in);
copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
let next_res = conn.raw_connection.get_next_result()?.ok_or_else(|| {
crate::result::Error::DeserializationError(
"Failed to receive result from the database".into(),
)
})?;
let rows = next_res.rows_affected();
while let Some(_r) = conn.raw_connection.get_next_result()? {}
r?;
Ok(rows)
}

let rows = inner_copy_in(stmt, conn, binds, &mut source);
if let Err(ref e) = rows {
let database_error = crate::result::Error::DatabaseError(
crate::result::DatabaseErrorKind::Unknown,
Box::new(e.to_string()),
);
conn.instrumentation
.on_connection_event(InstrumentationEvent::FinishQuery {
query: &crate::debug_query(&source),
error: Some(&database_error),
});
} else {
conn.instrumentation
.on_connection_event(InstrumentationEvent::FinishQuery {
query: &crate::debug_query(&source),
error: None,
});
}

rows
})?;

Ok(res)
}

pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
where
T: CopyTarget,
{
let res = self.with_prepared_query::<_, _, Error>(
command,
false,
|stmt, binds, conn, source| {
let res = stmt.execute(&mut conn.raw_connection, &binds, false);
conn.instrumentation
.on_connection_event(InstrumentationEvent::FinishQuery {
query: &crate::debug_query(&source),
error: res.as_ref().err(),
});
Ok(CopyToBuffer::new(&mut conn.raw_connection, res?))
},
)?;
Ok(res)
}

fn with_prepared_query<'conn, T, R, E>(
&'conn mut self,
source: T,
execute_returning_count: bool,
Expand All @@ -402,8 +486,12 @@ impl PgConnection {
Vec<Option<Vec<u8>>>,
&'conn mut ConnectionAndTransactionManager,
T,
) -> QueryResult<R>,
) -> QueryResult<R> {
) -> Result<R, E>,
) -> Result<R, E>
where
T: QueryFragment<Pg> + QueryId,
E: From<crate::result::Error>,
{
self.connection_and_transaction_manager
.instrumentation
.on_connection_event(InstrumentationEvent::StartQuery {
Expand Down
Loading

0 comments on commit 8afe715

Please sign in to comment.