Add Postgres COPY FROM/TO support #3951
This is some cool code. Just some questions (sorry if there are any dumb questions)
    }

    #[derive(Default, Debug)]
    struct CommonOptions {
Is there a reason for not including `HEADER`, `FORCE_QUOTE`, `ENCODING`, etc.?
`ENCODING` is currently always assumed to be the same as for the connection (so `UTF-8`). It might be worth exposing that at some point.
`HEADER` is currently only exposed for `copy_in`, but after reading the documentation again it seems useful to expose it for `copy_out` as well, without allowing `MATCH`.
`FORCE_QUOTE` support requires figuring out how to pass a list of columns down to that function. That's not impossible, it's just something that I did not want to do for the first version.
        .as_ref()
        .map(|l| l.as_ptr())
        .unwrap_or(std::ptr::null());
    let ret = unsafe { pq_sys::PQputCopyEnd(self.internal_connection.as_ptr(), error) };
@weiznich what would happen if this line does not get called as a result of the `?` on line 164?
Good catch, addressed in dbc92f2
looks good 👍
    fn get<'b, I>(&'b self, idx: I) -> Option<Self::Field<'b>>
    where
        'a: 'b,
@weiznich does this mean that `'a` must be a subtype of `'b` or is it the other way around?
https://doc.rust-lang.org/reference/trait-bounds.html#lifetime-bounds
So it's usually read as: `'a` needs to outlive `'b`.
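For readers following along, here is a minimal sketch of the same bound outside of diesel. `Row` and its `get` method are hypothetical stand-ins, not the types from this PR; the point is only that `'a: 'b` lets data borrowed for the longer lifetime `'a` be handed out through a shorter `'b` borrow.

```rust
/// Hypothetical row type that borrows its field data for `'a`.
struct Row<'a> {
    fields: Vec<&'a str>,
}

impl<'a> Row<'a> {
    /// `'a: 'b` reads "`'a` outlives `'b`": the data behind the row lives
    /// at least as long as the borrow of the row itself.
    fn get<'b>(&'b self, idx: usize) -> Option<&'b str>
    where
        'a: 'b,
    {
        // `copied()` turns `Option<&&'a str>` into `Option<&'a str>`,
        // which is then shortened to `'b` by the return type.
        self.fields.get(idx).copied()
    }
}

fn main() {
    let data = String::from("id,name");
    let row = Row {
        fields: data.split(',').collect(),
    };
    println!("{:?} {:?}", row.get(1), row.get(5));
}
```

The reverse bound (`'b: 'a`) would demand that the short borrow of the row outlives the data it points into, which is not what a getter like this needs.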
ah yep makes sense, thanks for the link 👍
6b8f727 to dbc92f2
A couple questions from me to help get myself up to speed 😄
diesel/src/pg/connection/copy.rs
Outdated
    impl<'conn> Write for CopyFromSink<'conn> {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.conn.put_copy_data(buf).unwrap();
question: Should we `.unwrap()` here if we're already returning a `Result<_>`?
Good catch, that shouldn't use unwrap. We don't want to use unwrap anywhere other than in tests.
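One possible shape for such a fix, sketched with a hypothetical `FakeConn` in place of the real raw connection (the actual change in the PR may look different): the connection error is converted into an `std::io::Error` so that `write` reports it instead of panicking.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for the raw connection; fails on demand.
struct FakeConn {
    fail: bool,
    received: Vec<u8>,
}

impl FakeConn {
    fn put_copy_data(&mut self, buf: &[u8]) -> Result<(), String> {
        if self.fail {
            return Err("connection lost".to_owned());
        }
        self.received.extend_from_slice(buf);
        Ok(())
    }
}

/// Hypothetical sink, mirroring the role of the type under review.
struct Sink<'conn> {
    conn: &'conn mut FakeConn,
}

impl<'conn> Write for Sink<'conn> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Map the connection error into an `io::Error` instead of unwrapping.
        self.conn
            .put_copy_data(buf)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() {
    let mut conn = FakeConn { fail: true, received: Vec::new() };
    let mut sink = Sink { conn: &mut conn };
    match sink.write(b"1,foo\n") {
        Ok(n) => println!("wrote {n} bytes"),
        Err(e) => println!("write failed without panicking: {e}"),
    }
}
```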
    #[allow(unsafe_code)] // ffi code
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            unsafe { pq_sys::PQfreemem(self.ptr as *mut ffi::c_void) };
question: This is my first time seeing C FFI code. Let me see if I understand this right:
- We have a function in `pq_sys`, `PQfreemem`, which is a binding to the `libpq` library and drops (something) from memory?
- `PQfreemem` is equivalent to doing a `drop()` in Rust, and because we're using the C library we need to implement `Drop` to call `PQfreemem` so that we can `drop(impl CopyToBuffer)`.

I have a gap in my knowledge here though, around the purpose of `self.ptr as *mut ffi::c_void`. Does this equate to C like `void* self.ptr`, so the library knows it's receiving a pointer but doesn't know what data type it is? The library then knows the data at that pointer is what it has to clear, but doesn't care what that data is?
`PQfreemem` is the PostgreSQL-specific variant of the C dealloc/free function. This drop impl essentially ensures that we always free the buffer this pointer points to. The cast is required as the C function takes a void pointer as its argument.
Not calling that function would potentially leak memory, which would be a bug.
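The pattern can be tried without linking against libpq. In the sketch below, `fake_alloc` and `fake_freemem` are hypothetical stand-ins for the C allocation and for `PQfreemem`, and `Buffer` is a made-up wrapper; only the shape of the `Drop` impl and the `*mut c_void` cast correspond to the code under review.

```rust
use std::ffi::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts how often the stand-in free function ran.
static FREED: AtomicUsize = AtomicUsize::new(0);

/// Stand-in for memory handed out by the C library (hypothetical).
fn fake_alloc() -> *mut u8 {
    Box::into_raw(Box::new(0u8))
}

/// Stand-in for `PQfreemem`: takes an untyped pointer and releases it.
///
/// # Safety
/// `ptr` must come from `fake_alloc` and must not have been freed already.
unsafe fn fake_freemem(ptr: *mut c_void) {
    drop(unsafe { Box::from_raw(ptr as *mut u8) });
    FREED.fetch_add(1, Ordering::SeqCst);
}

/// Owns a raw pointer and guarantees it is released exactly once.
struct Buffer {
    ptr: *mut u8,
}

impl Drop for Buffer {
    fn drop(&mut self) {
        if !self.ptr.is_null() {
            // The cast only erases the pointee type, like `void *` in C.
            unsafe { fake_freemem(self.ptr as *mut c_void) };
        }
    }
}

fn main() {
    let before = FREED.load(Ordering::SeqCst);
    {
        let _buffer = Buffer { ptr: fake_alloc() };
    } // `_buffer` goes out of scope here, so `drop` frees the allocation.
    println!("freed {} buffer(s)", FREED.load(Ordering::SeqCst) - before);
}
```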
    if !self.ptr.is_null() {
        pq_sys::PQfreemem(self.ptr as *mut ffi::c_void);
        self.ptr = std::ptr::null_mut();
    }
    let len =
        pq_sys::PQgetCopyData(self.conn.internal_connection.as_ptr(), &mut self.ptr, 0);
question: Okay, I think it's starting to click more for me. We clear the pointer from any previous library call, then do another FFI call supplying our (now empty) pointer.
If I'm following correctly, does this mean the library call will mutate our underlying pointer to make it point to the results of this library call, and this function returns the length of data that exists at the pointer location?
Yes this is correct.
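A rough sketch of that out-pointer pattern, using hypothetical stand-ins rather than libpq itself: `fake_get_copy_data` plays the role of `PQgetCopyData` and `fake_free` the role of `PQfreemem` (unlike the real function it needs the length, purely because the sketch allocates with `Box`). The sketch also frees right after copying, whereas the code under review frees the previous buffer just before the next call.

```rust
use std::collections::VecDeque;
use std::ptr;

/// Stand-in for `PQgetCopyData` (not the libpq signature): stores a freshly
/// allocated buffer in `*out` and returns its length, or -1 when no rows
/// are left.
fn fake_get_copy_data(rows: &mut VecDeque<&'static [u8]>, out: &mut *mut u8) -> i32 {
    match rows.pop_front() {
        Some(row) => {
            let boxed: Box<[u8]> = row.into();
            let len = boxed.len() as i32;
            *out = Box::into_raw(boxed) as *mut u8;
            len
        }
        None => -1,
    }
}

/// Stand-in for `PQfreemem`.
///
/// # Safety
/// `ptr` and `len` must describe a buffer handed out by
/// `fake_get_copy_data` that has not been freed yet.
unsafe fn fake_free(ptr: *mut u8, len: usize) {
    drop(unsafe { Box::from_raw(ptr::slice_from_raw_parts_mut(ptr, len)) });
}

fn read_all(mut rows: VecDeque<&'static [u8]>) -> Vec<u8> {
    let mut collected = Vec::new();
    let mut buf: *mut u8 = ptr::null_mut();
    loop {
        // The callee overwrites `buf`; the return value is the buffer length.
        let len = fake_get_copy_data(&mut rows, &mut buf);
        if len < 0 {
            break;
        }
        let len = len as usize;
        // SAFETY: `buf` points to `len` initialised bytes that we now own.
        collected.extend_from_slice(unsafe { std::slice::from_raw_parts(buf, len) });
        // Free the buffer and reset the pointer before the next call.
        unsafe { fake_free(buf, len) };
        buf = ptr::null_mut();
    }
    collected
}

fn main() {
    let rows = VecDeque::from(vec![&b"1,a\n"[..], &b"2,b\n"[..]]);
    print!("{}", String::from_utf8_lossy(&read_all(rows)));
}
```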
        Ok(res)
    }

    pub(crate) fn copy_to<T>(&mut self, command: CopyToCommand<T>) -> QueryResult<CopyToBuffer<'_>>
question: This might be a styling thing I need to get familiar with, but why do we do `command: CopyToCommand<T>` vs the `copy_from` function above which does `target: S [...] where S: CopyInExpression<T>`?
They are functionally different: `command: CopyToCommand<T>` is a struct with a generic type argument. `target: S [...] where S: CopyInExpression<T>` is a fully generic type that implements a certain trait (`CopyInExpression` in that case). The struct essentially holds a set of common values that is the same for all possible values of the generic type, while the trait variant allows fundamentally different types there. I've just used what seemed to fit the needs in each location.
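The difference can be seen in a toy version. Everything below (`ToyCopyToCommand`, `ToyCopyInExpression`, `Users`, `RawCallback`, `Rows`) is a simplified, hypothetical re-definition for illustration, not diesel's actual types.

```rust
use std::marker::PhantomData;

/// Struct style: one concrete type with the same fields for every `T`.
struct ToyCopyToCommand<T> {
    options: String,
    _target: PhantomData<T>,
}

/// Trait style: fundamentally different types may implement it.
trait ToyCopyInExpression<T> {
    fn describe(&self) -> String;
}

struct Users; // hypothetical table marker
struct RawCallback; // "the user writes to the stream"
struct Rows(Vec<i32>); // "insert this batch of values"

impl ToyCopyInExpression<Users> for RawCallback {
    fn describe(&self) -> String {
        "user-driven stream".to_owned()
    }
}

impl ToyCopyInExpression<Users> for Rows {
    fn describe(&self) -> String {
        format!("{} rows", self.0.len())
    }
}

/// Accepts exactly one struct type, parameterised over `T`.
fn copy_to<T>(command: ToyCopyToCommand<T>) -> String {
    command.options
}

/// Accepts any `S` that implements the trait for `T`.
fn copy_from<T, S>(target: S) -> String
where
    S: ToyCopyInExpression<T>,
{
    target.describe()
}

fn main() {
    let command = ToyCopyToCommand::<Users> {
        options: "FORMAT csv".to_owned(),
        _target: PhantomData,
    };
    println!("{}", copy_to(command));
    println!("{}", copy_from::<Users, _>(RawCallback));
    println!("{}", copy_from::<Users, _>(Rows(vec![1, 2, 3])));
}
```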
yep totally right, tired brain was thinking there should've been a type `T`/`S` somewhere in the trait which isn't always true 🤦
dbc92f2 to a740c1a
32b47f1 to fd49b3f
A couple more comments from me here. I also noticed there's quite a few `.expect()` in the `copy_to.rs` changes, among other places. I'm not sure if that's a pattern we accept here, but for those that are in a `Result<_>` it might be suitable to change to errors instead of panicking.
Also, please let me know if my comments are too nitpick-y. I'm testing the waters to see how deep I go with my reviews 😄
diesel/src/pg/connection/mod.rs
Outdated
    let mut copy_in = CopyFromSink::new(&mut conn.raw_connection);
    let r = source.target.callback(&mut copy_in);
    copy_in.finish(r.as_ref().err().map(|e| e.to_string()))?;
    let next_res = conn.raw_connection.get_next_result()?.expect("exists");
question: Should we give these `.expect()`s a better error message? Also, curious why we wouldn't use something like `.ok_or_else(|| SomeError())?` here, as this function already returns a `Result<_>`?
        ) -> Result<R, E>,
    ) -> Result<R, E>
question: Why the type change from `QueryResult` to a `Result`?
The `copy_from` method allows configuring a callback that is then used to pull data into the database. That callback can return a custom error that we want to propagate back to the user. This callback is called somewhere in the callback passed to `with_prepared_query`, therefore we need to change the return type there to allow that.
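A sketch of the idea, under the assumption that the user's error type can be built from the database error (the `E: From<DbError>` bound here is an assumption for illustration, and `with_sink`, `DbError` and `MyError` are hypothetical names, not diesel APIs): making the helper generic over `E` lets both the callback's own errors and internal errors travel through the same `Result<R, E>`.

```rust
/// Hypothetical stand-in for the database error type.
#[derive(Debug, PartialEq)]
struct DbError(String);

/// Hypothetical user-defined error returned by the callback.
#[derive(Debug, PartialEq)]
enum MyError {
    Db(DbError),
    BadRow(usize),
}

impl From<DbError> for MyError {
    fn from(e: DbError) -> Self {
        MyError::Db(e)
    }
}

/// Runs `f` against a sink and returns whatever error type `f` chose,
/// converting internal database errors into it where needed.
fn with_sink<R, E>(f: impl FnOnce(&mut Vec<u8>) -> Result<R, E>) -> Result<R, E>
where
    E: From<DbError>,
{
    let mut sink = Vec::new();
    // A custom error from the callback is propagated unchanged.
    let out = f(&mut sink)?;
    if sink.is_empty() {
        // An internal failure is converted into the caller's error type.
        return Err(DbError("nothing was written".to_owned()).into());
    }
    Ok(out)
}

fn main() {
    let result: Result<(), MyError> = with_sink(|_| Err(MyError::BadRow(3)));
    println!("{result:?}");
}
```

With a fixed `QueryResult` return type, the `BadRow` variant would have to be squeezed into the database error type instead of reaching the caller as-is.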
    CString::new("Error message contains a \\0 byte")
        .expect("Does not contain a null byte")
question: Should we make this `.expect()` more true to the error, in that it failed to initialize our new `CString` error? I would've expected to see the other `.expect()` in `.map(CString::new)` if it were to be somewhere.
I usually write `expect()` messages as factual statements. In this case we know at compile time (by looking at the string above) that it does not contain a null byte. This cannot ever be an error.
    self.target.walk_target(pass.reborrow())?;
    pass.push_sql(" FROM STDIN");
    self.target.options().walk_ast(pass.reborrow())?;
    // todo: where?
question: Should this TODO be addressed before merge?
No, that's not implemented for now. I do not plan to support it at the moment, so I've removed the todo.
    type Error = E;

    fn callback(&mut self, copy: &mut CopyFromSink<'_>) -> Result<(), Self::Error> {
        (self.copy_callback)(copy)
question: Why do we wrap `self.copy_callback` in brackets if we already know it's a function? Couldn't we just `self.copy_callback(copy)`?
That's required by rust to call a function stored in a field. See this playground for a minimal example: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=f5a4af872196441cb75487ee05af5aca
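For readers without access to the playground, a minimal sketch of the same rule (this is not the linked gist; `Holder` is a hypothetical type):

```rust
/// Hypothetical type storing a callable in a field.
struct Holder<F> {
    callback: F,
}

impl<F> Holder<F>
where
    F: FnMut(i32) -> i32,
{
    fn run(&mut self, value: i32) -> i32 {
        // `self.callback(value)` would look for a *method* named `callback`.
        // The parentheses select the field first and then call its value.
        (self.callback)(value)
    }
}

fn main() {
    let mut holder = Holder {
        callback: |x: i32| x + 1,
    };
    println!("{}", holder.run(41));
}
```

Fields and methods live in separate namespaces in Rust, so the compiler cannot guess which one a bare `self.callback(...)` refers to and always treats it as a method call.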
    let values = self
        .0
        .take()
        .expect("We only call this callback once")
question: `.expect()` in a `Result<_>`. Ditto for above, could we map this to an error?
That's one of the cases where I personally prefer using expect to get a panic if that's ever hit as that would be clearly a bug in diesel. My reasoning there is to panic for cases that are considered to be a problem with our (== diesel's) code to get reports on that. On the other hand anything that could be triggered and handled by a user should rather be a normal error.
okay that's fair enough, I'll keep that in mind for future reviews! 😁
        &'b self,
        mut pass: crate::query_builder::AstPass<'_, 'b, Pg>,
        comma: &mut &'static str,
    ) -> crate::QueryResult<()> {
question: Why is this `QueryResult<_>` if we don't `?` anything here?
Thanks for your hard work! 🚢
57ba5c0 to 3f32a02
This commit adds support for PostgreSQL `COPY` commands.

For `COPY FROM` we expose a variant that allows users to configure the stream manually and write directly to the stream. We also support a variant that takes essentially a `Vec<Insertable>` (or equivalent batch insert containers) and uses the binary format to perform a streamed batch insert.

For `COPY TO` we expose again a variant that allows the user to configure the stream manually and read directly from the stream. We also support loading the results directly via the binary protocol into an iterator of `Queryable` structs.

* Renaming everything to follow `COPY FROM`/`COPY TO` naming
* Require `Selectable` for loading (to prevent errors)
* Add instrumentation support + tests
3f32a02 to c3b6dd1
This commit adds support for PostgreSQL `COPY` commands.

For `COPY FROM` we expose a variant that allows users to configure the stream manually and write directly to the stream. We also support a variant that takes essentially a `Vec<Insertable>` (or equivalent batch insert containers) and uses the binary format to perform a streamed batch insert.

For `COPY TO` we expose again a variant that allows the user to configure the stream manually and read directly from the stream. We also support loading the results directly via the binary protocol into an iterator of `Queryable` structs.

This is not finished yet; I'm opening the PR nevertheless to gather some feedback on the API.

Things that need to be addressed before merging:
- `Selectable` for the loading part to avoid running into nasty mismatch errors there