Skip to content

Commit

Permalink
fix(interactive): Add max retry for ODPS fragment loader (#4166)
Browse files Browse the repository at this point in the history
ODPSFragmentLoader need to fetch data via network connection, which may
fails. We should fail after maximum times retry, not waiting forever.

Fix #4158
  • Loading branch information
zhanglei1949 authored Aug 21, 2024
1 parent aac9060 commit cfc73d9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
13 changes: 13 additions & 0 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,17 @@ void ODPSReadClient::CreateReadSession(
const std::vector<std::string>& selected_partitions) {
auto resp = createReadSession(table_identifier, selected_cols, partition_cols,
selected_partitions);
size_t cur_retry = 0;
while (resp.status_ != apsara::odps::sdk::storage_api::Status::OK &&
resp.status_ != apsara::odps::sdk::storage_api::Status::WAIT) {
LOG(ERROR) << "CreateReadSession failed" << resp.error_message_;
if (cur_retry >= MAX_RETRY) {
LOG(FATAL) << "Reach max retry times " << MAX_RETRY
<< ", when creating read session.";
}
resp = createReadSession(table_identifier, selected_cols, partition_cols,
selected_partitions);
cur_retry++;
}
*session_id = resp.session_id_;

Expand Down Expand Up @@ -279,6 +285,7 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() {
if (!cur_batch_reader_ || cur_split_index_ >= split_count_) {
return record_batch;
}
size_t cur_retry = 0;
while (true) {
if (!cur_batch_reader_->Read(record_batch)) {
if (cur_batch_reader_->GetStatus() !=
Expand All @@ -287,10 +294,16 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() {
<< cur_batch_reader_->GetErrorMessage() << ", "
<< cur_batch_reader_->GetStatus()
<< ", split id: " << cur_split_index_;
if (cur_retry >= ODPSReadClient::MAX_RETRY) {
LOG(FATAL) << "Reach max retry times " << ODPSReadClient::MAX_RETRY
<< ", split id: " << cur_split_index_;
}
cur_batch_reader_ =
odps_read_client_.GetArrowClient()->ReadRows(read_rows_req_);
cur_retry++;
} else {
VLOG(1) << "Read split " << cur_split_index_ << " finished";
cur_retry = 0;
// move to next split
cur_split_index_ += worker_num_;
if (cur_split_index_ >= split_count_) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ODPSReadClient {
public:
static constexpr const int CONNECTION_TIMEOUT = 5;
static constexpr const int READ_WRITE_TIMEOUT = 10;
static constexpr const size_t MAX_RETRY = 10;
ODPSReadClient();

~ODPSReadClient();
Expand Down Expand Up @@ -112,7 +113,6 @@ class ODPSReadClient {
std::string output_directory_;
std::shared_ptr<ArrowClient> arrow_client_ptr_;
size_t MAX_PRODUCER_NUM = 8;
size_t MAX_RETRY = 5;
};

class ODPSStreamRecordBatchSupplier : public IRecordBatchSupplier {
Expand Down

0 comments on commit cfc73d9

Please sign in to comment.