Skip to content

Commit

Permalink
Merge branch 'main' into use-latest-arrow-version
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 authored Aug 21, 2024
2 parents b1a3358 + cfc73d9 commit 91cab05
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
13 changes: 13 additions & 0 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,17 @@ void ODPSReadClient::CreateReadSession(
const std::vector<std::string>& selected_partitions) {
auto resp = createReadSession(table_identifier, selected_cols, partition_cols,
selected_partitions);
size_t cur_retry = 0;
while (resp.status_ != apsara::odps::sdk::storage_api::Status::OK &&
resp.status_ != apsara::odps::sdk::storage_api::Status::WAIT) {
LOG(ERROR) << "CreateReadSession failed" << resp.error_message_;
if (cur_retry >= MAX_RETRY) {
LOG(FATAL) << "Reach max retry times " << MAX_RETRY
<< ", when creating read session.";
}
resp = createReadSession(table_identifier, selected_cols, partition_cols,
selected_partitions);
cur_retry++;
}
*session_id = resp.session_id_;

Expand Down Expand Up @@ -279,6 +285,7 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() {
if (!cur_batch_reader_ || cur_split_index_ >= split_count_) {
return record_batch;
}
size_t cur_retry = 0;
while (true) {
if (!cur_batch_reader_->Read(record_batch)) {
if (cur_batch_reader_->GetStatus() !=
Expand All @@ -287,10 +294,16 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() {
<< cur_batch_reader_->GetErrorMessage() << ", "
<< cur_batch_reader_->GetStatus()
<< ", split id: " << cur_split_index_;
if (cur_retry >= ODPSReadClient::MAX_RETRY) {
LOG(FATAL) << "Reach max retry times " << ODPSReadClient::MAX_RETRY
<< ", split id: " << cur_split_index_;
}
cur_batch_reader_ =
odps_read_client_.GetArrowClient()->ReadRows(read_rows_req_);
cur_retry++;
} else {
VLOG(1) << "Read split " << cur_split_index_ << " finished";
cur_retry = 0;
// move to next split
cur_split_index_ += worker_num_;
if (cur_split_index_ >= split_count_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ODPSReadClient {
public:
static constexpr const int CONNECTION_TIMEOUT = 5;
static constexpr const int READ_WRITE_TIMEOUT = 10;
static constexpr const size_t MAX_RETRY = 10;
ODPSReadClient();

~ODPSReadClient();
Expand Down Expand Up @@ -112,7 +113,6 @@ class ODPSReadClient {
std::string output_directory_;
std::shared_ptr<ArrowClient> arrow_client_ptr_;
size_t MAX_PRODUCER_NUM = 8;
size_t MAX_RETRY = 5;
};

class ODPSStreamRecordBatchSupplier : public IRecordBatchSupplier {
Expand Down

0 comments on commit 91cab05

Please sign in to comment.