swap, swap/chain, contracts/swap: add transaction queue #2124
base: master
Conversation
Amazing work!!
My comments are mainly for explanation, but at points I also note that you have something defined but not implemented. In these cases it may be better to leave the definition out and add it when you actually need it (especially since this PR will be used as a reference by developers who are going to implement the queue for other blockchain interactions in Swarm).
I do think that a markdown file, perhaps with diagrams, would help to explain the architecture of this PR and assist developers who are going to work with this code. Maybe @vojtechsimetka could help with this.
After you have addressed my comments and clarified things for me, I would like to go over this PR once again.
This is quite a long PR and most of the implementation is pretty great. 👏 My comments are related to lock handling, which I think could be better designed.
// A lock must be held and kept until after the trigger function was called or the batch write failed
func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) {
	// the nonce guarantees keys don't collide if multiple transactions are queued in the same second
	pq.nonce++
A possible data race on the nonce field on concurrent Queue calls. There is a comment about the lock, but it would be nicer to have an API where locking is implicit.
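For illustration, a minimal sketch of what implicit locking could look like, assuming a simplified queue struct; the batch and trigger handling of the real PersistentQueue is omitted and the names here are not from the PR.

package chain

import "sync"

// Simplified sketch, not the PR's code: the nonce is guarded by a mutex
// internal to the queue, so concurrent Queue calls cannot race on it.
type lockedQueue struct {
	mu    sync.Mutex
	nonce uint64
}

// nextNonce increments and returns the nonce under the internal lock.
func (q *lockedQueue) nextNonce() uint64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.nonce++
	return q.nonce
}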
see #2124 (comment)
lock.Lock()
key, exists, err = pq.Peek(i)
if exists {
	return key, nil
A deadlock if exists is true, as the lock is not unlocked.
see #2124 (comment)
lock.Lock()
key, exists, err := pq.Peek(i)
if exists {
	return key, nil
A deadlock if exists is true, as the lock is not unlocked.
see #2124 (comment)
swap/chain/persistentqueue.go
// No lock should be held when this is called. Only a single call to Next may be active at any time
// If the key is not "", the value exists, the supplied lock was acquired and must be released by the caller after processing the item
// The supplied lock should be the same one that is used for the other functions
func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) {
Lock handling is a bit strange. The function can return leaving the lock either unlocked or locked. I think that the lock should be internal to the implementation, not exposed through the package API.
It would be better to protect the queue with an internal lock than to rely on the queue user to do the locking. With wrong usage it is easy to unlock an already unlocked lock or to cause a deadlock.
Batch processing and writing require the lock, but that could be encapsulated by different functions.
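One possible shape of such an API, sketched under the assumption of a mutex internal to the queue and a simplified Peek-based Next; the waiting/trigger logic of the real implementation is omitted and the names and signatures here are illustrative, not a concrete proposal from this PR.

// Next returns the next queued key, or "" if nothing is available. When a key
// is returned, the caller calls the release function once it has finished
// processing the item; the mutex itself stays internal to the queue.
func (pq *PersistentQueue) Next(ctx context.Context, i interface{}) (key string, release func(), err error) {
	pq.mu.Lock()
	key, exists, err := pq.Peek(i)
	if err != nil || !exists {
		pq.mu.Unlock()
		return "", nil, err
	}
	return key, pq.mu.Unlock, nil
}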
This locking design was the consequence of an earlier version of the code where the request queue and a notification queue were modified at the same time, and I wanted to avoid having to hold three locks of three different objects at the same time as the risk of deadlock seemed high. Anyway, that case no longer exists in this version, so perhaps a lock can be put back into pq again. I will attempt a redesign next week.
agree with @janos here.
we can have a separate PR for this
I would suggest leaving it as is for now. Using any of the pq functions without holding the main txqueue lock is always wrong. A lock managed internally by the pq is insufficient, because making sure that batches don't overlap and that trigger signals are not missed requires locking beyond the scope of single functions. I also tried various approaches to make the locking part of the batch itself, but those only complicated things further.
I think for now we should consider persistentQueue (which is now unexported; it was never supposed to be used elsewhere anyway) a helper structure that is used exclusively by the txqueue, and therefore have it share the txqueue's lock. I think we should merge this to finish the work on this codebase and, if necessary, consider further redesigns when migrating transaction sending to bee.
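Under the current contract, the caller side then looks roughly like this; a simplified sketch of the usage based on the documented behaviour of Next, not a verbatim excerpt from the PR.

// Sketch of the intended usage: Next acquires txq.lock when it returns a
// non-empty key, so the caller is responsible for releasing it once the
// item has been processed.
key, err := txq.requestQueue.Next(txq.ctx, &id, &txq.lock)
if err != nil {
	return nil, err
}
if key == "" {
	// Next did not return an item; per its contract the lock was not acquired
	return nil, nil
}
defer txq.lock.Unlock() // released after the item has been processed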
My opinion is that we should not share the lock this way, as it creates code that is hard to maintain. I would not like to approve it in this state.
If we are focusing on bee and will not add new features to the swarm repo, we do not need to merge this PR now, but can leave it for the bee project.
Fair enough. I'll leave this PR open for further experimentation. Then we can either still merge this at some point in the future or just continue the redesign on a PR on the bee repo (although I assume it will take a while until we get to tx sending there).
count := 200

var errout error // stores the last error that occurred in one of the routines
errout can race, as two goroutines may set it at the same time.
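A minimal sketch of one way to address this in the test, assuming the goroutines currently assign errout directly: guard the shared variable with a mutex (alternatively, golang.org/x/sync/errgroup can collect the first error from a group of goroutines).

// Simplified sketch: concurrent goroutines record their error through a
// mutex-protected setter instead of writing errout directly.
var (
	mu     sync.Mutex
	errout error // stores the last error that occurred in one of the routines
)
setErr := func(err error) {
	mu.Lock()
	defer mu.Unlock()
	if err != nil {
		errout = err
	}
}

Each goroutine then calls setErr(err) instead of assigning errout itself.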
fixed
@@ -0,0 +1,7 @@
package chain |
Add copyright header to every new file in this package.
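For reference, the new files would start with the repository's standard license notice; the exact text and year should be copied from an existing file in the package, roughly along these lines.

// Copyright 2020 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// ... (remaining paragraphs of the standard notice, copied verbatim)

package chain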
done
swap/chain/txqueue.go
func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err error) {
	var id uint64
	// get the id of the next request in the queue
	key, err := txq.requestQueue.Next(txq.ctx, &id, &txq.lock)
This also relates to the lock usage. I have no concrete suggestion how to implement it differently, but with lock state tracked across TxQueue and PersistentQueue, deadlocks or data races may be quite hard to debug.
see #2124 (comment)
@@ -126,6 +129,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend)
	usedBackend = newTestBackend(t)
}
swap, dir := newBaseTestSwap(t, key, usedBackend)
swap.txScheduler.Start()
Check for the returned error.
It doesn't return an error.
a few questions from my first pass through the code:

- could you give a conceptual example of another struct that would implement the TxScheduler interface, other than TxQueue? (i want to make sure i understand the difference between these 2 in terms of responsibilities)
- why does persistentQueue have a prefix, if all entries have their own separate keys? is it the idea to have multiple persistentQueue structs in the same state.Store? is this the case already?
- i understand the situation of having a transaction with an unknown status, but why is there a func to actually notify this? would this take place in the future, when we allow transactions to expire, or is it already happening?
- regarding future PRs: can you please explain what the node's actions would be in terms of confirmation monitoring? would this be basically issue Wait for sufficient amount of transaction confirmations #1633?
i definitely will review this PR again (even if it is merged before i manage to do so) as i would like to have a more in-depth understanding of some of the code here.
looks good so far though 👍
swap/chain/persistentqueue.go
// It returns the generated key and a trigger function which must be called once the batch was successfully written
// This only returns an error if the encoding fails which is an unrecoverable error
// A lock must be held and kept until after the trigger function was called or the batch write failed
func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) {
as a developer, i'm not sure this comment
// call trigger function after writing to the batch to prevent undefined behaviour
is really clear about what to do here.
but it would at least be a sign that i would have to be careful when using these functions
}

// ToSignedTx returns a signed types.Transaction for the given request and nonce
func (request *TxRequest) ToSignedTx(nonce uint64, opts *bind.TransactOpts) (*types.Transaction, error) {
i am for ToSignedTx since it operates on the receiver request
- An alternative would be a scheduler which tracks the nonce count locally and allows for parallel requests instead of queueing. Another one might be a simple mock for testing the rest of the code without running the entire queue mechanism.
- Yes, there are already multiple in the same store now: the request queue plus one notification queue per handler. Also, this is the same state store as for swap in production and we want to avoid key collisions.
- This can already happen now if transactions don't confirm in time or
- Not fully sure yet about that. There would at least be a notification once the confirmation number has been reached.
This PR introduces a central component, the TxScheduler, in charge of sending transactions, waiting for their result afterwards and ensuring the result was successfully processed. In the future this component is supposed to take care of more chain-related tasks.

The TxScheduler is an interface currently only implemented by the TxQueue, which executes transactions in sequence (see #2006, the next one is only sent after the previous one confirmed) in order to avoid most nonce-related issues (#1929).

The general idea behind the TxScheduler is as follows:

- A component creates a chain.TxRequest and schedules it with the TxScheduler, which returns an assigned id and takes care of the rest.
- Each request is scheduled with a handlerID which specifies which handler to notify of events for this request. A component should register itself as the handler for the handlerIDs it uses on startup.
- The TxScheduler will execute the requests at some point and notify the appropriate handler. If the handler function failed, the notification does not count as delivered and will be tried again in the future. This guarantee is also preserved across restarts of the swarm client.

The idea behind this is that other places in the code which need to send transactions no longer need to be concerned with issues like io errors, network problems, client restarts, etc. They just queue the request and are guaranteed to be notified of its result at some point.
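To make this flow concrete, here is a rough sketch in Go. The type and method names below (TxRequestHandlers, SetHandlers, ScheduleRequest and the TxRequest fields) are illustrative assumptions, not the actual API of this PR; see the chain package for the real definitions.

package example

import "math/big"

// Hypothetical mirror of chain.TxRequest; field names are assumptions.
type TxRequest struct {
	To    [20]byte // recipient address
	Data  []byte   // call data
	Value *big.Int // amount of ETH to send along
}

// Hypothetical handler callbacks a component registers for its handlerID on
// startup. A failed notification is retried later, also across restarts.
type TxRequestHandlers struct {
	NotifyReceipt func(id uint64) error
	NotifyError   func(id uint64, err error) error
}

// Hypothetical scheduler interface: queue a request and get back its id,
// then wait to be notified through the registered handlers.
type TxScheduler interface {
	SetHandlers(handlerID string, handlers *TxRequestHandlers) error
	ScheduleRequest(handlerID string, request TxRequest) (id uint64, err error)
}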
For the TxQueue, transactions are processed in sequence from a persistent request queue; the PersistentQueue was introduced as a helper structure for this.

On the cashing out side, the CashoutProcessor now accepts a CashoutResultHandler which it will notify of the cashout result. This is usually Swap. During tests this handler is overridden to keep track of cashed out cheques. This mechanism replaces the cashDone channel on the backend and therefore obsoletes the global cashCheque function variable and the setupContractTest function.

In this PR only the cashout transactions for the chequebook go through this mechanism. This was done to keep the PR small; the remaining work is left to smaller future PRs.
This PR is quite large, so it might be useful to look at the commits individually. The PR has been split into 3 commits: the first one is the PersistentQueue, the second one implements the actual queue and the third one integrates it with the cashout transactions.

closes #2006
closes #2005
closes #1929
closes #1634