Skip to content

Commit

Permalink
Resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed May 31, 2024
1 parent 3f9d0c8 commit 0914f82
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 14 deletions.
19 changes: 12 additions & 7 deletions velox/common/caching/CachedFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ template <
typename Key,
typename Value,
typename Generator,
typename Properties = void,
typename Sizer = DefaultSizer<Value>,
typename Comparator = std::equal_to<Key>,
typename Hash = std::hash<Key>>
Expand Down Expand Up @@ -175,7 +176,9 @@ class CachedFactory {
/// repeatedly, handing off the results to one thread at a time until the
/// all pending requests are satisfied or a cache insert succeeds. This
/// will probably mess with your memory model, so really try to avoid it.
CachedPtr<Key, Value, Comparator, Hash> generate(const Key& key);
CachedPtr<Key, Value, Comparator, Hash> generate(
const Key& key,
const Properties* properties = nullptr);

/// Advanced function taking in a group of keys. Separates those keys into
/// one's present in the cache (returning CachedPtrs for them) and those not
Expand Down Expand Up @@ -350,17 +353,18 @@ template <
typename Key,
typename Value,
typename Generator,
typename Properties,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Sizer, Comparator, Hash>::generate(
const Key& key) {
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
generate(const Key& key, const Properties* properties) {
process::TraceContext trace("CachedFactory::generate");
if (cache_ == nullptr) {
return CachedPtr<Key, Value, Comparator, Hash>{
/*fromCache=*/false,
(*generator_)(key).release(),
(*generator_)(key, properties).release(),
nullptr,
std::make_unique<Key>(key)};
}
Expand Down Expand Up @@ -389,7 +393,7 @@ CachedFactory<Key, Value, Generator, Sizer, Comparator, Hash>::generate(
}
pendingLock.unlock();
// Regenerates in the edge case.
return generate(key);
return generate(key, properties);
}

pending_.insert(key);
Expand All @@ -400,7 +404,7 @@ CachedFactory<Key, Value, Generator, Sizer, Comparator, Hash>::generate(
pendingCv_.notify_all();
};

std::unique_ptr<Value> generatedValue = (*generator_)(key);
std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
const uint64_t valueSize = Sizer()(*generatedValue);
Value* rawValue = generatedValue.release();
const bool inserted = addCache(key, rawValue, valueSize);
Expand All @@ -424,10 +428,11 @@ template <
typename Key,
typename Value,
typename Generator,
typename Properties,
typename Sizer,
typename Comparator,
typename Hash>
void CachedFactory<Key, Value, Generator, Sizer, Comparator, Hash>::
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
retrieveCached(
const std::vector<Key>& keys,
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&
Expand Down
13 changes: 10 additions & 3 deletions velox/common/caching/tests/CachedFactoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
using namespace facebook::velox;

namespace {

struct DoublerGenerator {
std::unique_ptr<int> operator()(const int& value) {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
++generated;
return std::make_unique<int>(value * 2);
}
std::atomic<int> generated = 0;
};

struct IdentityGenerator {
std::unique_ptr<int> operator()(const int& value) {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
return std::make_unique<int>(value);
}
};
Expand Down Expand Up @@ -106,7 +111,9 @@ TEST(CachedFactoryTest, basicGeneration) {
}

struct DoublerWithExceptionsGenerator {
std::unique_ptr<int> operator()(const int& value) {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
if (value == 3) {
VELOX_FAIL("3 is bad");
}
Expand Down
11 changes: 9 additions & 2 deletions velox/connectors/hive/FileHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ std::string groupName(const std::string& filename) {
} // namespace

std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
const std::string& filename) {
const std::string& filename,
const FileProperties* properties) {
// We have seen cases where drivers are stuck when creating file handles.
// Adding a trace here to spot this more easily in future.
process::TraceContext trace("FileHandleGenerator::operator()");
Expand All @@ -49,8 +50,14 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
{
MicrosecondTimer timer(&elapsedTimeUs);
fileHandle = std::make_unique<FileHandle>();
filesystems::FileOptions options;
if (properties) {
options.fileSize = properties->fileSize == -1
? std::nullopt
: std::optional<int64_t>{properties->fileSize};
}
fileHandle->file = filesystems::getFileSystem(filename, properties_)
->openFileForRead(filename);
->openFileForRead(filename, options);
fileHandle->uuid = StringIdLease(fileIds(), filename);
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
VLOG(1) << "Generating file handle for: " << filename
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/FileHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/common/caching/CachedFactory.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/file/File.h"
#include "velox/connectors/hive/FileProperties.h"

namespace facebook::velox {

Expand Down Expand Up @@ -67,7 +68,9 @@ class FileHandleGenerator {
FileHandleGenerator() {}
FileHandleGenerator(std::shared_ptr<const Config> properties)
: properties_(std::move(properties)) {}
std::unique_ptr<FileHandle> operator()(const std::string& filename);
std::unique_ptr<FileHandle> operator()(
const std::string& filename,
const FileProperties* properties);

private:
const std::shared_ptr<const Config> properties_;
Expand All @@ -77,6 +80,7 @@ using FileHandleFactory = CachedFactory<
std::string,
FileHandle,
FileHandleGenerator,
FileProperties,
FileHandleSizer>;

using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <optional>
#include <unordered_map>
#include "velox/connectors/Connector.h"
#include "velox/connectors/hive/FileProperties.h"
#include "velox/dwio/common/Options.h"

namespace facebook::velox::connector::hive {
Expand All @@ -27,6 +28,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
dwio::common::FileFormat fileFormat;
const uint64_t start;
const uint64_t length;
std::optional<FileProperties> properties;

/// Mapping from partition keys to values. Values are specified as strings
/// formatted the same way as CAST(x as VARCHAR). Null values are specified as
Expand All @@ -49,6 +51,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
std::optional<FileProperties> _properties = std::nullopt,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand All @@ -62,6 +65,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
fileFormat(_fileFormat),
start(_start),
length(_length),
properties(_properties),
partitionKeys(_partitionKeys),
tableBucketNumber(_tableBucketNumber),
customSplitInfo(_customSplitInfo),
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ void SplitReader::createReader() {

FileHandleCachedPtr fileHandleCachePtr;
try {
fileHandleCachePtr = fileHandleFactory_->generate(hiveSplit_->filePath);
fileHandleCachePtr = fileHandleFactory_->generate(
hiveSplit_->filePath,
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties
: nullptr);
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());
} catch (const VeloxRuntimeError& e) {
if (e.errorCode() == error_code::kFileNotFound &&
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ HiveIcebergSplit::HiveIcebergSplit(
dwio::common::FileFormat _fileFormat,
uint64_t _start,
uint64_t _length,
FileProperties _properties,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
Expand All @@ -38,6 +39,7 @@ HiveIcebergSplit::HiveIcebergSplit(
_fileFormat,
_start,
_length,
_properties,
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
Expand All @@ -55,6 +57,7 @@ HiveIcebergSplit::HiveIcebergSplit(
dwio::common::FileFormat _fileFormat,
uint64_t _start,
uint64_t _length,
FileProperties _properties,
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys,
std::optional<int32_t> _tableBucketNumber,
Expand All @@ -68,6 +71,7 @@ HiveIcebergSplit::HiveIcebergSplit(
_fileFormat,
_start,
_length,
_properties,
_partitionKeys,
_tableBucketNumber,
_customSplitInfo,
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/iceberg/IcebergSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
FileProperties fileProperties = {},
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand All @@ -46,6 +47,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
dwio::common::FileFormat _fileFormat,
uint64_t _start = 0,
uint64_t _length = std::numeric_limits<uint64_t>::max(),
FileProperties fileProperties = {},
const std::unordered_map<std::string, std::optional<std::string>>&
_partitionKeys = {},
std::optional<int32_t> _tableBucketNumber = std::nullopt,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class HiveIcebergTest : public HiveConnectorTestBase {
fileFomat_,
0,
fileSize,
FileProperties(),
partitionKeys,
std::nullopt,
customSplitInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ TEST_F(AbfsFileSystemTest, openFileForReadWithInvalidOptions) {
"File size must be non-negative");
}

TEST_F(AbfsFileSystemTest, fileHandleWithProperties) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
FileHandleFactory factory(
std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(1),
std::make_unique<FileHandleGenerator>(hiveConfig));
FileProperties properties = {
15 + kOneMB,
1
} auto fileHandle = factory.generate(fullFilePath, &properties).second;
readData(fileHandle->file.get());
}

TEST_F(AbfsFileSystemTest, fileHandleWithoutProperties) {
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
{{"fs.azure.account.key.test.dfs.core.windows.net",
azuriteServer->connectionStr()}});
FileHandleFactory factory(
std::make_unique<
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(1),
std::make_unique<FileHandleGenerator>(hiveConfig));
auto fileHandle = factory.generate(fullFilePath).second;
readData(fileHandle->file.get());
}

TEST_F(AbfsFileSystemTest, multipleThreadsWithReadFile) {
std::atomic<bool> startThreads = false;
auto hiveConfig = AbfsFileSystemTest::hiveConfig(
Expand Down
26 changes: 26 additions & 0 deletions velox/connectors/hive/tests/FileHandleTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,29 @@ TEST(FileHandleTest, localFile) {
// Clean up
remove(filename.c_str());
}

TEST(FileHandleTest, localFileWithProperties) {
filesystems::registerLocalFileSystem();

auto tempFile = exec::test::TempFilePath::create();
const auto& filename = tempFile->getPath();
remove(filename.c_str());

{
LocalWriteFile writeFile(filename);
writeFile.append("foo");
}

FileHandleFactory factory(
std::make_unique<SimpleLRUCache<std::string, FileHandle>>(1000),
std::make_unique<FileHandleGenerator>());
FileProperties properties = {
tempFile->fileSize(), tempFile->fileModifiedTime()};
auto fileHandle = factory.generate(filename, &properties);
ASSERT_EQ(fileHandle->file->size(), 3);
char buffer[3];
ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo");

// Clean up
remove(filename.c_str());
}
1 change: 1 addition & 0 deletions velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) {
fileFormat,
0UL,
std::numeric_limits<uint64_t>::max(),
facebook::velox::FileProperties(),
partitionKeys,
std::nullopt,
customSplitInfo,
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/utils/HiveConnectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class HiveConnectorSplitBuilder {
fileFormat_,
start_,
length_,
properties_,
partitionKeys_,
tableBucketNumber_,
customSplitInfo,
Expand All @@ -298,6 +299,7 @@ class HiveConnectorSplitBuilder {
dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF};
uint64_t start_{0};
uint64_t length_{std::numeric_limits<uint64_t>::max()};
facebook::velox::FileProperties properties_;
std::unordered_map<std::string, std::optional<std::string>> partitionKeys_;
std::optional<int32_t> tableBucketNumber_;
std::unordered_map<std::string, std::string> customSplitInfo_ = {};
Expand Down

0 comments on commit 0914f82

Please sign in to comment.