Skip to content

Commit

Permalink
Fix data race for RemoteSource::was_query_canceled
Browse files Browse the repository at this point in the history
Since onCancel() and generate() can be called from different threads

TSAN reports:

    WARNING: ThreadSanitizer: data race (pid=253)
      Write of size 1 at 0x7b50008c25c2 by thread T144 (mutexes: write M643587328754916744):
        #0 DB::RemoteSource::onCancel() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.cpp:79:24 (clickhouse+0x11c2019d)
        #1 DB::IProcessor::cancel() /build/obj-x86_64-linux-gnu/../src/Processors/IProcessor.h:235:9 (clickhouse+0x11c21def)
        #2 DB::RemoteSource::onUpdatePorts() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.h:32:13 (clickhouse+0x11c21def)
        #3 DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::ExecutingGraph::Edge&, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:190:43 (clickhouse+0x11aaf954)
        #4 DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, std::__1::unique_lock<std::__1::mutex>) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:296:18 (clickhouse+0x11ab0169)
        #5 DB::PipelineExecutor::tryAddProcessorToStackIfUpdated(DB::ExecutingGraph::Edge&, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:187:16 (clickhouse+0x11aaf9c2)
        #6 DB::PipelineExecutor::prepareProcessor(unsigned long, unsigned long, std::__1::queue<DB::ExecutingGraph::Node*, std::__1::deque<DB::ExecutingGraph::Node*, std::__1::allocator<DB::ExecutingGraph::Node*> > >&, std::__1::unique_lock<std::__1::mutex>) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:296:18 (clickhouse+0x11ab0169)
        #7 DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:589:26 (clickhouse+0x11ab2a3e)
        #8 DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:477:5 (clickhouse+0x11ab538b)
        #9 DB::PipelineExecutor::executeImpl(unsigned long)::$_4::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:736:21 (clickhouse+0x11ab538b)
        #10 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(fp)()) std::__1::__invoke_constexpr<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3525:1 (clickhouse+0x11ab538b)
        #11 decltype(auto) std::__1::__apply_tuple_impl<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1415:1 (clickhouse+0x11ab538b)
        #12 decltype(auto) std::__1::apply<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1424:1 (clickhouse+0x11ab538b)
        #13 ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()::operator()() /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:178:13 (clickhouse+0x11ab538b)
        #14 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(fp)()) std::__1::__invoke<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab538b)
        #15 void std::__1::__invoke_void_return_wrapper<void>::__call<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab538b)
        #16 std::__1::__function::__alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab538b)
        #17 std::__1::__function::__func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab538b)
        #18 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x8346263)
        #19 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x8346263)
        #20 ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:236:17 (clickhouse+0x8346263)
        #21 void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()::operator()() const /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:117:73 (clickhouse+0x8349ea8)
        #22 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(fp0)...)) std::__1::__invoke<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(void&&, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x8349ea8)
        ClickHouse#23 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(std::__1::tuple<void, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:273:5 (clickhouse+0x8349ea8)
        ClickHouse#24 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()> >(void*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:284:5 (clickhouse+0x8349ea8)

      Previous read of size 1 at 0x7b50008c25c2 by thread T91:
        #0 DB::RemoteSource::generate() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/RemoteSource.cpp:35:9 (clickhouse+0x11c1fb9e)
        #1 DB::ISource::work() /build/obj-x86_64-linux-gnu/../src/Processors/ISource.cpp:48:31 (clickhouse+0x11a6c852)
        #2 DB::SourceWithProgress::work() /build/obj-x86_64-linux-gnu/../src/Processors/Sources/SourceWithProgress.cpp:36:30 (clickhouse+0x11c26d1a)
        #3 DB::executeJob(DB::IProcessor*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:78:20 (clickhouse+0x11ab4836)
        #4 DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:95:13 (clickhouse+0x11ab4836)
        #5 decltype(std::__1::forward<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(fp)()) std::__1::__invoke<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab4836)
        #6 void std::__1::__invoke_void_return_wrapper<void>::__call<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&>(DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab4836)
        #7 std::__1::__function::__alloc_func<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0, std::__1::allocator<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab4836)
        #8 std::__1::__function::__func<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0, std::__1::allocator<DB::PipelineExecutor::addJob(DB::ExecutingGraph::Node*)::$_0>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab4836)
        #9 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x11ab2801)
        #10 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x11ab2801)
        #11 DB::PipelineExecutor::executeStepImpl(unsigned long, unsigned long, std::__1::atomic<bool>*) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:561:17 (clickhouse+0x11ab2801)
        #12 DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:477:5 (clickhouse+0x11ab538b)
        #13 DB::PipelineExecutor::executeImpl(unsigned long)::$_4::operator()() const /build/obj-x86_64-linux-gnu/../src/Processors/Executors/PipelineExecutor.cpp:736:21 (clickhouse+0x11ab538b)
        #14 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(fp)()) std::__1::__invoke_constexpr<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3525:1 (clickhouse+0x11ab538b)
        #15 decltype(auto) std::__1::__apply_tuple_impl<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1415:1 (clickhouse+0x11ab538b)
        #16 decltype(auto) std::__1::apply<DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&, std::__1::tuple<>&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/tuple:1424:1 (clickhouse+0x11ab538b)
        #17 ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()::operator()() /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.h:178:13 (clickhouse+0x11ab538b)
        #18 decltype(std::__1::forward<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(fp)()) std::__1::__invoke<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x11ab538b)
        #19 void std::__1::__invoke_void_return_wrapper<void>::__call<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()&>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/__functional_base:348:9 (clickhouse+0x11ab538b)
        #20 std::__1::__function::__alloc_func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1540:16 (clickhouse+0x11ab538b)
        #21 std::__1::__function::__func<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'(), std::__1::allocator<ThreadFromGlobalPool::ThreadFromGlobalPool<DB::PipelineExecutor::executeImpl(unsigned long)::$_4>(DB::PipelineExecutor::executeImpl(unsigned long)::$_4&&)::'lambda'()>, void ()>::operator()() /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1714:12 (clickhouse+0x11ab538b)
        #22 std::__1::__function::__value_func<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:1867:16 (clickhouse+0x8346263)
        ClickHouse#23 std::__1::function<void ()>::operator()() const /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/functional:2473:12 (clickhouse+0x8346263)
        ClickHouse#24 ThreadPoolImpl<std::__1::thread>::worker(std::__1::__list_iterator<std::__1::thread, void*>) /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:236:17 (clickhouse+0x8346263)
        ClickHouse#25 void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()::operator()() const /build/obj-x86_64-linux-gnu/../src/Common/ThreadPool.cpp:117:73 (clickhouse+0x8349ea8)
        ClickHouse#26 decltype(std::__1::forward<void>(fp)(std::__1::forward<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(fp0)...)) std::__1::__invoke<void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(void&&, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()&&...) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/type_traits:3519:1 (clickhouse+0x8349ea8)
        ClickHouse#27 void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>(std::__1::tuple<void, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()>&, std::__1::__tuple_indices<>) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:273:5 (clickhouse+0x8349ea8)
        ClickHouse#28 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void ThreadPoolImpl<std::__1::thread>::scheduleImpl<void>(std::__1::function<void ()>, int, std::__1::optional<unsigned long>)::'lambda1'()> >(void*) /build/obj-x86_64-linux-gnu/../contrib/libcxx/include/thread:284:5 (clickhouse+0x8349ea8)
  • Loading branch information
azat committed Nov 14, 2020
1 parent a350f3f commit aa073aa
Showing 1 changed file with 2 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/Processors/Sources/RemoteSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Pipe.h>
#include <atomic>

namespace DB
{
Expand Down Expand Up @@ -37,7 +38,7 @@ class RemoteSource : public SourceWithProgress
void onCancel() override;

private:
bool was_query_canceled = false;
std::atomic<bool> was_query_canceled = false;
bool was_query_sent = false;
bool add_aggregation_info = false;
RemoteQueryExecutorPtr query_executor;
Expand Down

0 comments on commit aa073aa

Please sign in to comment.