Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement fast min/max accumulator for binary / strings (now it uses the slower path) #6906

Closed
alamb opened this issue Jul 10, 2023 · 42 comments · Fixed by #12792
Closed

Implement fast min/max accumulator for binary / strings (now it uses the slower path) #6906

alamb opened this issue Jul 10, 2023 · 42 comments · Fixed by #12792
Assignees
Labels
enhancement New feature or request

Comments

@alamb
Copy link
Contributor

alamb commented Jul 10, 2023

Is your feature request related to a problem or challenge?

#6904 introduces some fancy new hashing and ways to implement aggregates

See related blog post: https://datafusion.apache.org/blog/2023/08/05/datafusion_fast_grouping/

min/max for strings (StringArray / LargeStringArray, etc) now uses the slower Accumulator implementation which could be made much faster

Describe the solution you'd like

I would like to implement a fast GroupsAccumulator for Min/Max

Describe alternatives you've considered

here is one potential way to implement it:

We could store the current minimum for all groups in the same Rows 🤔 and track an index into that Rows for the current minimum for each group.

This would require an extra copy of the input values, but it could probably be vectorized pretty well, as shown in the following diagram.

Sorry what I meant was something like the following where the accumulator only stored the current minimum values.

This approach would potentially end up with min_storage being full of "garbage" if many batches had new minumums, but I think we could heuristically "compact" min_storage (if it had 2*num_groups, for example) if it got too large

                                    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  
                                                                           
                                    │           Accumulator             │  
                                                state                      
┌─────────┐       ┌─────────┐       │ ┌─────────┐          ┌─────────┐  │  
│ ┌─────┐ │       │ ┌─────┐ │         │ ┌─────┐ │          │ ┌─────┐ │     
│ │  A  │ │       │ │  A  │─┼────┐  │ │ │  D  │ │    ┌─────┼─│  1  │ │  │  
│ ├─────┤ │       │ ├─────┤ │    │    │ ├─────┤ │    │     │ ├─────┤ │     
│ │  B  │ │       │ │  B  │ │    └──┼─┼▶│  A  │◀┼────┘     │ │  0  │ │  │  
│ ├─────┤ │       │ ├─────┤ │         │ └─────┘ │          │ └─────┘ │     
│ │  A  │ │       │ │  A  │ │       │ │         │          │         │  │  
│ ├─────┤ │       │ ├─────┤ │         │         │          │         │     
│ │  A  │ │       │ │  A  │ │       │ │         │          │         │  │  
│ ├─────┤ │       │ ├─────┤ │         │         │          │         │     
│ │  C  │ │       │ │  C  │ │       │ │         │          │         │  │  
│ └─────┘ │       │ └─────┘ │         │         │          │         │     
└─────────┘       └─────────┘       │ └─────────┘          └─────────┘  │  
                                                                           
  input              input          │ min_storage:         min_values   │  
  values             values           Rows                                 
  (Array)            (Rows)         └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  
          step 1:           step 2: for                                    
          convert           any value                   step 3: min value  
          arguments to      that is a new               (per group) is     
          Row format        group                       tracked as an      
                            minimum, copy               index into         
                            it to a                     min_storage `Rows` 
                            second `Rows`                                  
                                                                           

See #6800 (comment) for more details

Additional context

No response

@alamb alamb added the enhancement New feature or request label Jul 10, 2023
@alamb alamb changed the title Implement fast min/max accumulator for strings (now it uses the slower path) Implement fast min/max accumulator for binary / strings (now it uses the slower path) Jan 8, 2024
@alamb
Copy link
Contributor Author

alamb commented Jan 8, 2024

One observation here is that min and max on strings is not that common of an operation from what it seems -- grouping on strings is more common.

Maybe there is some binary usecase where it is important (e.g. embeddings 🤔 )

@devanbenz
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@alamb
Copy link
Contributor Author

alamb commented Aug 26, 2024

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

I dont know of anyone working on this @devanbenz -- but I also don't know of any benchmarks or actual queries that use min / max on string columns. The place it shows up is when computing statistics when writing parquet, but I think parquet is already pretty good at this (and has its own code to compute min/max)

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

It actually turns out that Min / Max on string/binary columns are in several ClickBench queries:

SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;

SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

I don't think min/max have appeared as priorities in benchmarking before because the queries in question are doing other string heavy operations that tend to dominate. Thus the use of GroupsAccumulatorAdaptor for Min/Max on strings, while bad, is overshadowed by other things

However, while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow. We can likely restore their speed to something similar to apache/arrow-rs#6408 but I also think this is a good time to actually make Min / Max on strings faster. I will write up some ideas on how to do this

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

Background

(I will make a PR shortly to add this to the actual datafusion docs)

GroupsAccumulator logically does this:

      ┌─────┐                            
      │  0  │───────────▶   "A"          
      ├─────┤                            
      │  1  │───────────▶   "Z"          
      └─────┘                            
        ...                 ...          
      ┌─────┐                            
      │ N-2 │               "A"          
      ├─────┤                            
      │ N-1 │───────────▶   "Q"          
      └─────┘                            
                                         
                                         
    Logical group      Current Min/Max   
       number          value for that    
                       group             
                                         
                                         
                                         
GroupsAccumulator to store N aggregate   
values: logically keepa a mapping from   
each group index to the current value                                        

Today, String / Binary min/max values are implemented using GroupsAccumulatorAdapter which results in

                                                              Individual String
                                                              (separate        
                                                              allocation)      
                                                                               
   ┌─────┐            ┌──────────────────────────┐                             
   │  0  │───────────▶│  ScalarValue::Utf8("A")  ├──────────▶   "A"            
   ├─────┤            ├──────────────────────────┤                             
   │  1  │───────────▶│  ScalarValue::Utf8("Z")  │──────────▶   "Z"            
   └─────┘            └──────────────────────────┘                             
     ...                 ...                                    ...            
   ┌─────┐            ┌──────────────────────────┐                             
   │ N-2 │            │  ScalarValue::Utf8("A")  │──────────▶   "A"            
   ├─────┤            ├──────────────────────────┤                             
   │ N-1 │───────────▶│  ScalarValue::Utf8("Q")  │──────────▶   "Q"            
   └─────┘            └──────────────────────────┘                             
                                                                               
                                                                               
 Logical group         Current Min/Max value for that group stored             
    number             as a ScalarValue which points to an                     
                       indivdually allocated String                            
                                                                               
                                                                               
                                                                               
   How GroupsAccumulatorAdaptor works today:                                   
   stores each current min/max as a                                            
   ScalarValue                                                                 
                                                                               

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

Potential Design

One high level idea is to build a data structure that uses the same internal format (views/buffers) as StringViewArray in Arrow:

                  ┌───────────────────────────────────────────┐  
                  │                        Stored in Vec<u8>  │  
                  │   Stored in a         ┌─────────────────┐ │  
                  │     Vec<u128>         │ ┌───────────┐   │ │  
                  │ ┌─────────────┐    ─ ─│▶│some value │   │ │  
   ┌─────┐        │ │ ┌─────────┐ │   │   │ └───────────┘   │ │  
   │  0  │────────┼─┼▶│  View   │ │       │                 │ │  
   ├─────┤        │ │ ├─────────┤ │   │   │                 │ │  
   │  1  │────────┼─┼▶│  View   │─│─ ─ ┐  │      ...        │ │  
   └─────┘        │ │ └─────────┘ │   │   │                 │ │  
     ...          │ │    ...      │    │  │                 │ │  
   ┌─────┐        │ │ ┌─────────┐ │   │   │                 │ │  
   │ N-2 │        │ │ │  View   │─│─ ─ │  │   ┌────────────┐│ │  
   ├─────┤        │ │ ├─────────┤ │     ─ ┼ ─▶│other value ││ │  
   │ N-1 │────────┼─┼▶│  View   │ │       │   └────────────┘│ │  
   └─────┘        │ │ └─────────┘ │       └─────────────────┘ │  
                  │ └─────────────┘  String values are stored │  
                  │                  inline or in extra byte  │  
 Logical group    │                  buffer                   │  
    number        └───────────────────────────────────────────┘  
                   New structure: MutableStringViewBuilder       
                                                                 
                   Current Min/Max value for that group stored in
                   same format as StringViewArray                
                                                                 

In this design, the current value for each group is stored in two parts (as described on arrow docs)

  1. a fixed size u128 view
  2. a variable length part with the string data

As new batches are updated, each View is updated if necessary

Benefits of design:

  1. Hopefully use the same code as in arrow-rs
  2. Allows Zero copy conversion to StringView / BinaryView at output
  3. Use inlined values for quick min/max comparison

I believe (though we will have to verify it) that the conversion from MutableStringViewBuilder to just StringArray (not StringViewArray) should also be better than the current GroupsAccumulatorAdapter. Both conversions need to copy the string bytes again into the packed StringArray format, but the GroupsAccumulatorAdapter also has to allocate/free owned Strings as well

Potential challenges

I think the trickiest part of this code, other the low level code optimizations is that as min/max values are replaced, data in the variable length buffer will become "garbage" (not reachable) thus consuming more memory than necessary:

                        Stored in Vec<u8>   
                     ┌────────────────────┐ 
                     │ ┌────────────────┐ │ 
┌─────────────┐      │ │prev max value 1│ │ 
│ ┌─────────┐ │      │ └────────────────┘ │ 
│ │  View   │─│─ ┐   │        ...         │ 
│ └─────────┘ │      │ ┌────────────────┐ │ 
│     ...     │  │   │ │prev max value m│ │ 
│             │      │ └────────────────┘ │ 
│             │  │   │ ┌────────────────┐ │ 
│             │   ─ ─│▶│prev max value m│ │ 
│             │      │ └────────────────┘ │ 
│             │      │        ...         │ 
│             │      └────────────────────┘ 
│             │                             
└─────────────┘      Previous min/max values
                     are not pointed to     
                     anymore and need to be 
                     cleaned up             

I think this means the code will need something GenericByteViewArray::gc run occasionally

Random Thoughts

Thoughts: maybe this structure (MutableStringViewBuilder??) could be upstreamed eventually

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

@devanbenz
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

Great! I'll get started on this later in the week/over the weekend :) Will likely bug folks as I require help haha 😆

@devanbenz
Copy link
Contributor

take

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

In terms of implementation, what I suggest is:

  1. Do a POC implementaiton: wire up just enough `StringView, don't worry about GC, basic unit tests
  2. Verify it makes the clickbench query faster
  3. Flesh out testing, documentation, add support for StringArrary, etc
  4. Merge and profit (bonus points for blogging about it)

For the POC here is the reproducer I recommend:

Step 1. Get hits_partitioned using bench.sh:

cd benchmarks
./bench.sh data clickbench_partitioned

Step 2: Prepare a script with reproducer query:

set datafusion.execution.parquet.schema_force_view_types = true;


SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2/benchmarks/data$ cat q28.sql
set datafusion.execution.parquet.schema_force_view_types = true;

SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

Step 3: Run script (with release build of datafusion-cli):

datafusion-cli -f q28.sql
  • set datafusion.execution.parquet.schema_force_view_types = true; --> Elapsed 18.431 seconds.
  • set datafusion.execution.parquet.schema_force_view_types = false; --> Elapsed 6.427 seconds.

The goal is to get set datafusion.execution.parquet.schema_force_view_types = true; to be the same (or better) than when it is false

If you look at the flamegraph-string-view.svg, you can see most of the time is spent doing GroupsAccumulator

Screenshot 2024-09-16 at 4 44 50 PM

@Rachelint
Copy link
Contributor

@alamb is there anyone working on this + is this issue still relevant? I would love to tackle it as it seems like an interesting feature/optimization.

@devanbenz I think this would be a fun and interesting project as well as valuable to DataFusion. However, I also think it is pretty advanced -- I would enjoy helping with it, but also maybe @Rachelint or @jayzhan211 are interested in helping out 🤔

Seems really interesting, I am reading the related disscusions.

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

BTW in case anyone is interested, I recorded a short video on how to make these flamegraphs: https://youtu.be/2z11xtYw_xs

I will add a link to that in the docs later

@Rachelint
Copy link
Contributor

The challenge of String seems that?

  • If we just simply use a Vec<String> like primitives to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray(many many copy)
  • But if we use StringArray like approach to keep the values, we can't update the min/max values.
  • So Finally we need to use a StringViewArray like approach to make it, but still have the new challenge about gc?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

The challenge of String seems that?

  • If we just simply use a Vec<String> like primitives to keep the min/max values, it is too expensive to convert them to StringArray/StringViewArray(many many copy)

I think the overhead is actually mostly that there is an additional (small) allocation for each String. For queries with a small numer of groups (like 100) an extra 100 allocations isn't all that bad. For queries with millions of groups the overhad is substantial

  • But if we use StringArray like approach to keep the values, we can't update the min/max values.

I suppose we could potentially update the values as long as the new strings were shorter 🤔

  • So Finally we need to use a StringViewArray like approach to make it, but still have the new challenge about gc?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

@Rachelint
Copy link
Contributor

Rachelint commented Sep 17, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

🤔 Can we still use the string view like approach to store states #6906 (comment), but for the uninlined state, we use a single String to store it?
And when we output them to StringViewArray, we convert this single String to a tiny buffer.

For example, if all states are uninlined(len > 12), the output StringViewArray may be like:

row1: view1 buffer1(with only one string)
row2: view2 buffer2(with only one string)
...
rown: viewn buffern

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

@Rachelint
Copy link
Contributor

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

Yes... At least it will be better than now, even we just use Vec<String> to impl a specific GroupsAccumulator for String type...

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

@Rachelint
Copy link
Contributor

Rachelint commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

Ok, for StringView min/max, seems we can just start with using Vec<u128>(views) to store the inlined state(<= 12), use Vec<String> to store the unlined.

And when converting it to StringViewArray, we just copy the Vec<String> to create the buffer (GroupsAccumulatorAdapter copy the states too).

For the short strings(<=12), it can avoid allocating String, and for the long ones, it just do the same thing as GroupsAccumulatorAdapter. Seems it can have a better performance(due optimization for shorts)?

@alamb
Copy link
Contributor Author

alamb commented Sep 17, 2024

I am not familiar enough with StringViewArray, is it ok to do that? And will it lead to a extremely bad performance?

I think using a single Buffer for each string will be bad for performance (likely worse than storing as String and copying them at the end. StringViewArray is really optimized for a small number of buffers (even though in theory it could have 2B of them as it is indexed on i32)

Ok, for StringView min/max, seems we can just start with using Vec<u128>(views) to store the inlined state(<= 12), use Vec<String> to store the unlined.

And when converting it to StringViewArray, we just copy the Vec<String> to create the buffer (GroupsAccumulatorAdapter copy the states too).

For the short strings(<=12), it can avoid allocating String, and for the long ones, it just do the same thing as GroupsAccumulatorAdapter. Seems it can have a better performance(due optimization for shorts)?

Seems like a reasonable place to start in my opinion. If we want to get more sophisticated at a later time we can try something more exotic. I suspect there will be times when individual allocations will be faster and times when a buffer will be faster and there will be memory consumption tradeoffs as well.

TLDR we should implement something and as long as it is better than what is currently going on that will be good.

@XiangpengHao
Copy link
Contributor

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

@XiangpengHao
Copy link
Contributor

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

I mean, fixed for that particular clickbench query which does not leverage stats to prune parquet access.

@devanbenz
Copy link
Contributor

devanbenz commented Sep 22, 2024

while working on #12092 it turns out that Min / Max on BinaryView and StringView are suuuuper slow.

Fixed in #12575, now min/max on StringViewArray should be slightly faster than StringArray

I mean, fixed for that particular clickbench query which does not leverage stats to prune parquet access.

Cool, I'm assuming this change didn't impact the GroupsAccumulatorAdapter call in anyway since I'm still seeing a similar performance + flamegraph with the following query:

set datafusion.execution.parquet.schema_force_view_types = true;


SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\\.)?([^/]+)/.*$', '\\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer")
FROM hits_partitioned
WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;

with_changes_sv

@alamb
Copy link
Contributor Author

alamb commented Sep 22, 2024

BTW I was thinking more about this issue -- while a native Min/Max for strings / stringview will help, I have an idea that might make it simply it an added bonus: pushing the cast down into the parquet reader #12509 (comment)

@devanbenz
Copy link
Contributor

devanbenz commented Sep 22, 2024

BTW I was thinking more about this issue -- while a native Min/Max for strings / stringview will help, I have an idea that might make it simply it an added bonus: pushing the cast down into the parquet reader #12509 (comment)

This would only work for parquet correct? It would effectively be a part of the TableScan within the logical plan?

@devanbenz
Copy link
Contributor

devanbenz commented Sep 22, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:

It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

So this would effectively be using a Vec<String> for state instead of your proposal of using something akin to StringView notated here #6906 (comment) @alamb?

@devanbenz
Copy link
Contributor

I have had more time to take a look at this and sort of just wrap my head around how GroupsAccumulatorAdapter works a bit. I'm seeing that the performance impact is happening here

in slice_and_maybe_filter I'm under the assumption that this is happening due to the difference between the BinaryView and the Binary Scalar values. Taking a look at https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/ I understand that BinaryView is effectively a non-contiguous structure where as Binary is contiguous so it is easily sliced. So the idea here is to effectively change the underlying state in which the accumulator structure receives that data thus making it either easier to call slice or remove the calling of slice entirely?

@Rachelint
Copy link
Contributor

Rachelint commented Sep 23, 2024

@Rachelint your implicit idea of using Vec<String> to store the state I think is actually quite interesting and maybe we should try that one first:
It would at least avoid calling Array::slice and likely be better than using GroupsAccumulatorAdapter, even if we could improve it later with more explciit memory management 🤔

So this would effectively be using a Vec<String> for state instead of your proposal of using something akin to StringView notated here #6906 (comment) @alamb?

I think what @alamb means is that just simply using Vec<String> to store the states will be at least not worse than StringArray + GroupsAccumulatorAdapter, and it is easy to start from.

And after improving it to be not worse than StringArray + GroupsAccumulatorAdapter, we can continue to push forward enabling string view by default.

@Rachelint
Copy link
Contributor

I have had more time to take a look at this and sort of just wrap my head around how GroupsAccumulatorAdapter works a bit. I'm seeing that the performance impact is happening here


in slice_and_maybe_filter I'm under the assumption that this is happening due to the difference between the BinaryView and the Binary Scalar values. Taking a look at https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/ I understand that BinaryView is effectively a non-contiguous structure where as Binary is contiguous so it is easily sliced. So the idea here is to effectively change the underlying state in which the accumulator structure receives that data thus making it either easier to call slice or remove the calling of slice entirely?

I guess the current goal is to remove the calling of slice, and get an at least not worse performance than StringArray + GroupsAccumulatorAdapter as mentioned above.

@alamb
Copy link
Contributor Author

alamb commented Sep 23, 2024

I think what @alamb means is that just simply using Vec to store the states will be at least not worse than StringArray + GroupsAccumulatorAdapter, and it is easy to start from.

Yes indeed. Thank you

I guess the current goal is to remove the calling of slice, and get an at least not worse performance than StringArray + GroupsAccumulatorAdapter as mentioned above.

💯

@devanbenz
Copy link
Contributor

devanbenz commented Sep 23, 2024

I have had more time to take a look at this and sort of just wrap my head around how GroupsAccumulatorAdapter works a bit. I'm seeing that the performance impact is happening here

in slice_and_maybe_filter I'm under the assumption that this is happening due to the difference between the BinaryView and the Binary Scalar values. Taking a look at influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb I understand that BinaryView is effectively a non-contiguous structure where as Binary is contiguous so it is easily sliced. So the idea here is to effectively change the underlying state in which the accumulator structure receives that data thus making it either easier to call slice or remove the calling of slice entirely?

I guess the current goal is to remove the calling of slice, and get an at least not worse performance than StringArray + GroupsAccumulatorAdapter as mentioned above.

Awesome thank you @Rachelint.

I've begun working on a small POC locally with this information. It took me a bit of reading up on context to get going for sure. After reading through https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html and https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html, tracing through the code locally, and reading through the discussion a few times I now have a better understanding of where to start, the expected outcome and a path to get there.

@devanbenz
Copy link
Contributor

devanbenz commented Sep 26, 2024

Finally got around to trying out a little POC locally. I'm getting much better results by just storing the state as Vec<String> it's very much not polished at this moment in time but I will have some more time on Sunday + early next week to continue working on this. It is currently only implemented for MIN will need to work on MAX next but I foresee the implementation being almost identical.

Screenshot 2024-09-26 at 4 56 25 PM

Without view types:

25 row(s) fetched.
Elapsed 12.293 seconds.

With view types:

25 row(s) fetched.
Elapsed 12.135 seconds.

A 1.29% performance improvement 😎

@alamb
Copy link
Contributor Author

alamb commented Sep 29, 2024

Nice! If you want to put up a POC / draft PR I would be more than happy to help work on it with you

@devanbenz
Copy link
Contributor

devanbenz commented Sep 30, 2024

Nice! If you want to put up a POC / draft PR I would be more than happy to help work on it with you

Sounds good I pushed up what I have so far as a draft. There are a few things I would like to do including making the accumulator more generic so that I use a single method for min or max and pass an aggregate fn similar to how other UDFs do it.

@alamb
Copy link
Contributor Author

alamb commented Sep 30, 2024

I will check out #12677 -- I hoped to do it today but I appear to be almost out of time

@jayzhan211
Copy link
Contributor

I think we can store (len, prefix, index) for string/binary view cases. We then compare the value based on len + inline or prefix first, if they are the same we look up the actual value with index. In this way, we don't have the garbage collect issue, since we store the index instead of the whole string.

@alamb @Rachelint @devanbenz

@Rachelint
Copy link
Contributor

Rachelint commented Oct 4, 2024

I think we can store (len, prefix, index) for string/binary view cases. We then compare the value based on len + inline or prefix first, if they are the same we look up the actual value with index. In this way, we don't have the garbage collect issue, since we store the index instead of the whole string.

@alamb @Rachelint @devanbenz

I am a bit confused about where we store actual string values?

We still store it using Vec<String>, and through copying way when constructing the StringView array like #6906 (comment) ?

I think what @Dandandan memtioned in #6800 (comment) seems to be performance best impl?
But it is a bit hard to make it... we need to impl something like memory allocator...

@alamb
Copy link
Contributor Author

alamb commented Oct 4, 2024

I think we can store (len, prefix, index) for string/binary view cases. We then compare the value based on len + inline or prefix first, if they are the same we look up the actual value with index. In this way, we don't have the garbage collect issue, since we store the index instead of the whole string.

I think the garbage collection issue happens when the current "min" for a group is updated to a new value. We have to copy the new min value into the buffer and adjust the offset, but the old min value is still in the string view buffer

@jayzhan211
Copy link
Contributor

jayzhan211 commented Oct 5, 2024

My approach only works within batch not across batch. The garbage collection issue here is that whenever we got a better "min", we need to append the bytes and the old bytes is now "garbage". But given a batch, we can get the string value by the row index of the StringArray, therefore we don't need to update the possible long bytes but update only (len, prefix, row - index) . We only store this "min" (which creates garbage) after computing the whole batch and update it to the accumulator. The more values are in the same group within the batch, the less "garbage" we created compare to the original proposed design. The best case is that the whole batch has the same group value so we just do 1 update to the accumulator, the worst case is that every group value is distinct so we need n (n is batch size and the group size of the batch) updates to the accumulator. Maybe repartition prior to the aggregation helps a lot since the same group stays together in a batch.

@alamb
Copy link
Contributor Author

alamb commented Oct 6, 2024

The more values are in the same group within the batch, the less "garbage" we created compare to the original proposed design.

this makes a lot of sense (and is similar to what the current non groups accumulator does -- finds the min per batch and then updates the current overall min from that, rather than updating the min for each input row).

@alamb
Copy link
Contributor Author

alamb commented Oct 7, 2024

I started hacking on this today: #12792. I am fighting with the generics, but I think I am pretty close.

@alamb
Copy link
Contributor Author

alamb commented Oct 10, 2024

Ok, I think #12792 is ready to review. It has a nice 10% improvement on Q28 and avoids the slowdown when string view is enabled

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
5 participants