aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/test
diff options
context:
space:
mode:
authorGravatar ulfjack <ulfjack@google.com>2018-06-15 03:15:03 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-06-15 03:16:15 -0700
commit816f3ef7c8811cd1abd9ff807dc8ff1d9f6e10ac (patch)
treedff3ce3d36308110c2fcad3911f32d6aee07c389 /src/test
parent8e09941b41c48b94b398af1700c4f98bb2ea8596 (diff)
Reimplement AsynchronousFileOutputStream to use a writer thread
Background: the original code is implementing the OutputStream interface. Given a sequence of write calls, the code puts each of these writes into a queue. On the other side of the queue is an unbounded thread pool, which takes the writes off the queue one by one, and then does individual blocking writes with a fixed file offset. There are three problems with the original code: 1. Writes are sent to the Kernel one-by-one. Imagine if the incoming writes a single-byte writes, then we do one kernel call for every single byte. This is the worst case. 2. Due to multithreading, the writes can get reordered. In the worst case, the order is reversed, and the kernel flushes each write to disk individually. Since the writes are not aligned to disk block boundaries, each write has to first *read* the block from disk, overwrite a few bytes, and then flush the block back to disk. On a spinning platter, this is the worst possible sequence of operations: write a block, read the same block back, write the same block again, read the same block back, etc., with each operation having to wait for a full disk rotation. Note that this gets worse if there's high thread and / or disk contention, e.g., when running a build system in the background. 3. Due to the unbounded thread pool, it may end up creating a new thread for every single write (possibly as many as one per byte written). This is also the worst case, although it's probably negligible compared to 1+2. Compared to that, this change uses an in-memory buffer before sending writes to the kernel so non-block sized writes are batched, it writes sequentially, and it uses a single thread created at the start. A single thread should be more than able to fully saturate local disk I/O, so multi-threading should ~never be a improvement here. This might look different if we had perfectly aligned writes of an integer multiple of the disk block size to a distributed network file system or a local SSD raid. If you look at the clients of this class, that's definitely not the case: this code is primarily used for local file BEP transports - we wouldn't use local file BEP transports to write to a network file system, we'd use the BES instead. It's also used to write a couple of log files, also all local - otherwise we'd add the data to BEP. These are all unaligned, ~random-sized writes. If we created a lot of these files, then using a thread pool with fewer threads than files and using non-blocking I/O might be an improvement due to the reduction in thread count, but I think it's very unlikely that we'll ever need that complexity. PiperOrigin-RevId: 200694423
Diffstat (limited to 'src/test')
-rw-r--r--src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java202
1 files changed, 48 insertions, 154 deletions
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
index 39c4d6bd76..b9e607727a 100644
--- a/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
+++ b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
@@ -15,21 +15,16 @@ package com.google.devtools.build.lib.util.io;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.runtime.commands.proto.BazelFlagsProto.FlagInfo;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.CompletionHandler;
+import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@@ -41,7 +36,6 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -49,10 +43,9 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AsynchronousFileOutputStreamTest {
@Rule public TemporaryFolder tmp = new TemporaryFolder();
- @Mock AsynchronousFileChannel mockChannel;
- Random random = ThreadLocalRandom.current();
- static final char[] RAND_CHARS = "abcdefghijklmnopqrstuvwxzy0123456789-".toCharArray();
- static final int RAND_STRING_LENGTH = 10;
+ private final Random random = ThreadLocalRandom.current();
+ private static final char[] RAND_CHARS = "abcdefghijklmnopqrstuvwxzy0123456789-".toCharArray();
+ private static final int RAND_STRING_LENGTH = 10;
@Before
public void initMocks() {
@@ -167,26 +160,17 @@ public class AsynchronousFileOutputStreamTest {
@Test
public void testFailedClosePropagatesIOException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- IOException ex = new IOException("foo");
- Mockito.doThrow(ex).when(mockChannel).close();
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.completed(0, null); // We ignore the arguments.
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ OutputStream failingOutputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ }
+ @Override
+ public void close() throws IOException {
+ throw new IOException("foo");
+ }
+ };
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream("", failingOutputStream);
out.write("bla");
-
try {
out.close();
fail("Expected an IOException");
@@ -197,86 +181,17 @@ public class AsynchronousFileOutputStreamTest {
@Test
public void testFailedClosePropagatesUncheckedException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- RuntimeException ex = new RuntimeException("foo");
- Mockito.doThrow(ex).when(mockChannel).close();
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.completed(0, null); // We ignore the arguments.
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
- out.write("bla");
-
- try {
- out.close();
- fail("Expected a RuntimeException");
- } catch (RuntimeException expected) {
- assertThat(expected).hasMessageThat().isEqualTo("foo");
- }
- }
-
- @Test
- public void testFailedForcePropagatesIOException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- IOException ex = new IOException("foo");
- Mockito.doThrow(ex).when(mockChannel).force(eq(true));
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.completed(0, null); // We ignore the arguments.
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
- out.write("bla");
-
- try {
- out.close();
- fail("Expected an IOException");
- } catch (IOException expected) {
- assertThat(expected).hasMessageThat().isEqualTo("foo");
- }
- }
-
- @Test
- public void testFailedForcePropagatesUncheckedException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- RuntimeException ex = new RuntimeException("foo");
- Mockito.doThrow(ex).when(mockChannel).force(eq(true));
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.completed(0, null); // We ignore the arguments.
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ OutputStream failingOutputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ }
+ @Override
+ public void close() throws IOException {
+ throw new RuntimeException("foo");
+ }
+ };
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream("", failingOutputStream);
out.write("bla");
-
try {
out.close();
fail("Expected a RuntimeException");
@@ -287,26 +202,18 @@ public class AsynchronousFileOutputStreamTest {
@Test
public void testFailedWritePropagatesIOException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- IOException ex = new IOException("foo");
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.failed(ex, null);
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ OutputStream failingOutputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ throw new IOException("foo");
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream("", failingOutputStream);
out.write("bla");
out.write("blo");
-
try {
out.close();
fail("Expected an IOException");
@@ -317,26 +224,18 @@ public class AsynchronousFileOutputStreamTest {
@Test
public void testFailedWritePropagatesUncheckedException() throws Exception {
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
- when(mockChannel.isOpen()).thenReturn(true);
- RuntimeException ex = new RuntimeException("foo");
- Mockito.doAnswer(
- invocationOnMock -> {
- @SuppressWarnings("unchecked")
- CompletionHandler<Integer, Void> handler =
- (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
- handler.failed(ex, null);
- return null;
- })
- .when(mockChannel)
- .write(
- any(ByteBuffer.class),
- any(Integer.class),
- eq(null),
- Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ OutputStream failingOutputStream = new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ throw new RuntimeException("foo");
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream("", failingOutputStream);
out.write("bla");
out.write("blo");
-
try {
out.close();
fail("Expected a RuntimeException");
@@ -347,15 +246,10 @@ public class AsynchronousFileOutputStreamTest {
@Test
public void testWriteAfterCloseThrowsException() throws Exception {
- Path logPath = tmp.newFile().toPath();
- AsynchronousFileChannel ch = AsynchronousFileChannel.open(
- logPath,
- StandardOpenOption.WRITE,
- StandardOpenOption.CREATE,
- StandardOpenOption.TRUNCATE_EXISTING);
- AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(ch);
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(
+ "", new ByteArrayOutputStream());
out.write("bla");
- ch.close();
+ out.close();
try {
out.write("blo");