diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 184b2c60ba..3e12cb119d 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -435,6 +435,12 @@ type sweepInputMessage struct { input input.Input params Params resultChan chan Result + + // processed is a signal chan that will be closed once the input has + // been added to the sweeper's internal map. This is used to ensure + // when `SweepInput` finishes, the input is guaranteed to be in the + // sweeper's internal map via `handleNewInput`. + processed chan struct{} } // New returns a new Sweeper instance. @@ -538,6 +544,7 @@ func (s *UtxoSweeper) SweepInput(inp input.Input, input: inp, params: params, resultChan: make(chan Result, 1), + processed: make(chan struct{}), } // Deliver input to the main event loop. @@ -547,6 +554,17 @@ func (s *UtxoSweeper) SweepInput(inp input.Input, return nil, ErrSweeperShuttingDown } + // Wait for the input to be processed, which means the input has been + // added to the sweeper's internal map once `handleNewInput` finishes. + select { + case <-sweeperInput.processed: + log.Tracef("Sweep request for out_point=%v processed", + inp.OutPoint()) + + case <-s.quit: + return nil, ErrSweeperShuttingDown + } + return sweeperInput.resultChan, nil } @@ -1216,6 +1234,8 @@ func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 { // handleNewInput processes a new input by registering spend notification and // scheduling sweeping for it. func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error { + defer close(input.processed) + outpoint := input.input.OutPoint() pi, pending := s.inputs[outpoint] if pending {