diff --git a/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java b/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java index e87fb931..527805a9 100644 --- a/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java +++ b/okdownload/src/main/java/com/liulishuo/okdownload/core/file/MultiPointOutputStream.java @@ -148,16 +148,13 @@ public void cancel() { noMoreStreamList.add(streamMap.keyAt(i)); } - if (syncFuture != null && !syncFuture.isDone() && runSyncThread != null) { + if (syncFuture != null && !syncFuture.isDone()) { inspectValidPath(); OkDownload.with().processFileStrategy().getFileLock().increaseLock(path); - - unparkThread(runSyncThread); - try { - syncFuture.get(); - } catch (InterruptedException ignored) { - } catch (ExecutionException ignored) { + ensureSync(true, -1); + } finally { + OkDownload.with().processFileStrategy().getFileLock().decreaseLock(path); } } } finally { @@ -181,19 +178,34 @@ public void cancel() { } } + final StreamsState doneState = new StreamsState(); + public void done(int blockIndex) throws IOException { noMoreStreamList.add(blockIndex); try { if (syncException != null) throw syncException; - if (syncFuture != null && !syncFuture.isDone() && runSyncThread != null) { + if (syncFuture != null && !syncFuture.isDone()) { final AtomicLong noSyncLength = noSyncLengthMap.get(blockIndex); if (noSyncLength != null && noSyncLength.get() > 0) { + inspectStreamState(doneState); + final boolean isNoMoreStream = doneState.isNoMoreStream; + // ensure this block is synced. - parkedRunBlockThreadMap.put(blockIndex, Thread.currentThread()); - unparkThread(runSyncThread); - parkThread(); + ensureSync(isNoMoreStream, blockIndex); + + } + } else { + if (syncFuture == null) { + Util.d(TAG, "OutputStream done but no need to ensure sync, because the " + + "sync job not run yet. task[" + task.getId() + + "] block[" + blockIndex + "]"); + } else { + Util.d(TAG, "OutputStream done but no need to ensure sync, because the " + + "syncFuture.isDone[" + syncFuture.isDone() + "] task[" + task.getId() + + "] block[" + blockIndex + "]"); + } } @@ -202,6 +214,45 @@ public void done(int blockIndex) throws IOException { } } + void ensureSync(boolean isNoMoreStream, int blockIndex) { + // sync job not run yet. + if (syncFuture == null || syncFuture.isDone()) return; + + if (!isNoMoreStream) { + parkedRunBlockThreadMap.put(blockIndex, Thread.currentThread()); + } + + if (runSyncThread != null) { + unparkThread(runSyncThread); + } else { + // wait for runSyncThread is valid. + while (true) { + if (isRunSyncThreadValid()) { + unparkThread(runSyncThread); + break; + } else { + parkThread(25); + } + } + } + + if (isNoMoreStream) { + unparkThread(runSyncThread); + try { + syncFuture.get(); + } catch (InterruptedException ignored) { + } catch (ExecutionException ignored) { + } + } else { + parkThread(); + } + } + + // convenient for test + boolean isRunSyncThreadValid() { + return runSyncThread != null; + } + public void inspectComplete(int blockIndex) throws IOException { final BlockInfo blockInfo = info.getBlock(blockIndex); if (!Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) { @@ -343,8 +394,6 @@ void runSync() throws IOException { flushProcess(); nextParkMills = syncBufferIntervalMills; } - - runSyncThread = null; } // convenient for test. @@ -364,56 +413,51 @@ long now() { } void flushProcess() throws IOException { - try { - boolean success; - final int size; - synchronized (noSyncLengthMap) { - // make sure the length of noSyncLengthMap is equal to outputStreamMap - size = noSyncLengthMap.size(); - } + boolean success; + final int size; + synchronized (noSyncLengthMap) { + // make sure the length of noSyncLengthMap is equal to outputStreamMap + size = noSyncLengthMap.size(); + } - final SparseArray increaseLengthMap = new SparseArray<>(size); + final SparseArray increaseLengthMap = new SparseArray<>(size); - try { - for (int i = 0; i < size; i++) { - final int blockIndex = outputStreamMap.keyAt(i); - // because we get no sync length value before flush and sync, - // so the length only possible less than or equal to the real persist - // length. - final long noSyncLength = noSyncLengthMap.get(blockIndex).get(); - if (noSyncLength > 0) { - increaseLengthMap.put(blockIndex, noSyncLength); - final DownloadOutputStream outputStream = outputStreamMap - .get(blockIndex); - outputStream.flushAndSync(); - } + try { + for (int i = 0; i < size; i++) { + final int blockIndex = outputStreamMap.keyAt(i); + // because we get no sync length value before flush and sync, + // so the length only possible less than or equal to the real persist + // length. + final long noSyncLength = noSyncLengthMap.get(blockIndex).get(); + if (noSyncLength > 0) { + increaseLengthMap.put(blockIndex, noSyncLength); + final DownloadOutputStream outputStream = outputStreamMap + .get(blockIndex); + outputStream.flushAndSync(); } - success = true; - } catch (IOException ex) { - Util.w(TAG, "OutputStream flush and sync data to filesystem failed " + ex); - success = false; } + success = true; + } catch (IOException ex) { + Util.w(TAG, "OutputStream flush and sync data to filesystem failed " + ex); + success = false; + } - if (success) { - final int increaseLengthSize = increaseLengthMap.size(); - long allIncreaseLength = 0; - for (int i = 0; i < increaseLengthSize; i++) { - final int blockIndex = increaseLengthMap.keyAt(i); - final long noSyncLength = increaseLengthMap.valueAt(i); - store.onSyncToFilesystemSuccess(info, blockIndex, noSyncLength); - allIncreaseLength += noSyncLength; - noSyncLengthMap.get(blockIndex).addAndGet(-noSyncLength); - Util.d(TAG, "OutputStream sync success (" + task.getId() + ") " - + "block(" + blockIndex + ") " + " syncLength(" + noSyncLength + ")" - + " currentOffset(" + info.getBlock(blockIndex).getCurrentOffset() - + ")"); - } - allNoSyncLength.addAndGet(-allIncreaseLength); - lastSyncTimestamp.set(SystemClock.uptimeMillis()); + if (success) { + final int increaseLengthSize = increaseLengthMap.size(); + long allIncreaseLength = 0; + for (int i = 0; i < increaseLengthSize; i++) { + final int blockIndex = increaseLengthMap.keyAt(i); + final long noSyncLength = increaseLengthMap.valueAt(i); + store.onSyncToFilesystemSuccess(info, blockIndex, noSyncLength); + allIncreaseLength += noSyncLength; + noSyncLengthMap.get(blockIndex).addAndGet(-noSyncLength); + Util.d(TAG, "OutputStream sync success (" + task.getId() + ") " + + "block(" + blockIndex + ") " + " syncLength(" + noSyncLength + ")" + + " currentOffset(" + info.getBlock(blockIndex).getCurrentOffset() + + ")"); } - } finally { - inspectValidPath(); - OkDownload.with().processFileStrategy().getFileLock().decreaseLock(path); + allNoSyncLength.addAndGet(-allIncreaseLength); + lastSyncTimestamp.set(SystemClock.uptimeMillis()); } } diff --git a/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java b/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java index 32a59d6a..ec1ec15f 100644 --- a/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java +++ b/okdownload/src/test/java/com/liulishuo/okdownload/core/file/MultiPointOutputStreamTest.java @@ -24,6 +24,7 @@ import com.liulishuo.okdownload.core.breakpoint.BlockInfo; import com.liulishuo.okdownload.core.breakpoint.BreakpointInfo; import com.liulishuo.okdownload.core.breakpoint.DownloadStore; +import com.liulishuo.okdownload.core.cause.EndCause; import com.liulishuo.okdownload.core.exception.PreAllocateException; import org.junit.After; @@ -77,7 +78,6 @@ public class MultiPointOutputStreamTest { @Mock private DownloadOutputStream stream0; @Mock private DownloadOutputStream stream1; - @Mock private DownloadOutputStream stream2; @Mock private Future syncFuture; @Mock private Thread runSyncThread; @@ -131,28 +131,13 @@ public void write() throws IOException { } @Test - public void cancel_noSyncLengthIsZero() - throws IOException, ExecutionException, InterruptedException { - multiPointOutputStream.outputStreamMap.put(0, stream0); - multiPointOutputStream.outputStreamMap.put(1, stream1); - multiPointOutputStream.allNoSyncLength.set(0); - doNothing().when(multiPointOutputStream).close(anyInt()); - - multiPointOutputStream.cancel(); - - verify(multiPointOutputStream, never()).unparkThread(any(Thread.class)); - verify(syncFuture, never()).get(); - verify(multiPointOutputStream).close(eq(0)); - verify(multiPointOutputStream).close(eq(1)); - } - - @Test - public void cancel_syncNotRun() throws IOException, ExecutionException, InterruptedException { + public void cancel_syncNotRun() throws IOException { multiPointOutputStream.outputStreamMap.put(0, stream0); multiPointOutputStream.outputStreamMap.put(1, stream1); multiPointOutputStream.allNoSyncLength.set(1); doNothing().when(multiPointOutputStream).close(anyInt()); - multiPointOutputStream.runSyncThread = null; + doNothing().when(multiPointOutputStream).ensureSync(true, -1); + multiPointOutputStream.syncFuture = null; final ProcessFileStrategy strategy = OkDownload.with().processFileStrategy(); final FileLock fileLock = mock(FileLock.class); @@ -161,15 +146,16 @@ public void cancel_syncNotRun() throws IOException, ExecutionException, Interrup multiPointOutputStream.cancel(); assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(0, 1); - verify(multiPointOutputStream, never()).unparkThread(any(Thread.class)); - verify(syncFuture, never()).get(); + verify(multiPointOutputStream, never()).ensureSync(eq(true), eq(-1)); verify(multiPointOutputStream).close(eq(0)); verify(multiPointOutputStream).close(eq(1)); verify(fileLock, never()).increaseLock(eq(existFile.getAbsolutePath())); + verify(fileLock, never()).decreaseLock(eq(existFile.getAbsolutePath())); + verify(store).onTaskEnd(eq(task.getId()), eq(EndCause.CANCELED), nullable(Exception.class)); } @Test - public void cancel() throws IOException, ExecutionException, InterruptedException { + public void cancel() throws IOException { multiPointOutputStream.outputStreamMap.put(0, stream0); multiPointOutputStream.outputStreamMap.put(1, stream1); multiPointOutputStream.noSyncLengthMap.put(0, new AtomicLong()); @@ -180,33 +166,108 @@ public void cancel() throws IOException, ExecutionException, InterruptedExceptio final FileLock fileLock = mock(FileLock.class); when(strategy.getFileLock()).thenReturn(fileLock); - doNothing().when(multiPointOutputStream).unparkThread(nullable(Thread.class)); doNothing().when(multiPointOutputStream).close(anyInt()); + doNothing().when(multiPointOutputStream).ensureSync(true, -1); multiPointOutputStream.cancel(); assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(0, 1); - verify(multiPointOutputStream).unparkThread(any(Thread.class)); - verify(syncFuture).get(); + verify(multiPointOutputStream).ensureSync(eq(true), eq(-1)); verify(multiPointOutputStream).close(eq(0)); verify(multiPointOutputStream).close(eq(1)); verify(fileLock).increaseLock(eq(existFile.getAbsolutePath())); + verify(fileLock).decreaseLock(eq(existFile.getAbsolutePath())); + verify(store).onTaskEnd(eq(task.getId()), eq(EndCause.CANCELED), nullable(Exception.class)); + } + + @Test + public void ensureSync_syncJobNotRunYet() { + multiPointOutputStream.syncFuture = null; + multiPointOutputStream.ensureSync(true, -1); + verify(multiPointOutputStream, never()).unparkThread(any(Thread.class)); + verify(multiPointOutputStream, never()).parkThread(); + assertThat(multiPointOutputStream.parkedRunBlockThreadMap.size()).isZero(); + + multiPointOutputStream.syncFuture = syncFuture; + when(syncFuture.isDone()).thenReturn(true); + multiPointOutputStream.ensureSync(true, -1); + verify(multiPointOutputStream, never()).unparkThread(any(Thread.class)); + verify(multiPointOutputStream, never()).parkThread(); + assertThat(multiPointOutputStream.parkedRunBlockThreadMap.size()).isZero(); } @Test - public void done() throws IOException { + public void ensureSync_noMoreStream() throws ExecutionException, InterruptedException { + doNothing().when(multiPointOutputStream).unparkThread(nullable(Thread.class)); + doNothing().when(multiPointOutputStream).parkThread(); + doNothing().when(multiPointOutputStream).parkThread(25); + + multiPointOutputStream.ensureSync(true, -1); + + verify(multiPointOutputStream, times(2)).unparkThread(eq(runSyncThread)); + verify(syncFuture).get(); + verify(multiPointOutputStream, never()).parkThread(); + assertThat(multiPointOutputStream.parkedRunBlockThreadMap.size()).isZero(); + } + + @Test + public void ensureSync_notNoMoreStream() { doNothing().when(multiPointOutputStream).unparkThread(nullable(Thread.class)); - doNothing().when(multiPointOutputStream).close(1); doNothing().when(multiPointOutputStream).parkThread(); + doNothing().when(multiPointOutputStream).parkThread(25); + + multiPointOutputStream.ensureSync(false, 1); + + verify(multiPointOutputStream).unparkThread(eq(runSyncThread)); + verify(multiPointOutputStream).parkThread(); + assertThat(multiPointOutputStream.parkedRunBlockThreadMap.size()).isOne(); + assertThat(multiPointOutputStream.parkedRunBlockThreadMap.get(1)) + .isEqualTo(Thread.currentThread()); + } + + @Test + public void ensureSync_loop() { + doNothing().when(multiPointOutputStream).unparkThread(nullable(Thread.class)); + doNothing().when(multiPointOutputStream).parkThread(); + doNothing().when(multiPointOutputStream).parkThread(25); + multiPointOutputStream.runSyncThread = null; + when(multiPointOutputStream.isRunSyncThreadValid()).thenReturn(false, true); + + multiPointOutputStream.ensureSync(false, 1); + + verify(multiPointOutputStream).parkThread(eq(25L)); + verify(multiPointOutputStream).unparkThread(nullable(Thread.class)); + } + + @Test + public void done_noMoreStream() throws IOException { + doNothing().when(multiPointOutputStream).close(1); + doNothing().when(multiPointOutputStream).ensureSync(true, 1); + doNothing().when(multiPointOutputStream) + .inspectStreamState(multiPointOutputStream.doneState); multiPointOutputStream.noSyncLengthMap.put(1, new AtomicLong(10)); + multiPointOutputStream.doneState.isNoMoreStream = true; multiPointOutputStream.done(1); assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(1); - assertThat(multiPointOutputStream.parkedRunBlockThreadMap.get(1)) - .isEqualTo(Thread.currentThread()); - verify(multiPointOutputStream).unparkThread(eq(runSyncThread)); - verify(multiPointOutputStream).parkThread(); + verify(multiPointOutputStream).ensureSync(eq(true), eq(1)); + verify(multiPointOutputStream).close(eq(1)); + } + + @Test + public void done_notNoMoreStream() throws IOException { + doNothing().when(multiPointOutputStream).close(1); + doNothing().when(multiPointOutputStream).ensureSync(false, 1); + doNothing().when(multiPointOutputStream) + .inspectStreamState(multiPointOutputStream.doneState); + multiPointOutputStream.noSyncLengthMap.put(1, new AtomicLong(10)); + multiPointOutputStream.doneState.isNoMoreStream = false; + + multiPointOutputStream.done(1); + + assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(1); + verify(multiPointOutputStream).ensureSync(eq(false), eq(1)); verify(multiPointOutputStream).close(eq(1)); } @@ -218,21 +279,15 @@ public void done_syncException() throws IOException { @Test public void done_syncNotRun() throws IOException { - doNothing().when(multiPointOutputStream).unparkThread(nullable(Thread.class)); doNothing().when(multiPointOutputStream).close(1); - doNothing().when(multiPointOutputStream).parkThread(); multiPointOutputStream.noSyncLengthMap.put(1, new AtomicLong(10)); - multiPointOutputStream.runSyncThread = null; + multiPointOutputStream.syncFuture = null; multiPointOutputStream.done(1); assertThat(multiPointOutputStream.noMoreStreamList).containsExactly(1); - assertThat(multiPointOutputStream.parkedRunBlockThreadMap.get(1)) - .isNull(); - verify(multiPointOutputStream, never()).unparkThread(eq(runSyncThread)); - verify(multiPointOutputStream, never()).parkThread(); + verify(multiPointOutputStream, never()).ensureSync(eq(false), eq(1)); verify(multiPointOutputStream).close(eq(1)); - } @Test @@ -291,20 +346,15 @@ public void flushProcess() throws IOException { final DownloadOutputStream outputStream = mock(DownloadOutputStream.class); doReturn(outputStream).when(multiPointOutputStream).outputStream(1); when(info.getBlock(1)).thenReturn(mock(BlockInfo.class)); - final Thread thread = mock(Thread.class); multiPointOutputStream.allNoSyncLength.addAndGet(10); multiPointOutputStream.noSyncLengthMap.put(1, new AtomicLong(10)); multiPointOutputStream.outputStreamMap.put(1, mock(DownloadOutputStream.class)); - final ProcessFileStrategy fileStrategy = OkDownload.with().processFileStrategy(); - final FileLock fileLock = mock(FileLock.class); - when(fileStrategy.getFileLock()).thenReturn(fileLock); multiPointOutputStream.flushProcess(); verify(store).onSyncToFilesystemSuccess(info, 1, 10); - verify(fileLock).decreaseLock(eq(existFile.getAbsolutePath())); assertThat(multiPointOutputStream.allNoSyncLength.get()).isZero(); assertThat(multiPointOutputStream.noSyncLengthMap.get(1).get()).isZero(); }