From cfc73d99db446cf1653efacde55e63f32cb431ee Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Wed, 21 Aug 2024 14:25:35 +0800 Subject: [PATCH] fix(interactive): Add max retry for ODPS fragment loader (#4166) ODPSFragmentLoader need to fetch data via network connection, which may fails. We should fail after maximum times retry, not waiting forever. Fix #4158 --- .../rt_mutable_graph/loader/odps_fragment_loader.cc | 13 +++++++++++++ .../rt_mutable_graph/loader/odps_fragment_loader.h | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc index b90dc11b5b40..d00aebea41cb 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc @@ -117,11 +117,17 @@ void ODPSReadClient::CreateReadSession( const std::vector& selected_partitions) { auto resp = createReadSession(table_identifier, selected_cols, partition_cols, selected_partitions); + size_t cur_retry = 0; while (resp.status_ != apsara::odps::sdk::storage_api::Status::OK && resp.status_ != apsara::odps::sdk::storage_api::Status::WAIT) { LOG(ERROR) << "CreateReadSession failed" << resp.error_message_; + if (cur_retry >= MAX_RETRY) { + LOG(FATAL) << "Reach max retry times " << MAX_RETRY + << ", when creating read session."; + } resp = createReadSession(table_identifier, selected_cols, partition_cols, selected_partitions); + cur_retry++; } *session_id = resp.session_id_; @@ -279,6 +285,7 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() { if (!cur_batch_reader_ || cur_split_index_ >= split_count_) { return record_batch; } + size_t cur_retry = 0; while (true) { if (!cur_batch_reader_->Read(record_batch)) { if (cur_batch_reader_->GetStatus() != @@ -287,10 +294,16 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() { << cur_batch_reader_->GetErrorMessage() << ", " << cur_batch_reader_->GetStatus() << ", split id: " << cur_split_index_; + if (cur_retry >= ODPSReadClient::MAX_RETRY) { + LOG(FATAL) << "Reach max retry times " << ODPSReadClient::MAX_RETRY + << ", split id: " << cur_split_index_; + } cur_batch_reader_ = odps_read_client_.GetArrowClient()->ReadRows(read_rows_req_); + cur_retry++; } else { VLOG(1) << "Read split " << cur_split_index_ << " finished"; + cur_retry = 0; // move to next split cur_split_index_ += worker_num_; if (cur_split_index_ >= split_count_) { diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h index 026bb954f63d..fb46110b722d 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h @@ -61,6 +61,7 @@ class ODPSReadClient { public: static constexpr const int CONNECTION_TIMEOUT = 5; static constexpr const int READ_WRITE_TIMEOUT = 10; + static constexpr const size_t MAX_RETRY = 10; ODPSReadClient(); ~ODPSReadClient(); @@ -112,7 +113,6 @@ class ODPSReadClient { std::string output_directory_; std::shared_ptr arrow_client_ptr_; size_t MAX_PRODUCER_NUM = 8; - size_t MAX_RETRY = 5; }; class ODPSStreamRecordBatchSupplier : public IRecordBatchSupplier {