sub-transaction positioning #2035

Open: osheroff wants to merge 5 commits into master

Conversation

osheroff
Collaborator

allows maxwell to set the last-good binlog position somewhere inside of an event; this allows us to make progress in the face of periodic crashes or OOM errors from giant transactions, that sort of thing.

osheroff added 5 commits July 2, 2023 00:31
When we end up crashing for reasons like we got throttled by the
producer, this will allow us to continue to make progress.
@@ -69,8 +79,14 @@ public int hashCode() {
}
@jackjoesh Jan 13, 2025

I think there is a bug: we should ensure that the combination of binlogPosition and txOffset is unique when this object is used as a HashMap key.
Shall we fix it like this: return Objects.hash(binlogPosition, this.txOffset);
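
For concreteness, a minimal sketch of the suggested change (assuming this is Maxwell's Position class; the field names binlogPosition and txOffset are taken from the review comment, and equals() is shown only because it must stay consistent with hashCode()):

    // uses java.util.Objects
    @Override
    public int hashCode() {
        // include txOffset so that two positions inside the same event
        // do not collide when used as HashMap keys
        return Objects.hash(binlogPosition, txOffset);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Position other = (Position) o;
        return Objects.equals(binlogPosition, other.binlogPosition)
            && txOffset == other.txOffset;
    }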

All this mechanism allows maxwell to stop and restart in the middle of very large
transactions.
*/
Position nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L);

@jackjoesh

Shall we fix it like this, to prevent a possible NullPointerException:

    Position nextPosition = null;
    if (lastTableMapPosition != null) {
        nextPosition = Position.valueOf(lastTableMapPosition, lastHeartbeatRead, 0L);
    } else {
        nextPosition = Position.valueOf(this.nextPosition, lastHeartbeatRead, 0L);
    }

for ( RowMap r : rows )
    if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
        buffer.add(r);
for ( RowMap r : rows ) {
@jackjoesh Jan 15, 2025

We should prevent this logic from impacting rows that have different positions; for those, no tx_offset logic should apply:

    if (rows != null) {
        for (int i = 0; i < rows.size(); i++) {
            RowMap r = rows.get(i);
            if (i > 0 && rows.get(i - 1).getPosition().equals(r.getPosition())) {
                // only batch inserts use the tx offset
                r.setXoffset(++txOffset);
            } else {
                txOffset = 0L;
                r.setXoffset(0L);
            }
            if (shouldOutputRowMap(table.getDatabase(), table.getName(), r, filter)) {
                if (startTXOffset <= r.getXoffset()) {
                    buffer.add(r);
                }
            }
        }
    }

@jackjoesh

Hi @osheroff , I have tried this PR to fix this problem: #2119.
It seems OK, and I have left some comments on this PR. What do you think of them? Do you have any suggestions?
Because this PR has not been merged into the main trunk yet, I would be among the first to use it. I am looking forward to your opinions and suggestions. Thank you.

@osheroff
Collaborator Author

Hi @jackjoesh

So I started this work at the last company I worked for and yeah, never quite completed it. I'm happy to work with you to try to push it across the finish line but I think we need some really really good integration tests -- something where an integration test can terminate maxwell in the middle of a transaction and verify that it will properly restart.

@jackjoesh

Hi @osheroff
Thank you very much for your reply and attention.
We have applied this PR and fixed some bugs, and we will release it to our online environment next week. After that, I can open a new PR with the modifications.
For now, I would like to check my understanding with you:
1. I think this PR is only really relevant for batch SQL like: INSERT INTO blob_test(id, blob_value, order_id) VALUES (80, NULL, '796788'),(81, NULL, '796788'),(82, NULL, '796788'),(83, NULL, '796788');
Since every insert in such a batch has the same position, the new tx_offset field can be used to distinguish the individual sub-inserts.
But it is not needed for transactional SQL, because every child statement in a transaction has a different position, so tx_offset is not useful there.
2. I also find it a bit strange to use the position of the table map event as the next position of each DML event, but it seems necessary:
A) For batch SQL, the table map event is the first event, so when the task is interrupted and restarted, tx_offset can always be re-counted from the head for each child DML statement, which determines the tx_offset from which to continue synchronizing data (see the sketch below).
B) For other SQL, such as transactional SQL or ordinary SQL, each DML event has its own table map event. Although using the table map event as the next position of each DML event looks a bit odd, on restart it only re-parses one extra table map event and causes neither data loss nor a performance loss. From the perspective of logical unification, uniformly using the table map event's position for ordinary SQL as well also reduces the complexity of code maintenance.
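
To make (A) concrete, here is a rough, illustrative sketch of the restart behavior for a batch insert (this is not the PR's actual code; recoveredPosition.getTxOffset() is a hypothetical accessor, while RowMap, setXoffset, getXoffset and buffer are taken from the diff above):

    // Illustrative only: on restart Maxwell resumes at the table-map event's position,
    // replays the whole batch, and re-numbers the rows from tx_offset 0; rows whose
    // offset is below the offset stored in the recovered position are suppressed.
    long startTXOffset = recoveredPosition.getTxOffset();   // hypothetical accessor
    long txOffset = 0L;
    for (RowMap r : rows) {
        r.setXoffset(txOffset++);
        if (r.getXoffset() >= startTXOffset) {
            buffer.add(r);   // not yet produced before the crash; the boundary row may repeat
        }
    }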

What are your opinions or suggestions on this? These structural observations will help me better understand the underlying logic of your original changes.
Thank you

@osheroff
Collaborator Author

yeah this is why we need good integration tests, to study the actual behavior in these scenarios. The correct behavior should be:

  • leave the server position at the top of the transaction (since we can't ask the server to start inside a transaction);
  • use txOffset to skip already-processed data inside the transaction (see the sketch below).
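
A hedged sketch of what that would mean for the stored position (argument names are illustrative; the three-argument Position.valueOf(...) form is taken from the diff earlier in this PR):

    // The persisted position points at a place the server can actually resume from
    // (the start of the transaction), while the last argument records how many rows
    // inside that transaction were already produced.
    Position lastGood = Position.valueOf(transactionStartPosition, lastHeartbeatRead, rowsAlreadyProduced);
    // On restart: replay from transactionStartPosition and skip the first
    // `rowsAlreadyProduced` rows of the transaction.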

@jackjoesh

@osheroff Yes, our QA has tested terminating and restarting the task inside batch DML (insert/update/delete) SQL, transactional DML SQL, and ordinary DML SQL.
We found that batch DML SQL recovers from the tx_offset correctly, and transactional DML SQL and ordinary DML SQL still recover correctly because their DML events all have different positions.
We used iptables to block Kafka's callback in order to terminate inside a transaction at a random point.
Do you think we should add more test cases?

@osheroff
Collaborator Author

uh, no, i mean integration tests within maxwell itself.

@jackjoesh

@osheroff OK, I can try some integration tests later. And I have another question:
Your original intention was to avoid re-processing inside a large transaction and to restart it quickly.
But in fact, this PR also helps with batch inserts: each sub-statement has the same position, and tx_offset is used to distinguish the sub-statements so that none of their synchronization messages are missed.
Is it right? thanks

@osheroff
Collaborator Author

Is it right? thanks

yeah, you can think about an insert batch and a transaction as exactly the same thing -- just a group of rows -- and with this PR we can have different positions within the group. There's still the possibility that we repeat rows, but if maxwell goes down in the middle of a huge batch this PR can help.
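
For illustration (all values are made up), the rows of a 4-row batch insert, where every row event shares the same binlog position, would be addressed like this under this PR:

    row 0: position = mysql-bin.000123:4500, tx_offset = 0
    row 1: position = mysql-bin.000123:4500, tx_offset = 1
    row 2: position = mysql-bin.000123:4500, tx_offset = 2
    row 3: position = mysql-bin.000123:4500, tx_offset = 3

If Maxwell dies after producing row 1, it restarts at the same binlog position and suppresses rows whose offset is below the one it last recorded, so at worst the boundary row is emitted twice.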
