aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2018-03-06 15:42:50 -0800
committerGravatar Copybara-Service <copybara-piper@google.com>2018-03-06 15:44:43 -0800
commita854d6c0d3d222bbd4ff2a532d48ddd91718908c (patch)
treeb08cd852566686c3df0d26aac9c76a723f05662a /src/main
parentf00d624ba6b39de6aa0ba5a9626526f239ab413d (diff)
Adding async proto or text logging utility class.
WANT_LGTM=buchgr TESTED=unit tests, 500 runs per test RELNOTES: None PiperOrigin-RevId: 188093043
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/google/devtools/build/lib/BUILD1
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java236
2 files changed, 237 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD
index 814716d5b8..5dcca6bb65 100644
--- a/src/main/java/com/google/devtools/build/lib/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/BUILD
@@ -118,6 +118,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//third_party:guava",
+ "//third_party/protobuf:protobuf_java",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
new file mode 100644
index 0000000000..70284d08c9
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java
@@ -0,0 +1,236 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.Message;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An output stream supporting anynchronous writes, backed by a file.
+ *
+ * <p>We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It gets
+ * tricky when it comes to {@link #closeAsync()}, as we may only complete the returned future when
+ * all writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites}
+ * to keep track of the number of writes that have not completed yet. It's incremented before a new
+ * write and decremented after a write has completed. When it's {@code 0} it's safe to complete the
+ * close future.
+ */
+@ThreadSafety.ThreadSafe
+public class AsynchronousFileOutputStream extends OutputStream {
+ private final AsynchronousFileChannel ch;
+ private final WriteCompletionHandler completionHandler = new WriteCompletionHandler();
+ // The offset in the file to begin the next write at.
+ private long writeOffset;
+ // Number of writes that haven't completed yet.
+ private long outstandingWrites;
+ // The future returned by closeAsync().
+ private SettableFuture<Void> closeFuture;
+ // To store any exception raised from the writes.
+ private final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+ public AsynchronousFileOutputStream(String filename) throws IOException {
+ this(
+ AsynchronousFileChannel.open(
+ Paths.get(filename),
+ StandardOpenOption.WRITE,
+ StandardOpenOption.CREATE,
+ StandardOpenOption.TRUNCATE_EXISTING));
+ }
+
+ @VisibleForTesting
+ public AsynchronousFileOutputStream(AsynchronousFileChannel ch) throws IOException {
+ this.ch = ch;
+ }
+
+ public void write(String message) {
+ write(message.getBytes(UTF_8));
+ }
+
+ /**
+ * Writes a delimited protocol buffer message in the same format as {@link
+ * MessageLite#writeDelimitedTo(java.io.OutputStream)}.
+ *
+ * <p>Unfortunately, {@link MessageLite#writeDelimitedTo(java.io.OutputStream)} may result in
+ * multiple calls to write on the underlying stream, so we have to provide this method here
+ * instead of the caller using it directly.
+ */
+ public void write(Message m) {
+ Preconditions.checkNotNull(m);
+ final int size = m.getSerializedSize();
+ ByteArrayOutputStream bos =
+ new ByteArrayOutputStream(CodedOutputStream.computeRawVarint32Size(size) + size);
+ try {
+ m.writeDelimitedTo(bos);
+ } catch (IOException e) {
+ // This should never happen with an in-memory stream.
+ exception.compareAndSet(null, new IllegalStateException(e.toString()));
+ return;
+ }
+ write(bos.toByteArray());
+ }
+
+ @Override
+ public void write(int b) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Writes the byte buffer into the file asynchronously.
+ *
+ * <p>The writes are guaranteed to land in the output file in the same order that they were
+ * called; However, some writes may fail, leaving the file partially corrupted. In case a write
+ * fails, an exception will be propagated in close, but remaining writes will be allowed to
+ * continue.
+ */
+ @Override
+ public synchronized void write(byte[] data) {
+ Preconditions.checkNotNull(data);
+ Preconditions.checkState(ch.isOpen());
+
+ if (closeFuture != null) {
+ throw new IllegalStateException("Attempting to write to stream after close");
+ }
+
+ outstandingWrites++;
+ ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler);
+ writeOffset += data.length;
+ }
+
+ /* Returns whether the stream is open for writing. */
+ public boolean isOpen() {
+ return ch.isOpen();
+ }
+
+ /**
+ * Closes the stream without waiting until pending writes are committed, and supressing errors.
+ *
+ * <p>Pending writes will still continue asynchronously, but any errors will be ignored.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ public void closeNow() {
+ closeAsync();
+ }
+
+ /**
+ * Closes the stream and blocks until all pending writes are completed.
+ *
+ * Throws an exception if any of the writes or the close itself have failed.
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ closeAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Write interrupted");
+ } catch (ExecutionException e) {
+ Throwable c = e.getCause();
+ Throwables.throwIfUnchecked(c);
+ if (c instanceof IOException) {
+ throw (IOException) c;
+ }
+ throw new IOException("Exception within stream close: " + c);
+ }
+ }
+
+ /**
+ * Flushes the currently ongoing writes into the channel.
+ *
+ * Throws an exception if any of the writes or the close itself have failed.
+ */
+ @Override
+ public void flush() throws IOException {
+ ch.force(true);
+ }
+
+ /**
+ * Closes the channel, if close was invoked and there are no outstanding writes. Should only be
+ * called in a synchronized context.
+ */
+ private void closeIfNeeded() {
+ if (closeFuture == null || outstandingWrites > 0) {
+ return;
+ }
+ try {
+ flush();
+ ch.close();
+ } catch (Exception e) {
+ exception.compareAndSet(null, e);
+ } finally {
+ Throwable e = exception.get();
+ if (e == null) {
+ closeFuture.set(null);
+ } else {
+ closeFuture.setException(e);
+ }
+ }
+ }
+
+ /**
+ * Returns a future that will close the stream when all pending writes are completed.
+ *
+ * Any failed writes will propagate an exception.
+ */
+ public synchronized ListenableFuture<Void> closeAsync() {
+ if (closeFuture != null) {
+ return closeFuture;
+ }
+ closeFuture = SettableFuture.create();
+ closeIfNeeded();
+ return closeFuture;
+ }
+
+ /**
+ * Handler that's notified when a write completes.
+ */
+ private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
+
+ @Override
+ public void completed(Integer result, Void attachment) {
+ countWritesAndTryClose();
+ }
+
+ @Override
+ public void failed(Throwable e, Void attachment) {
+ exception.compareAndSet(null, e);
+ countWritesAndTryClose();
+ }
+
+ private void countWritesAndTryClose() {
+ synchronized (AsynchronousFileOutputStream.this) {
+ Preconditions.checkState(outstandingWrites > 0);
+ outstandingWrites--;
+ closeIfNeeded();
+ }
+ }
+ }
+}