diff options
author | 2018-03-06 15:42:50 -0800 | |
---|---|---|
committer | 2018-03-06 15:44:43 -0800 | |
commit | a854d6c0d3d222bbd4ff2a532d48ddd91718908c (patch) | |
tree | b08cd852566686c3df0d26aac9c76a723f05662a /src/main/java | |
parent | f00d624ba6b39de6aa0ba5a9626526f239ab413d (diff) |
Adding async proto or text logging utility class.
WANT_LGTM=buchgr
TESTED=unit tests, 500 runs per test
RELNOTES: None
PiperOrigin-RevId: 188093043
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/BUILD | 1 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java | 236 |
2 files changed, 237 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD index 814716d5b8..5dcca6bb65 100644 --- a/src/main/java/com/google/devtools/build/lib/BUILD +++ b/src/main/java/com/google/devtools/build/lib/BUILD @@ -118,6 +118,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:guava", + "//third_party/protobuf:protobuf_java", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java new file mode 100644 index 0000000000..70284d08c9 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java @@ -0,0 +1,236 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.util.io; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.lib.concurrent.ThreadSafety; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Message; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An output stream supporting anynchronous writes, backed by a file. + * + * <p>We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It gets + * tricky when it comes to {@link #closeAsync()}, as we may only complete the returned future when + * all writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites} + * to keep track of the number of writes that have not completed yet. It's incremented before a new + * write and decremented after a write has completed. When it's {@code 0} it's safe to complete the + * close future. + */ +@ThreadSafety.ThreadSafe +public class AsynchronousFileOutputStream extends OutputStream { + private final AsynchronousFileChannel ch; + private final WriteCompletionHandler completionHandler = new WriteCompletionHandler(); + // The offset in the file to begin the next write at. + private long writeOffset; + // Number of writes that haven't completed yet. + private long outstandingWrites; + // The future returned by closeAsync(). + private SettableFuture<Void> closeFuture; + // To store any exception raised from the writes. + private final AtomicReference<Throwable> exception = new AtomicReference<>(); + + public AsynchronousFileOutputStream(String filename) throws IOException { + this( + AsynchronousFileChannel.open( + Paths.get(filename), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING)); + } + + @VisibleForTesting + public AsynchronousFileOutputStream(AsynchronousFileChannel ch) throws IOException { + this.ch = ch; + } + + public void write(String message) { + write(message.getBytes(UTF_8)); + } + + /** + * Writes a delimited protocol buffer message in the same format as {@link + * MessageLite#writeDelimitedTo(java.io.OutputStream)}. + * + * <p>Unfortunately, {@link MessageLite#writeDelimitedTo(java.io.OutputStream)} may result in + * multiple calls to write on the underlying stream, so we have to provide this method here + * instead of the caller using it directly. + */ + public void write(Message m) { + Preconditions.checkNotNull(m); + final int size = m.getSerializedSize(); + ByteArrayOutputStream bos = + new ByteArrayOutputStream(CodedOutputStream.computeRawVarint32Size(size) + size); + try { + m.writeDelimitedTo(bos); + } catch (IOException e) { + // This should never happen with an in-memory stream. + exception.compareAndSet(null, new IllegalStateException(e.toString())); + return; + } + write(bos.toByteArray()); + } + + @Override + public void write(int b) { + throw new UnsupportedOperationException(); + } + + /** + * Writes the byte buffer into the file asynchronously. + * + * <p>The writes are guaranteed to land in the output file in the same order that they were + * called; However, some writes may fail, leaving the file partially corrupted. In case a write + * fails, an exception will be propagated in close, but remaining writes will be allowed to + * continue. + */ + @Override + public synchronized void write(byte[] data) { + Preconditions.checkNotNull(data); + Preconditions.checkState(ch.isOpen()); + + if (closeFuture != null) { + throw new IllegalStateException("Attempting to write to stream after close"); + } + + outstandingWrites++; + ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler); + writeOffset += data.length; + } + + /* Returns whether the stream is open for writing. */ + public boolean isOpen() { + return ch.isOpen(); + } + + /** + * Closes the stream without waiting until pending writes are committed, and supressing errors. + * + * <p>Pending writes will still continue asynchronously, but any errors will be ignored. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public void closeNow() { + closeAsync(); + } + + /** + * Closes the stream and blocks until all pending writes are completed. + * + * Throws an exception if any of the writes or the close itself have failed. + */ + @Override + public void close() throws IOException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Write interrupted"); + } catch (ExecutionException e) { + Throwable c = e.getCause(); + Throwables.throwIfUnchecked(c); + if (c instanceof IOException) { + throw (IOException) c; + } + throw new IOException("Exception within stream close: " + c); + } + } + + /** + * Flushes the currently ongoing writes into the channel. + * + * Throws an exception if any of the writes or the close itself have failed. + */ + @Override + public void flush() throws IOException { + ch.force(true); + } + + /** + * Closes the channel, if close was invoked and there are no outstanding writes. Should only be + * called in a synchronized context. + */ + private void closeIfNeeded() { + if (closeFuture == null || outstandingWrites > 0) { + return; + } + try { + flush(); + ch.close(); + } catch (Exception e) { + exception.compareAndSet(null, e); + } finally { + Throwable e = exception.get(); + if (e == null) { + closeFuture.set(null); + } else { + closeFuture.setException(e); + } + } + } + + /** + * Returns a future that will close the stream when all pending writes are completed. + * + * Any failed writes will propagate an exception. + */ + public synchronized ListenableFuture<Void> closeAsync() { + if (closeFuture != null) { + return closeFuture; + } + closeFuture = SettableFuture.create(); + closeIfNeeded(); + return closeFuture; + } + + /** + * Handler that's notified when a write completes. + */ + private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> { + + @Override + public void completed(Integer result, Void attachment) { + countWritesAndTryClose(); + } + + @Override + public void failed(Throwable e, Void attachment) { + exception.compareAndSet(null, e); + countWritesAndTryClose(); + } + + private void countWritesAndTryClose() { + synchronized (AsynchronousFileOutputStream.this) { + Preconditions.checkState(outstandingWrites > 0); + outstandingWrites--; + closeIfNeeded(); + } + } + } +} |