-
Notifications
You must be signed in to change notification settings - Fork 913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BP-49: Support read ahead async #3111
base: master
Are you sure you want to change the base?
Conversation
Hi @merlimat @sijie @eolivelli @dlg99 Could you help review this PR ? |
// Async mode | ||
entry = readAheadManager.readEntryOrWait(ledgerId, entryId); | ||
} else { | ||
// Sync mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you encapsulate the original sync mode implementation into one meaningful method?
|
||
dbLedgerStorageStats.getReadCacheMissCounter().inc(); | ||
dbLedgerStorageStats.getReadCacheMissCounter().inc(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant spaces prefixing the line, please correct the style, same for following lines.
private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; | ||
private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; | ||
|
||
private static final class LedgerEntryPosition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it more suitable if split the class LedgerEntryPosition
to separated file, since it seems like one common definition, it maybe could be used in other places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class only used in the ReadAheadManager, it's better to keep it here.
@wuzhanpeng left some comments |
private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; | ||
private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; | ||
|
||
private static final class LedgerEntryPosition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class only used in the ReadAheadManager, it's better to keep it here.
} | ||
|
||
@Override | ||
public boolean equals(Object o) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use @DaTa to cover equals
and hashCode
} | ||
|
||
@Override | ||
public boolean equals(Object o) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use @DaTa to cover equals
and hashCode
} catch (InterruptedException e) { | ||
log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms", | ||
readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs); | ||
readCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When condition await throws InterruptedException, and runs into readCompleted, it will cause deadlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This place will not cause deadlock when meeting an interruption. The lock associated with this Condition is atomically released when calling await
. See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html
} catch (InterruptedException e) { | ||
log.warn("Failed to read entries={} ahead from L{} E{} due to timeout={}ms", | ||
readAheadEntries, ledgerId, startEntryId, readAheadTimeoutMs); | ||
readCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When condition await throws InterruptedException, and runs into readCompleted, it will cause deadlock
* @return | ||
* @throws IOException | ||
*/ | ||
public ByteBuf readEntryOrWait(long ledgerId, long entryId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can simplify the function name
public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; | ||
|
||
private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; | ||
private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do to decide the max default read ahead bytes ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DEFAULT_READ_AHEAD_BYTES
and DEFAULT_READ_AHEAD_MESSAGES
represent the maximum number of bytes and messages that can be read in a single read-ahead task respectively. If any of the conditions are met, the current read-ahead task will end.
public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; | ||
|
||
private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; | ||
private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do to decide the max default read ahead bytes ?
public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; | ||
public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; | ||
|
||
private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better decrease the default max read ahead entries number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In our practice, it is more cost-effective to read more consecutive entries when the entry size is small. In addition, because the termination condition of the read-ahead task depends on the maximum number of bytes and the maximum number of entries, under the default limit of reading up to 256KB, generally speaking, the number of entries in read-ahead task will not be too many.
public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; | ||
public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; | ||
|
||
private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better decrease the default max read ahead entries number.
* A structure that records the transitions of the task status. | ||
*/ | ||
public static class ReadAheadTaskStatus { | ||
private long ledgerId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It better to set it as final
I have explained and updated the code according to the comments, you can take a look again~ @hangc0276 @ArvinDevel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments, I would suggest you to split this PR to several dedicated PR
if (enableReadAheadAsync) { | ||
return readEntryUnderAsyncReadAhead(ledgerId, entryId); | ||
} else { | ||
return readEntryUnderSyncReadAhead(ledgerId, entryId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wuzhanpeng could you divide this PR into several little PRs?, so that it's more easy to review.
1)first PR focusing on the reorganize of current sync mode read ahead : such as move fillReadAheadCache
and related method to the ReadAheadManager
this PR doesn't introduce the new feature, just prepare for the new feature, and this PR should has no impact to the system;
2)following PR focusing on the functionality, 1 or more PR needed, for example core functionality, monitoring could be split into different PR.
* @return | ||
* @throws IOException | ||
*/ | ||
private ByteBuf readEntryUnderAsyncReadAhead(long ledgerId, long entryId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the method name is a little redundant. just use readEntryAsync
?
* @return | ||
* @throws IOException | ||
*/ | ||
private ByteBuf readEntryUnderSyncReadAhead(long ledgerId, long entryId) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just use readEntrySync
or similar name?
1ba76bf
to
6cd125a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's nice
but this branch has conflicts that must be resolved, @wuzhanpeng please fix it
fix old workflow,please see #3455 for detail |
resolve conflicts
resolve conflicts
6cd125a
to
b25b0f8
Compare
The conflicts have been resolved. Please check again~ |
break; | ||
} | ||
} else { | ||
entry = entryLogger.readEntry(ledgerId, entryId, location); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- In accordance with the previous logic, we'd better use
entry = entryLogger.internalReadEntry(ledgerId, entryId, location, false);
don't do entry validation here.
Master issue: #3085