diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java | 132 |
1 files changed, 28 insertions, 104 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index 528e6d6d25..6a59e96c12 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -14,20 +14,14 @@ package com.google.devtools.build.lib.buildeventstream.transports; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; +import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream; +import com.google.protobuf.Message; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.CompletionHandler; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,121 +33,51 @@ import java.util.logging.Logger; */ abstract class FileTransport implements BuildEventTransport { - /** - * We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It get's - * tricky when it comes to {@link #close()}, as we may only complete the returned future when all - * writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites} to - * keep track of the number of writes that have not completed yet. It's simply incremented before - * a new write and decremented after a write has completed. When it's {@code 0} it's safe to - * complete the close future. - */ private static final Logger logger = Logger.getLogger(FileTransport.class.getName()); - @VisibleForTesting - final AsynchronousFileChannel ch; - private final WriteCompletionHandler completionHandler = new WriteCompletionHandler(); - // The offset in the file to begin the next write at. - private long writeOffset; - // Number of writes that haven't completed yet. - private long outstandingWrites; - // The future returned by close() - private SettableFuture<Void> closeFuture; + final AsynchronousFileOutputStream out; FileTransport(String path) { try { - ch = AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE); + out = new AsynchronousFileOutputStream(path); } catch (IOException e) { throw new RuntimeException(e); } } - synchronized void writeData(byte[] data) { - checkNotNull(data); - if (!ch.isOpen()) { - @SuppressWarnings({"unused", "nullness"}) - Future<?> possiblyIgnoredError = close(); - return; - } - if (closing()) { - return; - } - - outstandingWrites++; + // Silent wrappers to AsynchronousFileOutputStream methods. - ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler); - - writeOffset += data.length; - } - - @Override - public synchronized ListenableFuture<Void> close() { - if (closing()) { - return closeFuture; - } - closeFuture = SettableFuture.create(); - - if (writesComplete()) { - doClose(); + protected void write(Message m) { + try { + out.write(m); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); } - - return closeFuture; } - private void doClose() { + protected void write(String s) { try { - ch.force(true); - ch.close(); - } catch (IOException e) { + out.write(s); + } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); - } finally { - closeFuture.set(null); } } - @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void closeNow() { - close(); - } - - private boolean closing() { - return closeFuture != null; - } - private boolean writesComplete() { - return outstandingWrites == 0; + @Override + public synchronized ListenableFuture<Void> close() { + return Futures.catching( + out.closeAsync(), + Throwable.class, + (t) -> { + logger.log(Level.SEVERE, t.getMessage(), t); + return null; + }, + MoreExecutors.directExecutor()); } - /** - * Handler that's notified when a write completes. - */ - private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> { - - @Override - public void completed(Integer result, Void attachment) { - countWriteAndTryClose(); - } - - @Override - public void failed(Throwable exc, Void attachment) { - logger.log(Level.SEVERE, exc.getMessage(), exc); - countWriteAndTryClose(); - // There is no point in trying to continue. Close the transport. - @SuppressWarnings({"unused", "nullness"}) - Future<?> possiblyIgnoredError = close(); - } - - private void countWriteAndTryClose() { - synchronized (FileTransport.this) { - checkState(outstandingWrites > 0); - - outstandingWrites--; - - if (closing() && writesComplete()) { - doClose(); - } - } - } + @Override + public void closeNow() { + out.closeNow(); } } |