Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use StringViewArray when reading String columns from Parquet #10921

Closed
Tracked by #10918
alamb opened this issue Jun 14, 2024 · 17 comments · Fixed by #11667
Closed
Tracked by #10918

use StringViewArray when reading String columns from Parquet #10921

alamb opened this issue Jun 14, 2024 · 17 comments · Fixed by #11667
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jun 14, 2024

Is your feature request related to a problem or challenge?

Part of #10918

In order to take advantage of the parquet writer generating StringViewArrays ( apache/arrow-rs#5530 from @ariesdevil (❤️ ) ) we need to make sure datafusion doesn't immediately cast the array back to StringView which would undo the benefits

                ▲                       
┌ ─ ─ ─ ─ ─ ─ ┐ │   After filtering,    
  StringArray   │   any unfiltered rows 
└ ─ ─ ─ ─ ─ ─ ┘ │   are gathered via    
      ...       │   the `take` kernel   
                │                       
 ┌────────────────────────────┐         
 │                            │         
 │         FilterExec         │         
 │                            │         
 └────────────────────────────┘         
                ▲                       
┌ ─ ─ ─ ─ ─ ─ ┐ │                       
  StringArray   │                       
└ ─ ─ ─ ─ ─ ─ ┘ │   Reading String data 
                │   from a Parquet file 
      ...       │   results in          
                │   StringArrays passed 
┌ ─ ─ ─ ─ ─ ─ ┐ │                       
  StringArray   │                       
└ ─ ─ ─ ─ ─ ─ ┘ │                       
                │                       
 ┌────────────────────────────┐         
 │                            │         
 │        ParquetExec         │         
 │                            │         
 └────────────────────────────┘         
                                        
                                        
                                        
      Current situation                 

Describe the solution you'd like

To support a phased rollout of this feature, I recommend we focus at first on only the first filtering operation

Specifically get to the point where the parquet reader will read data out as StringView like this:

                ▲              
┌ ─ ─ ─ ─ ─ ─ ┐ │              
  StringArray   │              
└ ─ ─ ─ ─ ─ ─ ┘ │              
      ...       │              
                │              
 ┌────────────────────────────┐
 │                            │
 │         FilterExec         │
 │                            │
 └────────────────────────────┘
┌ ─ ─ ─ ─ ─ ─ ┐ ▲              
 StringViewArr  │              
│     ay      │ │              
 ─ ─ ─ ─ ─ ─ ─  │              
      ...       │              
                │              
┌ ─ ─ ─ ─ ─ ─ ┐ │              
 StringViewArr  │              
│     ay      │ │              
 ─ ─ ─ ─ ─ ─ ─  │              
                │              
 ┌────────────────────────────┐
 │                            │
 │        ParquetExec         │
 │                            │
 └────────────────────────────┘
                               
                               
                               
      Intermediate             
      Situation 1: pass        
      StringViewArray          
      between ParquetExec      

Describe alternatives you've considered

I suggest we:

  1. Make a configuration setting like "force StringViewArray" when reading parquet so we can test this. When this setting is enabled, DataFusion should configure the ParquetExec to produce StringViewArray regardless of the type stored in the parquet file
  2. Then work on incrementally rolling out support / testing for various filter expressions (especially string functions like substring and Implement equality = and inequality <> support for StringView #10919)

Additional context

No response

@alamb alamb added the enhancement New feature or request label Jun 14, 2024
@XiangpengHao
Copy link
Contributor

I'll take this one, can you assign me @alamb ?

@alamb
Copy link
Contributor Author

alamb commented Jun 15, 2024

I'll take this one, can you assign me @alamb ?

BTW you can assign yourself (single word comment take): https://datafusion.apache.org/contributor-guide/index.html#finding-and-creating-issues-to-work-on

@XiangpengHao
Copy link
Contributor

It seems that the current filter and parquet exec play nicely, and the following code will directly filter on the string view array (instead of converting to StringArrary), which is quite nice.

(the test case requires the latest arrow-rs to run)

#[tokio::test]
async fn parquet_read_filter_string_view() {
    let tmp_dir = TempDir::new().unwrap();

    let values = vec![Some("small"), None, Some("Larger than 12 bytes array")];
    let c1: ArrayRef = Arc::new(StringViewArray::from_iter(values.iter()));
    let c2: ArrayRef = Arc::new(StringArray::from_iter(values.iter()));

    let batch =
        RecordBatch::try_from_iter(vec![("c1", c1.clone()), ("c2", c2.clone())]).unwrap();

    let file_name = {
        let table_dir = tmp_dir.path().join("parquet_test");
        std::fs::create_dir(&table_dir).unwrap();
        let file_name = table_dir.join("part-0.parquet");
        let mut writer = ArrowWriter::try_new(
            fs::File::create(&file_name).unwrap(),
            batch.schema(),
            None,
        )
        .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file_name
    };

    let ctx = SessionContext::new();
    ctx.register_parquet("t", file_name.to_str().unwrap(), Default::default())
        .await
        .unwrap();

    async fn display_result(sql: &str, ctx: &SessionContext) {
        let result = ctx.sql(sql).await.unwrap().collect().await.unwrap();

        arrow::util::pretty::print_batches(&result).unwrap();

        for b in result {
            println!("schema: {:?}", b.schema());
        }
    }

    display_result("SELECT * from t", &ctx).await;
    display_result("SELECT * from t where c1 <> 'small'", &ctx).await;
    display_result("SELECT * from t where c2 <> 'small'", &ctx).await;
}

I'll take a closer look at the generated logical/physical plans to verify that the string view array is never being converted to string array. And if that is the case, the remaining work of this issue is probably (1) add an option to force reading StringArray as StringView array, and (2) add more tests and potentially test the generated plan uses StringViewArray consistently.

@XiangpengHao
Copy link
Contributor

I have manually checked that there isn't any unexpected conversion.

The logical plan is

Projection: t.c1, t.c2 [c1:Utf8View;N, c2:Utf8;N]
  Filter: t.c2 != Utf8("small") [c1:Utf8View;N, c2:Utf8;N]
    TableScan: t [c1:Utf8View;N, c2:Utf8;N]

The physical plan is

CoalesceBatchesExec: target_batch_size=8192
  FilterExec: c2@1 != small
    ParquetExec: file_groups={1 group: [[tmp/.tmp4ADQEP/parquet_test/part-0.parquet]]}, projection=[c1, c2], predicate=c2@1 != small, pruning_predicate=CASE WHEN c2_null_count@2 = c2_row_count@3 THEN false ELSE c2_min@0 != small OR small != c2_max@1 END, required_guarantees=[c2 not in (small)]

I looked at the filter exec, it will eventually call into: https://github.com/XiangpengHao/datafusion/blob/string-view/datafusion/physical-expr/src/expressions/binary.rs#L260

Which calls compare_op from arrow.

I'll add more tests to demonstrate that the filter works out of the box.

@alamb
Copy link
Contributor Author

alamb commented Jun 19, 2024

@jayzhan211 and I are talking about something similar in #9403 (comment)

What do you think about potentially adding a new OptimizerRule, something like

struct UseStringView {}

/// Optimizer rule that rewrites portions of the Plan to use `StringViewArray` instead of 
/// `StringArray` where the operators support the new type
///
/// (some background on StringArray and why it is better for some operators)
///
/// This rule currently supports:
/// 1. Reading strings from ParquetExec (which can save a copy of the string)
/// 2. GroupBy operation
/// ...
impl OptimzierRule for UseViews { 
...
}

@jayzhan211
Copy link
Contributor

jayzhan211 commented Jun 19, 2024

@jayzhan211 and I are talking about something similar in #9403 (comment)

What do you think about potentially adding a new OptimizerRule, something like

struct UseStringView {}

/// Optimizer rule that rewrites portions of the Plan to use `StringViewArray` instead of 
/// `StringArray` where the operators support the new type
///
/// (some background on StringArray and why it is better for some operators)
///
/// This rule currently supports:
/// 1. Reading strings from ParquetExec (which can save a copy of the string)
/// 2. GroupBy operation
/// ...
impl OptimzierRule for UseViews { 
...
}

We usually ensure the schema remain unchanged after applying the optimize rule, should this be the special case?

// verify the rule didn't change the schema
.and_then(|tnr| {
assert_schema_is_the_same(rule.name(), &starting_schema, &tnr.data)?;
Ok(tnr)
});

@alamb
Copy link
Contributor Author

alamb commented Jun 19, 2024

We usually ensure the schema remain unchanged after applying the optimize rule, should this be the special case?

I think we need to keep the schema the same (that is a pretty far reaching invariant)

But maybe we could do something like add a projection to convert it bac

So like if the input plan was

FilterExec [Utf8]
  ParquetExec[Utf8]

We could implement an optimizer rule that made a plan like

ProjectionExec(cast(col) as Utf8) [utf8]
  Filter[Utf8View]
    ParquetExec[Utf8View]

🤔

@XiangpengHao
Copy link
Contributor

I like the idea of using an optimizer rule to optimistically/optionally use StringView!

@alamb
Copy link
Contributor Author

alamb commented Jun 19, 2024

BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)

https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html

@XiangpengHao
Copy link
Contributor

BTW the more I think about this the more I think it should probably be a PhysicalOptimizer rule (not a logical optimizer rule) as the change is related to the particular properties of the ExecutionPlans (rather than the logical type)

https://docs.rs/datafusion/latest/datafusion/physical_optimizer/optimizer/trait.PhysicalOptimizerRule.html

I'll try to prototype a physical optimizer

@jayzhan211
Copy link
Contributor

jayzhan211 commented Jun 19, 2024

Without changing schema, we need to convert StringArray to StringViewArray -> compute -> convert back to StringArray. It would be nice if the conversion is negligible

In my case #9403 , we need to

  1. Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimziation
  2. Run GroupStream with StringViewArray
  3. Apply CastExpr to get the StringArray back.

And, the cost of these 3 should always beat with processing with StringArray directly regardless of what kind of second computation is.

@alamb
Copy link
Contributor Author

alamb commented Jun 20, 2024

  • Convert to StringViewArray and introduce CastExpr to keep the schema unchanged in physical optimziation

I was actually thinking the schema will be different between some ExecutionPlans (e.g. to pass a StringView between ParquetExec and a Filter)

@alamb
Copy link
Contributor Author

alamb commented Jul 1, 2024

An update here is I think @XiangpengHao has this working on a branch, but found that performance is actually slower for StringViewArray rather than StringView. Thus he is working on apache/arrow-rs#5904 now

@XiangpengHao
Copy link
Contributor

XiangpengHao commented Jul 9, 2024

Want to share some thoughts here on when to use StringViewArray and when not.

We only consider the cost of loading data from parquet to narrow the scope.

To load a StringArray, we need to copy the data to a new buffer and build offset array. The extra memory we need to setup is array len * (string len + offset size). Specifically, StringArray is array len * (string len + 4), BigStringArray is array len * (string len + 8)

To load a StringViewArray, we only need to build view array and can reuse the buffer from parquet decoder. The extra memory to setup is array len * view size , i.e., array len * 16. Note that the memory consumption of StringViewArray is constant to string length, i.e., it takes 16 bytes of memory no matter how long the underlying string is.

For a sufficiently large array, the time to build the array should be proportional to the extra memory we set up.

This means that if each of the individual string is small, i.e., smaller than 12 bytes, StringArray is actually faster than StringViewArray. In other words, we should use StringViewArray only when strings are larger than 12 bytes.

@alamb
Copy link
Contributor Author

alamb commented Jul 9, 2024

Update -- @XiangpengHao found the root cause of the "small string is slower" -- read about it in this great writeup: apache/arrow-rs#6031

TLDR is that we can make arrow/parquet reading faster than StringView always with (even) more! work

@alamb
Copy link
Contributor Author

alamb commented Jul 27, 2024

This is done on the string-view2 branch. Once we mereg #11667 we can close this ticket I think

@alamb
Copy link
Contributor Author

alamb commented Jul 29, 2024

#11667 is ready for review 🥳

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
3 participants