aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/shell/Consumers.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/shell/Consumers.java359
1 files changed, 359 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/shell/Consumers.java b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
new file mode 100644
index 0000000000..3ed5b7e5ae
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
@@ -0,0 +1,359 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.shell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class provides convenience methods for consuming (actively reading)
+ * output and error streams with different consumption policies:
+ * discarding ({@link #createDiscardingConsumers()},
+ * accumulating ({@link #createAccumulatingConsumers()},
+ * and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
+ */
+class Consumers {
+
+ private static final Logger log =
+ Logger.getLogger("com.google.devtools.build.lib.shell.Command");
+
+ private Consumers() {}
+
+ private static final ExecutorService pool =
+ Executors.newCachedThreadPool(new AccumulatorThreadFactory());
+
+ static OutErrConsumers createDiscardingConsumers() {
+ return new OutErrConsumers(new DiscardingConsumer(),
+ new DiscardingConsumer());
+ }
+
+ static OutErrConsumers createAccumulatingConsumers() {
+ return new OutErrConsumers(new AccumulatingConsumer(),
+ new AccumulatingConsumer());
+ }
+
+ static OutErrConsumers createStreamingConsumers(OutputStream out,
+ OutputStream err) {
+ return new OutErrConsumers(new StreamingConsumer(out),
+ new StreamingConsumer(err));
+ }
+
+ static class OutErrConsumers {
+
+ private final OutputConsumer out;
+ private final OutputConsumer err;
+
+ private OutErrConsumers(final OutputConsumer out, final OutputConsumer err){
+ this.out = out;
+ this.err = err;
+ }
+
+ void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams){
+ out.registerInput(outInput, closeStreams);
+ err.registerInput(errInput, closeStreams);
+ }
+
+ void cancel() {
+ out.cancel();
+ err.cancel();
+ }
+
+ void waitForCompletion() throws IOException {
+ out.waitForCompletion();
+ err.waitForCompletion();
+ }
+
+ ByteArrayOutputStream getAccumulatedOut(){
+ return out.getAccumulatedOut();
+ }
+
+ ByteArrayOutputStream getAccumulatedErr() {
+ return err.getAccumulatedOut();
+ }
+
+ void logConsumptionStrategy() {
+ // The creation methods guarantee that the consumption strategy is
+ // the same for out and err - doesn't matter whether we call out or err,
+ // let's pick out.
+ out.logConsumptionStrategy();
+ }
+
+ }
+
+ /**
+ * This interface describes just one consumer, which consumes the
+ * InputStream provided by {@link #registerInput(InputStream, boolean)}.
+ * Implementations implement different consumption strategies.
+ */
+ private static interface OutputConsumer {
+ /**
+ * Returns whatever the consumer accumulated internally, or
+ * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate
+ * any output.
+ *
+ * @see AccumulatingConsumer
+ */
+ ByteArrayOutputStream getAccumulatedOut();
+
+ void logConsumptionStrategy();
+
+ void registerInput(InputStream in, boolean closeConsumer);
+
+ void cancel();
+
+ void waitForCompletion() throws IOException;
+ }
+
+ /**
+ * This consumer sends the input to a stream while consuming it.
+ */
+ private static class StreamingConsumer extends FutureConsumption
+ implements OutputConsumer {
+ private OutputStream out;
+
+ StreamingConsumer(OutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public ByteArrayOutputStream getAccumulatedOut() {
+ return CommandResult.NO_OUTPUT_COLLECTED;
+ }
+
+ @Override
+ public void logConsumptionStrategy() {
+ log.finer("Output will be sent to streams provided by client");
+ }
+
+ @Override protected Runnable createConsumingAndClosingSink(InputStream in,
+ boolean closeConsumer) {
+ return new ClosingSink(in, out, closeConsumer);
+ }
+ }
+
+ /**
+ * This consumer sends the input to a {@link ByteArrayOutputStream}
+ * while consuming it. This accumulated stream can be obtained by
+ * calling {@link #getAccumulatedOut()}.
+ */
+ private static class AccumulatingConsumer extends FutureConsumption
+ implements OutputConsumer {
+ private ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ @Override
+ public ByteArrayOutputStream getAccumulatedOut() {
+ return out;
+ }
+
+ @Override
+ public void logConsumptionStrategy() {
+ log.finer("Output will be accumulated (promptly read off) and returned");
+ }
+
+ @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
+ return new ClosingSink(in, out);
+ }
+ }
+
+ /**
+ * This consumer just discards whatever it reads.
+ */
+ private static class DiscardingConsumer extends FutureConsumption
+ implements OutputConsumer {
+ private DiscardingConsumer() {
+ }
+
+ @Override
+ public ByteArrayOutputStream getAccumulatedOut() {
+ return CommandResult.NO_OUTPUT_COLLECTED;
+ }
+
+ @Override
+ public void logConsumptionStrategy() {
+ log.finer("Output will be ignored");
+ }
+
+ @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) {
+ return new ClosingSink(in);
+ }
+ }
+
+ /**
+ * A mixin that makes consumers active - this is where we kick of
+ * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions
+ * and wait for the consumers to complete.
+ */
+ private abstract static class FutureConsumption implements OutputConsumer {
+
+ private Future<?> future;
+
+ @Override
+ public void registerInput(InputStream in, boolean closeConsumer){
+ Runnable sink = createConsumingAndClosingSink(in, closeConsumer);
+ future = pool.submit(sink);
+ }
+
+ protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close);
+
+ @Override
+ public void cancel() {
+ future.cancel(true);
+ }
+
+ @Override
+ public void waitForCompletion() throws IOException {
+ boolean wasInterrupted = false;
+ try {
+ while (true) {
+ try {
+ future.get();
+ break;
+ } catch (InterruptedException ie) {
+ wasInterrupted = true;
+ // continue waiting
+ } catch (ExecutionException ee) {
+ // Runnable threw a RuntimeException
+ Throwable nested = ee.getCause();
+ if (nested instanceof RuntimeException) {
+ final RuntimeException re = (RuntimeException) nested;
+ // The stream sink classes, unfortunately, tunnel IOExceptions
+ // out of run() in a RuntimeException. If that's the case,
+ // unpack and re-throw the IOException. Otherwise, re-throw
+ // this unexpected RuntimeException
+ final Throwable cause = re.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw re;
+ }
+ } else if (nested instanceof OutOfMemoryError) {
+ // OutOfMemoryError does not support exception chaining.
+ throw (OutOfMemoryError) nested;
+ } else if (nested instanceof Error) {
+ throw new Error("unhandled Error in worker thread", ee);
+ } else {
+ throw new RuntimeException("unknown execution problem", ee);
+ }
+ }
+ }
+ } finally {
+ // Read this for detailed explanation:
+ // http://www-128.ibm.com/developerworks/java/library/j-jtp05236.html
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt(); // preserve interrupted status
+ }
+ }
+ }
+ }
+
+ /**
+ * Factory which produces threads with a 32K stack size.
+ */
+ private static class AccumulatorThreadFactory implements ThreadFactory {
+
+ private static final int THREAD_STACK_SIZE = 32 * 1024;
+
+ private static int threadInitNumber;
+
+ private static synchronized int nextThreadNum() {
+ return threadInitNumber++;
+ }
+
+ @Override
+ public Thread newThread(final Runnable runnable) {
+ final Thread t =
+ new Thread(null,
+ runnable,
+ "Command-Accumulator-Thread-" + nextThreadNum(),
+ THREAD_STACK_SIZE);
+ // Don't let this thread hold up JVM exit
+ t.setDaemon(true);
+ return t;
+ }
+
+ }
+
+ /**
+ * A sink that closes its input stream once its done.
+ */
+ private static class ClosingSink implements Runnable {
+
+ private final InputStream in;
+ private final OutputStream out;
+ private final Runnable sink;
+ private final boolean close;
+
+ /**
+ * Creates a sink that will pump InputStream <code>in</code>
+ * into OutputStream <code>out</code>.
+ */
+ ClosingSink(final InputStream in, OutputStream out) {
+ this(in, out, false);
+ }
+
+ /**
+ * Creates a sink that will read <code>in</code> and discard it.
+ */
+ ClosingSink(final InputStream in) {
+ this.sink = InputStreamSink.newRunnableSink(in);
+ this.in = in;
+ this.close = false;
+ this.out = null;
+ }
+
+ ClosingSink(final InputStream in, OutputStream out, boolean close){
+ this.sink = InputStreamSink.newRunnableSink(in, out);
+ this.in = in;
+ this.out = out;
+ this.close = close;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ sink.run();
+ } finally {
+ silentClose(in);
+ if (close && out != null) {
+ silentClose(out);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Close the <code>in</code> stream and log a warning if anything happens.
+ */
+ private static void silentClose(final Closeable closeable) {
+ try {
+ closeable.close();
+ } catch (IOException ioe) {
+ String message = "Unexpected exception while closing input stream";
+ log.log(Level.WARNING, message, ioe);
+ }
+ }
+
+}