diff options
author | 2018-03-06 15:42:50 -0800 | |
---|---|---|
committer | 2018-03-06 15:44:43 -0800 | |
commit | a854d6c0d3d222bbd4ff2a532d48ddd91718908c (patch) | |
tree | b08cd852566686c3df0d26aac9c76a723f05662a | |
parent | f00d624ba6b39de6aa0ba5a9626526f239ab413d (diff) |
Adding async proto or text logging utility class.
WANT_LGTM=buchgr
TESTED=unit tests, 500 runs per test
RELNOTES: None
PiperOrigin-RevId: 188093043
4 files changed, 606 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/BUILD b/src/main/java/com/google/devtools/build/lib/BUILD index 814716d5b8..5dcca6bb65 100644 --- a/src/main/java/com/google/devtools/build/lib/BUILD +++ b/src/main/java/com/google/devtools/build/lib/BUILD @@ -118,6 +118,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:guava", + "//third_party/protobuf:protobuf_java", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java new file mode 100644 index 0000000000..70284d08c9 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStream.java @@ -0,0 +1,236 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.util.io; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.lib.concurrent.ThreadSafety; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.Message; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An output stream supporting anynchronous writes, backed by a file. + * + * <p>We use an {@link AsynchronousFileChannel} to perform non-blocking writes to a file. It gets + * tricky when it comes to {@link #closeAsync()}, as we may only complete the returned future when + * all writes have completed (succeeded or failed). Thus, we use a field {@link #outstandingWrites} + * to keep track of the number of writes that have not completed yet. It's incremented before a new + * write and decremented after a write has completed. When it's {@code 0} it's safe to complete the + * close future. + */ +@ThreadSafety.ThreadSafe +public class AsynchronousFileOutputStream extends OutputStream { + private final AsynchronousFileChannel ch; + private final WriteCompletionHandler completionHandler = new WriteCompletionHandler(); + // The offset in the file to begin the next write at. + private long writeOffset; + // Number of writes that haven't completed yet. + private long outstandingWrites; + // The future returned by closeAsync(). + private SettableFuture<Void> closeFuture; + // To store any exception raised from the writes. + private final AtomicReference<Throwable> exception = new AtomicReference<>(); + + public AsynchronousFileOutputStream(String filename) throws IOException { + this( + AsynchronousFileChannel.open( + Paths.get(filename), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING)); + } + + @VisibleForTesting + public AsynchronousFileOutputStream(AsynchronousFileChannel ch) throws IOException { + this.ch = ch; + } + + public void write(String message) { + write(message.getBytes(UTF_8)); + } + + /** + * Writes a delimited protocol buffer message in the same format as {@link + * MessageLite#writeDelimitedTo(java.io.OutputStream)}. + * + * <p>Unfortunately, {@link MessageLite#writeDelimitedTo(java.io.OutputStream)} may result in + * multiple calls to write on the underlying stream, so we have to provide this method here + * instead of the caller using it directly. + */ + public void write(Message m) { + Preconditions.checkNotNull(m); + final int size = m.getSerializedSize(); + ByteArrayOutputStream bos = + new ByteArrayOutputStream(CodedOutputStream.computeRawVarint32Size(size) + size); + try { + m.writeDelimitedTo(bos); + } catch (IOException e) { + // This should never happen with an in-memory stream. + exception.compareAndSet(null, new IllegalStateException(e.toString())); + return; + } + write(bos.toByteArray()); + } + + @Override + public void write(int b) { + throw new UnsupportedOperationException(); + } + + /** + * Writes the byte buffer into the file asynchronously. + * + * <p>The writes are guaranteed to land in the output file in the same order that they were + * called; However, some writes may fail, leaving the file partially corrupted. In case a write + * fails, an exception will be propagated in close, but remaining writes will be allowed to + * continue. + */ + @Override + public synchronized void write(byte[] data) { + Preconditions.checkNotNull(data); + Preconditions.checkState(ch.isOpen()); + + if (closeFuture != null) { + throw new IllegalStateException("Attempting to write to stream after close"); + } + + outstandingWrites++; + ch.write(ByteBuffer.wrap(data), writeOffset, null, completionHandler); + writeOffset += data.length; + } + + /* Returns whether the stream is open for writing. */ + public boolean isOpen() { + return ch.isOpen(); + } + + /** + * Closes the stream without waiting until pending writes are committed, and supressing errors. + * + * <p>Pending writes will still continue asynchronously, but any errors will be ignored. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public void closeNow() { + closeAsync(); + } + + /** + * Closes the stream and blocks until all pending writes are completed. + * + * Throws an exception if any of the writes or the close itself have failed. + */ + @Override + public void close() throws IOException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Write interrupted"); + } catch (ExecutionException e) { + Throwable c = e.getCause(); + Throwables.throwIfUnchecked(c); + if (c instanceof IOException) { + throw (IOException) c; + } + throw new IOException("Exception within stream close: " + c); + } + } + + /** + * Flushes the currently ongoing writes into the channel. + * + * Throws an exception if any of the writes or the close itself have failed. + */ + @Override + public void flush() throws IOException { + ch.force(true); + } + + /** + * Closes the channel, if close was invoked and there are no outstanding writes. Should only be + * called in a synchronized context. + */ + private void closeIfNeeded() { + if (closeFuture == null || outstandingWrites > 0) { + return; + } + try { + flush(); + ch.close(); + } catch (Exception e) { + exception.compareAndSet(null, e); + } finally { + Throwable e = exception.get(); + if (e == null) { + closeFuture.set(null); + } else { + closeFuture.setException(e); + } + } + } + + /** + * Returns a future that will close the stream when all pending writes are completed. + * + * Any failed writes will propagate an exception. + */ + public synchronized ListenableFuture<Void> closeAsync() { + if (closeFuture != null) { + return closeFuture; + } + closeFuture = SettableFuture.create(); + closeIfNeeded(); + return closeFuture; + } + + /** + * Handler that's notified when a write completes. + */ + private final class WriteCompletionHandler implements CompletionHandler<Integer, Void> { + + @Override + public void completed(Integer result, Void attachment) { + countWritesAndTryClose(); + } + + @Override + public void failed(Throwable e, Void attachment) { + exception.compareAndSet(null, e); + countWritesAndTryClose(); + } + + private void countWritesAndTryClose() { + synchronized (AsynchronousFileOutputStream.this) { + Preconditions.checkState(outstandingWrites > 0); + outstandingWrites--; + closeIfNeeded(); + } + } + } +} diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index c7d819a0ee..99d4335954 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -314,6 +314,8 @@ java_test( ":testutil", "//src/main/java/com/google/devtools/build/lib:io", "//src/main/java/com/google/devtools/build/lib:util", + "//src/main/protobuf:bazel_flags_java_proto", + "//third_party:mockito", ], ) diff --git a/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java new file mode 100644 index 0000000000..39c4d6bd76 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/util/io/AsynchronousFileOutputStreamTest.java @@ -0,0 +1,367 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.util.io; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.runtime.commands.proto.BazelFlagsProto.FlagInfo; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** Tests {@link AsynchronousFileOutputStream}. */ +@RunWith(JUnit4.class) +public class AsynchronousFileOutputStreamTest { + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + @Mock AsynchronousFileChannel mockChannel; + Random random = ThreadLocalRandom.current(); + static final char[] RAND_CHARS = "abcdefghijklmnopqrstuvwxzy0123456789-".toCharArray(); + static final int RAND_STRING_LENGTH = 10; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + + @After + public void validateMocks() { + Mockito.validateMockitoUsage(); + } + + private FlagInfo generateRandomMessage() { + FlagInfo.Builder b = FlagInfo.newBuilder(); + b.setName(generateRandomString() + "a"); // Name is required, cannot be empty. + b.setHasNegativeFlag(random.nextBoolean()); + b.setDocumentation(generateRandomString()); + int commandsSize = random.nextInt(5); + for (int i = 0; i < commandsSize; ++i) { + b.addCommands(generateRandomString()); + } + return b.build(); + } + + private String generateRandomString() { + int len = random.nextInt(RAND_STRING_LENGTH + 1); + char[] data = new char[len]; + for (int i = 0; i < len; ++i) { + data[i] = RAND_CHARS[random.nextInt(RAND_CHARS.length)]; + } + return new String(data); + } + + @Test + public void testConcurrentWrites() throws Exception { + Path logPath = tmp.newFile().toPath(); + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString()); + Thread[] writers = new Thread[10]; + final CountDownLatch start = new CountDownLatch(writers.length); + for (int i = 0; i < writers.length; ++i) { + String name = "Thread # " + i; + Thread thread = new Thread() { + @Override + public void run() { + try { + start.countDown(); + start.await(); + } catch (InterruptedException e) { + return; + } + for (int j = 0; j < 10; ++j) { + out.write(name + " time # " + j + "\n"); + } + } + }; + writers[i] = thread; + thread.start(); + } + for (int i = 0; i < writers.length; ++i) { + writers[i].join(); + } + out.close(); + String contents = + new String(ByteStreams.toByteArray(Files.newInputStream(logPath)), StandardCharsets.UTF_8); + for (int i = 0; i < writers.length; ++i) { + for (int j = 0; j < 10; ++j) { + assertThat(contents).contains("Thread # " + i + " time # " + j + "\n"); + } + } + } + + @Test + public void testConcurrentProtoWrites() throws Exception { + Path logPath = tmp.newFile().toPath(); + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(logPath.toString()); + ArrayList<FlagInfo> messages = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + messages.add(generateRandomMessage()); + } + Thread[] writers = new Thread[messages.size() / 10]; + final CountDownLatch start = new CountDownLatch(writers.length); + for (int i = 0; i < writers.length; ++i) { + int startIndex = i * 10; + Thread thread = new Thread() { + @Override + public void run() { + try { + start.countDown(); + start.await(); + } catch (InterruptedException e) { + return; + } + for (int j = startIndex; j < startIndex + 10; ++j) { + out.write(messages.get(j)); + } + } + }; + writers[i] = thread; + thread.start(); + } + for (int i = 0; i < writers.length; ++i) { + writers[i].join(); + } + out.close(); + ArrayList<FlagInfo> readMessages = new ArrayList<>(); + try (InputStream in = Files.newInputStream(logPath)) { + for (int i = 0; i < messages.size(); ++i) { + readMessages.add(FlagInfo.parseDelimitedFrom(in)); + } + } + assertThat(readMessages).containsExactlyElementsIn(messages); + } + + @Test + public void testFailedClosePropagatesIOException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + IOException ex = new IOException("foo"); + Mockito.doThrow(ex).when(mockChannel).close(); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.completed(0, null); // We ignore the arguments. + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + + try { + out.close(); + fail("Expected an IOException"); + } catch (IOException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testFailedClosePropagatesUncheckedException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + RuntimeException ex = new RuntimeException("foo"); + Mockito.doThrow(ex).when(mockChannel).close(); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.completed(0, null); // We ignore the arguments. + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + + try { + out.close(); + fail("Expected a RuntimeException"); + } catch (RuntimeException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testFailedForcePropagatesIOException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + IOException ex = new IOException("foo"); + Mockito.doThrow(ex).when(mockChannel).force(eq(true)); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.completed(0, null); // We ignore the arguments. + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + + try { + out.close(); + fail("Expected an IOException"); + } catch (IOException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testFailedForcePropagatesUncheckedException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + RuntimeException ex = new RuntimeException("foo"); + Mockito.doThrow(ex).when(mockChannel).force(eq(true)); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.completed(0, null); // We ignore the arguments. + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + + try { + out.close(); + fail("Expected a RuntimeException"); + } catch (RuntimeException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testFailedWritePropagatesIOException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + IOException ex = new IOException("foo"); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.failed(ex, null); + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + out.write("blo"); + + try { + out.close(); + fail("Expected an IOException"); + } catch (IOException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testFailedWritePropagatesUncheckedException() throws Exception { + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(mockChannel); + when(mockChannel.isOpen()).thenReturn(true); + RuntimeException ex = new RuntimeException("foo"); + Mockito.doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + CompletionHandler<Integer, Void> handler = + (CompletionHandler<Integer, Void>) invocationOnMock.getArguments()[3]; + handler.failed(ex, null); + return null; + }) + .when(mockChannel) + .write( + any(ByteBuffer.class), + any(Integer.class), + eq(null), + Mockito.<CompletionHandler<Integer, Void>>anyObject()); + out.write("bla"); + out.write("blo"); + + try { + out.close(); + fail("Expected a RuntimeException"); + } catch (RuntimeException expected) { + assertThat(expected).hasMessageThat().isEqualTo("foo"); + } + } + + @Test + public void testWriteAfterCloseThrowsException() throws Exception { + Path logPath = tmp.newFile().toPath(); + AsynchronousFileChannel ch = AsynchronousFileChannel.open( + logPath, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); + AsynchronousFileOutputStream out = new AsynchronousFileOutputStream(ch); + out.write("bla"); + ch.close(); + + try { + out.write("blo"); + fail("Expected an IllegalStateException"); + } catch (IllegalStateException expected) { + // Expected. + } + } +} |