Skip to content

Commit

Permalink
Optimize mapSubcript by caching input keys in a hash map. (facebookin…
Browse files Browse the repository at this point in the history
…cubator#7191)

Summary:
Pull Request resolved: facebookincubator#7191

There are cases where the mapSubscript function can receive the same (base) map over and over again.
In that case searching the same map again and again is redundant.

This optimization makes mapSubscript a stateful function, if it sees that the same
base map is being provided for it, it will cache a materialized version of the input.

The shadowed query e2e runtime reduces from 1.50 hours to 16.48 min.

This optimization is enabled for non-bool primitive types only.

A benchmark is added. The function itself becomes so much faster.  Speedup depends on the
number of batches and the size of the base vector. The production case we have 1 map with 80k
entries, so speedup for the function is extremely high.

```
============================================================================
[...]hmarks/ExpressionBenchmarkBuilder.cpp     relative  time/iter   iters/s
============================================================================
INTEGER_10000_1##subscript                                 25.15ms     39.76
INTEGER_10000_1##subscriptNocaching                          3.48s   287.58m
INTEGER_10000_1000##subscript                              81.40ms     12.29
INTEGER_10000_1000##subscriptNocaching                       3.30s   303.07m
INTEGER_1000_1##subscript                                  23.00ms     43.48
INTEGER_1000_1##subscriptNocaching                        319.80ms      3.13
INTEGER_1000_1000##subscript                               36.41ms     27.46
INTEGER_1000_1000##subscriptNocaching                     372.11ms      2.69
INTEGER_100_1##subscript                                   22.45ms     44.53
INTEGER_100_1##subscriptNocaching                          52.40ms     19.08
INTEGER_100_1000##subscript                                27.77ms     36.01
INTEGER_100_1000##subscriptNocaching                       57.05ms     17.53
INTEGER_10_1##subscript                                    23.65ms     42.28
INTEGER_10_1##subscriptNocaching                           22.81ms     43.83
INTEGER_10_1000##subscript                                 24.18ms     41.36
INTEGER_10_1000##subscriptNocaching                        23.94ms     41.78
VARCHAR_10000_1##subscript                                 62.20ms     16.08
VARCHAR_10000_1##subscriptNocaching                          4.77s   209.59m
VARCHAR_10000_1000##subscript                             155.07ms      6.45
VARCHAR_10000_1000##subscriptNocaching                       7.21s   138.77m
VARCHAR_1000_1##subscript                                  55.51ms     18.01
VARCHAR_1000_1##subscriptNocaching                        483.55ms      2.07
VARCHAR_1000_1000##subscript                               90.37ms     11.07
VARCHAR_1000_1000##subscriptNocaching                     584.56ms      1.71
VARCHAR_100_1##subscript                                   53.77ms     18.60
VARCHAR_100_1##subscriptNocaching                          69.78ms     14.33
VARCHAR_100_1000##subscript                                66.42ms     15.06
VARCHAR_100_1000##subscriptNocaching                       87.73ms     11.40
VARCHAR_10_1##subscript                                    31.04ms     32.21
VARCHAR_10_1##subscriptNocaching                           33.17ms     30.14
VARCHAR_10_1000##subscript                                 33.75ms     29.63
VARCHAR_10_1000##subscriptNocaching                        35.62ms     28.07

```

Reviewed By: oerling

Differential Revision: D50544250

fbshipit-source-id: 6a901473a2cf018c290cbbbe3b73b13ab22c7bfa
  • Loading branch information
laithsakka authored and facebook-github-bot committed Nov 2, 2023
1 parent acebba6 commit 73d4279
Show file tree
Hide file tree
Showing 9 changed files with 749 additions and 252 deletions.
246 changes: 246 additions & 0 deletions velox/functions/lib/SubscriptUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,256 @@
* limitations under the License.
*/

#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>

#include "velox/common/memory/MemoryPool.h"
#include "velox/functions/lib/SubscriptUtil.h"
#include "velox/type/Type.h"
#include "velox/vector/TypeAliases.h"

namespace facebook::velox::functions {

namespace {

template <TypeKind Kind>
struct SimpleType {
using type = typename TypeTraits<Kind>::NativeType;
};

template <>
struct SimpleType<TypeKind::VARBINARY> {
using type = Varbinary;
};

template <>
struct SimpleType<TypeKind::VARCHAR> {
using type = Varchar;
};

/// Decode arguments and transform result into a dictionaryVector where the
/// dictionary maintains a mapping from a given row to the index of the input
/// map value vector. This allows us to ensure that element_at is zero-copy.
template <TypeKind kind>
VectorPtr applyMapTyped(
bool triggeCaching,
std::shared_ptr<LookupTableBase>& cachedLookupTablePtr,
const SelectivityVector& rows,
const VectorPtr& mapArg,
const VectorPtr& indexArg,
exec::EvalCtx& context) {
static constexpr vector_size_t kMinCachedMapSize = 100;
using TKey = typename TypeTraits<kind>::NativeType;

if (triggeCaching) {
if (!cachedLookupTablePtr) {
cachedLookupTablePtr =
std::make_shared<LookupTable<kind>>(*context.pool());
}
}

auto& typedLookupTable = cachedLookupTablePtr->typedTable<kind>();

auto* pool = context.pool();
BufferPtr indices = allocateIndices(rows.end(), pool);
auto rawIndices = indices->asMutable<vector_size_t>();

// Create nulls for lazy initialization.
NullsBuilder nullsBuilder(rows.end(), pool);

// Get base MapVector.
// TODO: Optimize the case when indices are identity.
exec::LocalDecodedVector mapHolder(context, *mapArg, rows);
auto decodedMap = mapHolder.get();
auto baseMap = decodedMap->base()->as<MapVector>();
auto mapIndices = decodedMap->indices();

// Get map keys.
auto mapKeys = baseMap->mapKeys();
exec::LocalSelectivityVector allElementRows(context, mapKeys->size());
allElementRows->setAll();
exec::LocalDecodedVector mapKeysHolder(context, *mapKeys, *allElementRows);
auto decodedMapKeys = mapKeysHolder.get();

// Get index vector (second argument).
exec::LocalDecodedVector indexHolder(context, *indexArg, rows);
auto decodedIndices = indexHolder.get();

auto rawSizes = baseMap->rawSizes();
auto rawOffsets = baseMap->rawOffsets();

// Lambda that does the search for a key, for each row.
auto processRow = [&](vector_size_t row, TKey searchKey) {
size_t mapIndex = mapIndices[row];
auto size = rawSizes[mapIndex];
size_t offsetStart = rawOffsets[mapIndex];
size_t offsetEnd = offsetStart + size;
bool found = false;

if (triggeCaching && size >= kMinCachedMapSize) {
// Create map for mapIndex if not created.
if (!typedLookupTable.containsMapAtIndex(mapIndex)) {
typedLookupTable.ensureMapAtIndex(mapIndex);
// Materialize the map at index row.
auto& map = typedLookupTable.getMapAtIndex(mapIndex);
for (size_t offset = offsetStart; offset < offsetEnd; ++offset) {
map.emplace(decodedMapKeys->valueAt<TKey>(offset), offset);
}
}

auto& map = typedLookupTable.getMapAtIndex(mapIndex);

// Fast lookup.
auto value = map.find(searchKey);
if (value != map.end()) {
rawIndices[row] = value->second;
found = true;
}

} else {
// Search map without caching.
for (size_t offset = offsetStart; offset < offsetEnd; ++offset) {
if (decodedMapKeys->valueAt<TKey>(offset) == searchKey) {
rawIndices[row] = offset;
found = true;
break;
}
}
}

// Handle NULLs.
if (!found) {
nullsBuilder.setNull(row);
}
};

// When second argument ("at") is a constant.
if (decodedIndices->isConstantMapping()) {
auto searchKey = decodedIndices->valueAt<TKey>(0);
rows.applyToSelected(
[&](vector_size_t row) { processRow(row, searchKey); });
}

// When the second argument ("at") is also a variable vector.
else {
rows.applyToSelected([&](vector_size_t row) {
auto searchKey = decodedIndices->valueAt<TKey>(row);
processRow(row, searchKey);
});
}

// Subscript into empty maps always returns NULLs. Check added at the end to
// ensure user error checks for indices are not skipped.
if (baseMap->mapValues()->size() == 0) {
return BaseVector::createNullConstant(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
}

VectorPtr applyMapComplexType(
const SelectivityVector& rows,
const VectorPtr& mapArg,
const VectorPtr& indexArg,
exec::EvalCtx& context) {
auto* pool = context.pool();

// Use indices with the mapValues wrapped in a dictionary vector.
BufferPtr indices = allocateIndices(rows.end(), pool);
auto rawIndices = indices->asMutable<vector_size_t>();

// Create nulls for lazy initialization.
NullsBuilder nullsBuilder(rows.end(), pool);

// Get base MapVector.
exec::LocalDecodedVector mapHolder(context, *mapArg, rows);
auto decodedMap = mapHolder.get();
auto baseMap = decodedMap->base()->as<MapVector>();
auto mapIndices = decodedMap->indices();

// Get map keys.
auto mapKeys = baseMap->mapKeys();
exec::LocalSelectivityVector allElementRows(context, mapKeys->size());
allElementRows->setAll();
exec::LocalDecodedVector mapKeysHolder(context, *mapKeys, *allElementRows);
auto mapKeysDecoded = mapKeysHolder.get();
auto mapKeysBase = mapKeysDecoded->base();
auto mapKeysIndices = mapKeysDecoded->indices();

// Get index vector (second argument).
exec::LocalDecodedVector indexHolder(context, *indexArg, rows);
auto decodedIndices = indexHolder.get();
auto searchBase = decodedIndices->base();
auto searchIndices = decodedIndices->indices();

auto rawSizes = baseMap->rawSizes();
auto rawOffsets = baseMap->rawOffsets();

// Search the key in each row.
rows.applyToSelected([&](vector_size_t row) {
size_t mapIndex = mapIndices[row];
size_t size = rawSizes[mapIndex];
size_t offset = rawOffsets[mapIndex];

bool found = false;
auto searchIndex = searchIndices[row];
for (auto i = 0; i < size; i++) {
if (mapKeysBase->equalValueAt(
searchBase, mapKeysIndices[offset + i], searchIndex)) {
rawIndices[row] = offset + i;
found = true;
break;
}
}

if (!found) {
nullsBuilder.setNull(row);
};
});

// Subscript into empty maps always returns NULLs.
if (baseMap->mapValues()->size() == 0) {
return BaseVector::createNullConstant(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
}

} // namespace

VectorPtr MapSubscript::applyMap(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
exec::EvalCtx& context) const {
auto& mapArg = args[0];
auto& indexArg = args[1];

// Ensure map key type and second argument are the same.
VELOX_CHECK(mapArg->type()->childAt(0)->equivalent(*indexArg->type()));

if (indexArg->type()->isPrimitiveType()) {
bool triggerCaching = shouldTriggerCaching(mapArg);

return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
applyMapTyped,
indexArg->typeKind(),
triggerCaching,
const_cast<std::shared_ptr<LookupTableBase>&>(lookupTable_),
rows,
mapArg,
indexArg,
context);
} else {
return applyMapComplexType(rows, mapArg, indexArg, context);
}
}

namespace {
std::exception_ptr makeZeroSubscriptError() {
try {
Expand Down
Loading

0 comments on commit 73d4279

Please sign in to comment.