aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/BUILD1
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java236
-rw-r--r--src/test/java/com/google/devtools/build/lib/BUILD2
-rw-r--r--src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java367
4 files changed, 606 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index 814716d5b8..5dcca6bb65 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -118,6 +118,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//third_party:guava",
+ "//third_party/protobuf:protobuf_java",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
new file mode 100644
index 0000000000..70284d08c9
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
@@ -0,0 +1,236 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An output stream supporting anynchronous writes, backed by a file.
+ *
+ * <p>We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It gets
+ * tricky when it comes to {@link #closeAsync()}, as we may only complete the returned future when
+ * all writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites}
+ * to keep track of the number of writes that have not completed yet. It's incremented before a new
+ * write and decremented after a write has completed. When it's {@code 0} it's safe to complete the
+ * close future.
+ */
+@ThreadSafety.ThreadSafe
+public class AsynchronousFileOutputStream extends OutputStream {
+ private final AsynchronousFileChannel ch;
+ private final WriteCompletionHandler completionHandler = new WriteCompletionHandler();
+ // The offset in the file to begin the next write at.
+ private long writeOffset;
+ // Number of writes that haven't completed yet.
+ private long outstandingWrites;
+ // The future returned by closeAsync().
+ private SettableFuture<Void> closeFuture;
+ // To store any exception raised from the writes.
+ private final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+ public AsynchronousFileOutputStream(String filename) throws IOException {
+ this(
+ AsynchronousFileChannel.open(
+ Paths.get(filename),
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING));
+ }
+
+ @VisibleForTesting
+ public AsynchronousFileOutputStream(AsynchronousFileChannel ch) throws IOException {
+ this.ch = ch;
+ }
+
+ public void write(String message) {
+ write(message.getBytes(UTF_8));
+ }
+
+ /**
+ * Writes a delimited protocol buffer message in the same format as {@link
+ * MessageLite#writeDelimitedTo(java.io.OutputStream)}.
+ *
+ * <p>Unfortunately, {@link MessageLite#writeDelimitedTo(java.io.OutputStream)} may result in
+ * multiple calls to write on the underlying stream, so we have to provide this method here
+ * instead of the caller using it directly.
+ */
+ public void write(Message m) {
+ Preconditions.checkNotNull(m);
+ final int size = m.getSerializedSize();
+ ByteArrayOutputStream bos =
+ new ByteArrayOutputStream(CodedOutputStream.computeRawVarint32Size(size) + size);
+ try {
+ m.writeDelimitedTo(bos);
+ } catch (IOException e) {
+ // This should never happen with an in-memory stream.
+ exception.compareAndSet(null, new IllegalStateException(e.toString()));
+ return;
+ }
+ write(bos.toByteArray());
+ }
+
+ @Override
+ public void write(int b) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Writes the byte buffer into the file asynchronously.
+ *
+ * <p>The writes are guaranteed to land in the output file in the same order that they were
+ * called; However, some writes may fail, leaving the file partially corrupted. In case a write
+ * fails, an exception will be propagated in close, but remaining writes will be allowed to
+ * continue.
+ */
+ @Override
+ public synchronized void write(byte[] data) {
+ Preconditions.checkNotNull(data);
+ Preconditions.checkState(ch.isOpen());
+
+ if (closeFuture != null) {
+ throw new IllegalStateException("Attempting to write to stream after close");
+ }
+
+ outstandingWrites++;
+ ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler);
+ writeOffset += data.length;
+ }
+
+ /* Returns whether the stream is open for writing. */
+ public boolean isOpen() {
+ return ch.isOpen();
+ }
+
+ /**
+ * Closes the stream without waiting until pending writes are committed, and supressing errors.
+ *
+ * <p>Pending writes will still continue asynchronously, but any errors will be ignored.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void closeNow() {
+ closeAsync();
+ }
+
+ /**
+ * Closes the stream and blocks until all pending writes are completed.
+ *
+ * Throws an exception if any of the writes or the close itself have failed.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ closeAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Write interrupted");
+ } catch (ExecutionException e) {
+ Throwable c = e.getCause();
+ Throwables.throwIfUnchecked(c);
+ if (c instanceof IOException) {
+ throw (IOException) c;
+ }
+ throw new IOException("Exception within stream close: " + c);
+ }
+ }
+
+ /**
+ * Flushes the currently ongoing writes into the channel.
+ *
+ * Throws an exception if any of the writes or the close itself have failed.
+ */
+ @Override
+ public void flush() throws IOException {
+ ch.force(true);
+ }
+
+ /**
+ * Closes the channel, if close was invoked and there are no outstanding writes. Should only be
+ * called in a synchronized context.
+ */
+ private void closeIfNeeded() {
+ if (closeFuture == null || outstandingWrites > 0) {
+ return;
+ }
+ try {
+ flush();
+ ch.close();
+ } catch (Exception e) {
+ exception.compareAndSet(null, e);
+ } finally {
+ Throwable e = exception.get();
+ if (e == null) {
+ closeFuture.set(null);
+ } else {
+ closeFuture.setException(e);
+ }
+ }
+ }
+
+ /**
+ * Returns a future that will close the stream when all pending writes are completed.
+ *
+ * Any failed writes will propagate an exception.
+ */
+ public synchronized ListenableFuture<Void> closeAsync() {
+ if (closeFuture != null) {
+ return closeFuture;
+ }
+ closeFuture = SettableFuture.create();
+ closeIfNeeded();
+ return closeFuture;
+ }
+
+ /**
+ * Handler that's notified when a write completes.
+ */
+ private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
+
+ @Override
+ public void completed(Integer result, Void attachment) {
+ countWritesAndTryClose();
+ }
+
+ @Override
+ public void failed(Throwable e, Void attachment) {
+ exception.compareAndSet(null, e);
+ countWritesAndTryClose();
+ }
+
+ private void countWritesAndTryClose() {
+ synchronized (AsynchronousFileOutputStream.this) {
+ Preconditions.checkState(outstandingWrites > 0);
+ outstandingWrites--;
+ closeIfNeeded();
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index c7d819a0ee..99d4335954 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -314,6 +314,8 @@ java_test(
":testutil",
"//src/main/java/com/google/devtools/build/lib:io",
"//src/main/java/com/google/devtools/build/lib:util",
+ "//src/main/protobuf:bazel_flags_java_proto",
+ "//third_party:mockito",
],
)
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
new file mode 100644
index 0000000000..39c4d6bd76
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java
@@ -0,0 +1,367 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.runtime.commands.proto.BazelFlagsProto.FlagInfo;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests {@link AsynchronousFileOutputStream}. */
+@RunWith(JUnit4.class)
+public class AsynchronousFileOutputStreamTest {
+ @Rule public TemporaryFolder tmp = new TemporaryFolder();
+ @Mock AsynchronousFileChannel mockChannel;
+ Random random = ThreadLocalRandom.current();
+ static final char[] RAND_CHARS = "abcdefghijklmnopqrstuvwxzy0123456789-".toCharArray();
+ static final int RAND_STRING_LENGTH = 10;
+
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @After
+ public void validateMocks() {
+ Mockito.validateMockitoUsage();
+ }
+
+ private FlagInfo generateRandomMessage() {
+ FlagInfo.Builder b = FlagInfo.newBuilder();
+ b.setName(generateRandomString() + "a"); // Name is required, cannot be empty.
+ b.setHasNegativeFlag(random.nextBoolean());
+ b.setDocumentation(generateRandomString());
+ int commandsSize = random.nextInt(5);
+ for (int i = 0; i < commandsSize; ++i) {
+ b.addCommands(generateRandomString());
+ }
+ return b.build();
+ }
+
+ private String generateRandomString() {
+ int len = random.nextInt(RAND_STRING_LENGTH + 1);
+ char[] data = new char[len];
+ for (int i = 0; i < len; ++i) {
+ data[i] = RAND_CHARS[random.nextInt(RAND_CHARS.length)];
+ }
+ return new String(data);
+ }
+
+ @Test
+ public void testConcurrentWrites() throws Exception {
+ Path logPath = tmp.newFile().toPath();
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString());
+ Thread[] writers = new Thread[10];
+ final CountDownLatch start = new CountDownLatch(writers.length);
+ for (int i = 0; i < writers.length; ++i) {
+ String name = "Thread # " + i;
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.countDown();
+ start.await();
+ } catch (InterruptedException e) {
+ return;
+ }
+ for (int j = 0; j < 10; ++j) {
+ out.write(name + " time # " + j + "\n");
+ }
+ }
+ };
+ writers[i] = thread;
+ thread.start();
+ }
+ for (int i = 0; i < writers.length; ++i) {
+ writers[i].join();
+ }
+ out.close();
+ String contents =
+ new String(ByteStreams.toByteArray(Files.newInputStream(logPath)), StandardCharsets.UTF_8);
+ for (int i = 0; i < writers.length; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ assertThat(contents).contains("Thread # " + i + " time # " + j + "\n");
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentProtoWrites() throws Exception {
+ Path logPath = tmp.newFile().toPath();
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString());
+ ArrayList<FlagInfo> messages = new ArrayList<>();
+ for (int i = 0; i < 100; ++i) {
+ messages.add(generateRandomMessage());
+ }
+ Thread[] writers = new Thread[messages.size() / 10];
+ final CountDownLatch start = new CountDownLatch(writers.length);
+ for (int i = 0; i < writers.length; ++i) {
+ int startIndex = i * 10;
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ start.countDown();
+ start.await();
+ } catch (InterruptedException e) {
+ return;
+ }
+ for (int j = startIndex; j < startIndex + 10; ++j) {
+ out.write(messages.get(j));
+ }
+ }
+ };
+ writers[i] = thread;
+ thread.start();
+ }
+ for (int i = 0; i < writers.length; ++i) {
+ writers[i].join();
+ }
+ out.close();
+ ArrayList<FlagInfo> readMessages = new ArrayList<>();
+ try (InputStream in = Files.newInputStream(logPath)) {
+ for (int i = 0; i < messages.size(); ++i) {
+ readMessages.add(FlagInfo.parseDelimitedFrom(in));
+ }
+ }
+ assertThat(readMessages).containsExactlyElementsIn(messages);
+ }
+
+ @Test
+ public void testFailedClosePropagatesIOException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ IOException ex = new IOException("foo");
+ Mockito.doThrow(ex).when(mockChannel).close();
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.completed(0, null); // We ignore the arguments.
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+
+ try {
+ out.close();
+ fail("Expected an IOException");
+ } catch (IOException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testFailedClosePropagatesUncheckedException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ RuntimeException ex = new RuntimeException("foo");
+ Mockito.doThrow(ex).when(mockChannel).close();
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.completed(0, null); // We ignore the arguments.
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+
+ try {
+ out.close();
+ fail("Expected a RuntimeException");
+ } catch (RuntimeException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testFailedForcePropagatesIOException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ IOException ex = new IOException("foo");
+ Mockito.doThrow(ex).when(mockChannel).force(eq(true));
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.completed(0, null); // We ignore the arguments.
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+
+ try {
+ out.close();
+ fail("Expected an IOException");
+ } catch (IOException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testFailedForcePropagatesUncheckedException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ RuntimeException ex = new RuntimeException("foo");
+ Mockito.doThrow(ex).when(mockChannel).force(eq(true));
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.completed(0, null); // We ignore the arguments.
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+
+ try {
+ out.close();
+ fail("Expected a RuntimeException");
+ } catch (RuntimeException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testFailedWritePropagatesIOException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ IOException ex = new IOException("foo");
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.failed(ex, null);
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+ out.write("blo");
+
+ try {
+ out.close();
+ fail("Expected an IOException");
+ } catch (IOException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testFailedWritePropagatesUncheckedException() throws Exception {
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel);
+ when(mockChannel.isOpen()).thenReturn(true);
+ RuntimeException ex = new RuntimeException("foo");
+ Mockito.doAnswer(
+ invocationOnMock -> {
+ @SuppressWarnings("unchecked")
+ CompletionHandler<Integer, Void> handler =
+ (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3];
+ handler.failed(ex, null);
+ return null;
+ })
+ .when(mockChannel)
+ .write(
+ any(ByteBuffer.class),
+ any(Integer.class),
+ eq(null),
+ Mockito.<CompletionHandler<Integer, Void>>anyObject());
+ out.write("bla");
+ out.write("blo");
+
+ try {
+ out.close();
+ fail("Expected a RuntimeException");
+ } catch (RuntimeException expected) {
+ assertThat(expected).hasMessageThat().isEqualTo("foo");
+ }
+ }
+
+ @Test
+ public void testWriteAfterCloseThrowsException() throws Exception {
+ Path logPath = tmp.newFile().toPath();
+ AsynchronousFileChannel ch = AsynchronousFileChannel.open(
+ logPath,
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING);
+ AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(ch);
+ out.write("bla");
+ ch.close();
+
+ try {
+ out.write("blo");
+ fail("Expected an IllegalStateException");
+ } catch (IllegalStateException expected) {
+ // Expected.
+ }
+ }
+}