diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 38967f8552..d0d2f71a2c 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,5 +1,5 @@ use crate::error::PythonError; -use crate::utils::{delete_dir, rt, walk_tree, warn}; +use crate::utils::{delete_dir, io_rt, walk_tree, warn}; use crate::RawDeltaTable; use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut}; use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path}; @@ -96,7 +96,8 @@ impl DeltaFileSystemHandler { fn copy_file(&self, src: String, dest: String) -> PyResult<()> { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); - rt().block_on(self.inner.copy(&from_path, &to_path)) + io_rt() + .block_on(self.inner.copy(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -108,14 +109,16 @@ impl DeltaFileSystemHandler { fn delete_dir(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - rt().block_on(delete_dir(self.inner.as_ref(), &path)) + io_rt() + .block_on(delete_dir(self.inner.as_ref(), &path)) .map_err(PythonError::from)?; Ok(()) } fn delete_file(&self, path: String) -> PyResult<()> { let path = Self::parse_path(&path); - rt().block_on(self.inner.delete(&path)) + io_rt() + .block_on(self.inner.delete(&path)) .map_err(PythonError::from)?; Ok(()) } @@ -140,13 +143,14 @@ impl DeltaFileSystemHandler { for file_path in paths { let path = Self::parse_path(&file_path); let listed = py.allow_threads(|| { - rt().block_on(self.inner.list_with_delimiter(Some(&path))) + io_rt() + .block_on(self.inner.list_with_delimiter(Some(&path))) .map_err(PythonError::from) })?; // TODO is there a better way to figure out if we are in a directory? if listed.objects.is_empty() && listed.common_prefixes.is_empty() { - let maybe_meta = py.allow_threads(|| rt().block_on(self.inner.head(&path))); + let maybe_meta = py.allow_threads(|| io_rt().block_on(self.inner.head(&path))); match maybe_meta { Ok(meta) => { let kwargs = HashMap::from([ @@ -203,7 +207,7 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&base_dir); - let list_result = match rt().block_on(walk_tree(self.inner.clone(), &path, recursive)) { + let list_result = match io_rt().block_on(walk_tree(self.inner.clone(), &path, recursive)) { Ok(res) => Ok(res), Err(ObjectStoreError::NotFound { path, source }) => { if allow_not_found { @@ -263,7 +267,8 @@ impl DeltaFileSystemHandler { let from_path = Self::parse_path(&src); let to_path = Self::parse_path(&dest); // TODO check the if not exists semantics - rt().block_on(self.inner.rename(&from_path, &to_path)) + io_rt() + .block_on(self.inner.rename(&from_path, &to_path)) .map_err(PythonError::from)?; Ok(()) } @@ -275,7 +280,7 @@ impl DeltaFileSystemHandler { }; let path = Self::parse_path(&path); - let file = rt() + let file = io_rt() .block_on(ObjectInputFile::try_new( self.inner.clone(), path, @@ -312,7 +317,7 @@ impl DeltaFileSystemHandler { Some(2), )?; } - let file = rt() + let file = io_rt() .block_on(ObjectOutputStream::try_new( self.inner.clone(), path, @@ -475,7 +480,8 @@ impl ObjectInputFile { self.pos += nbytes; let data = if nbytes > 0 { py.allow_threads(|| { - rt().block_on(self.store.get_range(&self.path, range)) + io_rt() + .block_on(self.store.get_range(&self.path, range)) .map_err(PythonError::from) })? } else { @@ -549,14 +555,15 @@ impl ObjectOutputStream { } fn abort(&mut self) -> PyResult<()> { - rt().block_on(self.upload.lock().abort()) + io_rt() + .block_on(self.upload.lock().abort()) .map_err(PythonError::from)?; Ok(()) } fn upload_buffer(&mut self) -> PyResult<()> { let payload = std::mem::take(&mut self.buffer).freeze(); - let res = rt().block_on(self.upload.lock().put_part(payload)); + let res = io_rt().block_on(self.upload.lock().put_part(payload)); match res { Ok(_) => Ok(()), Err(err) => { @@ -575,7 +582,7 @@ impl ObjectOutputStream { if !self.buffer.is_empty() { self.upload_buffer()?; } - match rt().block_on(self.upload.lock().complete()) { + match io_rt().block_on(self.upload.lock().complete()) { Ok(_) => Ok(()), Err(err) => Err(PyIOError::new_err(err.to_string())), } diff --git a/python/src/utils.rs b/python/src/utils.rs index b02437cee6..6f0148049d 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -24,6 +24,23 @@ pub fn rt() -> &'static Runtime { TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime.")) } +#[inline] +pub fn io_rt() -> &'static Runtime { + static IO_TOKIO_RT: OnceLock = OnceLock::new(); + static IO_PID: OnceLock = OnceLock::new(); + let pid = std::process::id(); + let runtime_pid = *IO_PID.get_or_init(|| pid); + if pid != runtime_pid { + panic!( + "Forked process detected - current PID is {pid} but the tokio runtime was created by {runtime_pid}. The tokio \ + runtime does not support forked processes https://github.com/tokio-rs/tokio/issues/4301. If you are \ + seeing this message while using Python multithreading make sure to use the `spawn` or `forkserver` \ + mode.", + ); + } + IO_TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create an IO tokio runtime.")) +} + /// walk the "directory" tree along common prefixes in object store pub async fn walk_tree( storage: Arc,