Skip to content

Commit

Permalink
Include full column name in write_column error message
Browse files Browse the repository at this point in the history
  • Loading branch information
exyi committed Jun 2, 2024
1 parent 2490e26 commit 0be1adb
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions cli/src/appenders/generic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{marker::PhantomData, sync::Arc, borrow::Cow};

use parquet::{data_type::DataType, file::writer::SerializedColumnWriter, errors::ParquetError};
use parquet::{column::writer::ColumnWriter, data_type::DataType, errors::ParquetError, file::writer::SerializedColumnWriter, schema::types::ColumnDescriptor};

use crate::{level_index::{LevelIndexState, LevelIndexList}, myfrom::MyFrom};

Expand Down Expand Up @@ -75,17 +75,24 @@ impl<TPg, TPq, FConversion> ColumnAppenderBase for GenericColumnAppender<TPg, TP

fn write_columns<'b>(&mut self, column_i: usize, next_col: &mut dyn DynamicSerializedWriter) -> Result<(), String> {
let mut error = None;
let mut col_descriptor: Option<(Arc<ColumnDescriptor>, u64, u64)> = None;
let c = next_col.next_column(&mut |mut column| {
let result = self.write_column(&mut column);
col_descriptor = Some(get_column_descriptor(&mut column));
let error1 = result.err();
let result2 = column.close();

error = error1.or(result2.err());

}).map_err(|e| format!("Could not create column[{}]: {}", column_i, e))?;

debug_assert!(col_descriptor.is_some());
debug_assert_eq!(col_descriptor.as_ref().unwrap().0.max_def_level(), self.max_dl);
debug_assert_eq!(col_descriptor.as_ref().unwrap().0.max_rep_level(), self.max_rl);

if error.is_some() {
return Err(format!("Couldn't write data of column[{}]: {}", column_i, error.unwrap()));
let col_name = col_descriptor.map(|(desc, _, _)| desc.path().string()).unwrap_or_else(|| format!("column[{}]", column_i));
return Err(format!("Couldn't write data of {}: {}", col_name, error.unwrap()));
}

if !c {
Expand Down Expand Up @@ -116,6 +123,19 @@ impl<TPg, TPq, FConversion> ColumnAppenderBase for GenericColumnAppender<TPg, TP
/// Returns the `max_rl` value stored on this appender.
fn max_rl(&self) -> i16 {
    self.max_rl
}
}

/// Reads the column descriptor and the write counters from a serialized column writer.
///
/// The writer is accessed through `column.untyped()`, and the concrete typed writer
/// inside whichever `ColumnWriter` variant is present is queried.
///
/// Returns a tuple of:
/// 1. a clone of the value returned by `get_descriptor()`,
/// 2. the value of `get_total_rows_written()`,
/// 3. the value of `get_total_bytes_written()`.
fn get_column_descriptor(column: &mut SerializedColumnWriter) -> (Arc<ColumnDescriptor>, u64, u64) {
    // Each variant wraps a differently-typed writer, so the same three calls
    // have to be expanded once per variant; the macro keeps them in one place.
    macro_rules! describe {
        ($writer:expr) => {{
            let typed = $writer;
            let descriptor = typed.get_descriptor().clone();
            let rows_written = typed.get_total_rows_written();
            let bytes_written = typed.get_total_bytes_written();
            (descriptor, rows_written, bytes_written)
        }};
    }

    match column.untyped() {
        ColumnWriter::BoolColumnWriter(w) => describe!(w),
        ColumnWriter::Int32ColumnWriter(w) => describe!(w),
        ColumnWriter::Int64ColumnWriter(w) => describe!(w),
        ColumnWriter::Int96ColumnWriter(w) => describe!(w),
        ColumnWriter::FloatColumnWriter(w) => describe!(w),
        ColumnWriter::DoubleColumnWriter(w) => describe!(w),
        ColumnWriter::ByteArrayColumnWriter(w) => describe!(w),
        ColumnWriter::FixedLenByteArrayColumnWriter(w) => describe!(w),
    }
}

impl<TPg: Clone, TPq, FConversion> ColumnAppender<TPg> for GenericColumnAppender<TPg, TPq, FConversion>
where TPq::T: Clone + RealMemorySize, TPq: DataType, FConversion: Fn(TPg) -> TPq::T {
fn copy_value(&mut self, repetition_index: &LevelIndexList, value: Cow<TPg>) -> Result<usize, String> {
Expand Down

0 comments on commit 0be1adb

Please sign in to comment.