aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java132
1 files changed, 28 insertions, 104 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 528e6d6d25..6a59e96c12 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -14,20 +14,14 @@
package com.google.devtools.build.lib.buildeventstream.transports;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
+import com.google.protobuf.Message;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.CompletionHandler;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,121 +33,51 @@ import java.util.logging.Logger;
*/
abstract class FileTransport implements BuildEventTransport {
- /**
- * We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It get's
- * tricky when it comes to {@link #close()}, as we may only complete the returned future when all
- * writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites} to
- * keep track of the number of writes that have not completed yet. It's simply incremented before
- * a new write and decremented after a write has completed. When it's {@code 0} it's safe to
- * complete the close future.
- */
private static final Logger logger = Logger.getLogger(FileTransport.class.getName());
-
@VisibleForTesting
- final AsynchronousFileChannel ch;
- private final WriteCompletionHandler completionHandler = new WriteCompletionHandler();
- // The offset in the file to begin the next write at.
- private long writeOffset;
- // Number of writes that haven't completed yet.
- private long outstandingWrites;
- // The future returned by close()
- private SettableFuture<Void> closeFuture;
+ final AsynchronousFileOutputStream out;
FileTransport(String path) {
try {
- ch = AsynchronousFileChannel.open(Paths.get(path), StandardOpenOption.CREATE,
- StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
+ out = new AsynchronousFileOutputStream(path);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- synchronized void writeData(byte[] data) {
- checkNotNull(data);
- if (!ch.isOpen()) {
- @SuppressWarnings({"unused", "nullness"})
- Future<?> possiblyIgnoredError = close();
- return;
- }
- if (closing()) {
- return;
- }
-
- outstandingWrites++;
+ // Silent wrappers to AsynchronousFileOutputStream methods.
- ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler);
-
- writeOffset += data.length;
- }
-
- @Override
- public synchronized ListenableFuture<Void> close() {
- if (closing()) {
- return closeFuture;
- }
- closeFuture = SettableFuture.create();
-
- if (writesComplete()) {
- doClose();
+ protected void write(Message m) {
+ try {
+ out.write(m);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
}
-
- return closeFuture;
}
- private void doClose() {
+ protected void write(String s) {
try {
- ch.force(true);
- ch.close();
- } catch (IOException e) {
+ out.write(s);
+ } catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
- } finally {
- closeFuture.set(null);
}
}
- @Override
- @SuppressWarnings("FutureReturnValueIgnored")
- public void closeNow() {
- close();
- }
-
- private boolean closing() {
- return closeFuture != null;
- }
- private boolean writesComplete() {
- return outstandingWrites == 0;
+ @Override
+ public synchronized ListenableFuture<Void> close() {
+ return Futures.catching(
+ out.closeAsync(),
+ Throwable.class,
+ (t) -> {
+ logger.log(Level.SEVERE, t.getMessage(), t);
+ return null;
+ },
+ MoreExecutors.directExecutor());
}
- /**
- * Handler that's notified when a write completes.
- */
- private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> {
-
- @Override
- public void completed(Integer result, Void attachment) {
- countWriteAndTryClose();
- }
-
- @Override
- public void failed(Throwable exc, Void attachment) {
- logger.log(Level.SEVERE, exc.getMessage(), exc);
- countWriteAndTryClose();
- // There is no point in trying to continue. Close the transport.
- @SuppressWarnings({"unused", "nullness"})
- Future<?> possiblyIgnoredError = close();
- }
-
- private void countWriteAndTryClose() {
- synchronized (FileTransport.this) {
- checkState(outstandingWrites > 0);
-
- outstandingWrites--;
-
- if (closing() && writesComplete()) {
- doClose();
- }
- }
- }
+ @Override
+ public void closeNow() {
+ out.closeNow();
}
}