From d08b27fa9701fecfdb69e1b0d1ac2459efc2129b Mon Sep 17 00:00:00 2001 From: Han-Wen Nienhuys Date: Wed, 25 Feb 2015 16:45:20 +0100 Subject: Update from Google. -- MOE_MIGRATED_REVID=85702957 --- .../google/devtools/build/lib/shell/Consumers.java | 359 +++++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 src/main/java/com/google/devtools/build/lib/shell/Consumers.java (limited to 'src/main/java/com/google/devtools/build/lib/shell/Consumers.java') diff --git a/src/main/java/com/google/devtools/build/lib/shell/Consumers.java b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java new file mode 100644 index 0000000000..3ed5b7e5ae --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java @@ -0,0 +1,359 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.shell; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This class provides convenience methods for consuming (actively reading) + * output and error streams with different consumption policies: + * discarding ({@link #createDiscardingConsumers()}, + * accumulating ({@link #createAccumulatingConsumers()}, + * and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}). + */ +class Consumers { + + private static final Logger log = + Logger.getLogger("com.google.devtools.build.lib.shell.Command"); + + private Consumers() {} + + private static final ExecutorService pool = + Executors.newCachedThreadPool(new AccumulatorThreadFactory()); + + static OutErrConsumers createDiscardingConsumers() { + return new OutErrConsumers(new DiscardingConsumer(), + new DiscardingConsumer()); + } + + static OutErrConsumers createAccumulatingConsumers() { + return new OutErrConsumers(new AccumulatingConsumer(), + new AccumulatingConsumer()); + } + + static OutErrConsumers createStreamingConsumers(OutputStream out, + OutputStream err) { + return new OutErrConsumers(new StreamingConsumer(out), + new StreamingConsumer(err)); + } + + static class OutErrConsumers { + + private final OutputConsumer out; + private final OutputConsumer err; + + private OutErrConsumers(final OutputConsumer out, final OutputConsumer err){ + this.out = out; + this.err = err; + } + + void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams){ + out.registerInput(outInput, closeStreams); + err.registerInput(errInput, closeStreams); + } + + void cancel() { + out.cancel(); + err.cancel(); + } + + void waitForCompletion() throws IOException { + out.waitForCompletion(); + err.waitForCompletion(); + } + + ByteArrayOutputStream getAccumulatedOut(){ + return out.getAccumulatedOut(); + } + + ByteArrayOutputStream getAccumulatedErr() { + return err.getAccumulatedOut(); + } + + void logConsumptionStrategy() { + // The creation methods guarantee that the consumption strategy is + // the same for out and err - doesn't matter whether we call out or err, + // let's pick out. + out.logConsumptionStrategy(); + } + + } + + /** + * This interface describes just one consumer, which consumes the + * InputStream provided by {@link #registerInput(InputStream, boolean)}. + * Implementations implement different consumption strategies. + */ + private static interface OutputConsumer { + /** + * Returns whatever the consumer accumulated internally, or + * {@link CommandResult#NO_OUTPUT_COLLECTED} if it doesn't accumulate + * any output. + * + * @see AccumulatingConsumer + */ + ByteArrayOutputStream getAccumulatedOut(); + + void logConsumptionStrategy(); + + void registerInput(InputStream in, boolean closeConsumer); + + void cancel(); + + void waitForCompletion() throws IOException; + } + + /** + * This consumer sends the input to a stream while consuming it. + */ + private static class StreamingConsumer extends FutureConsumption + implements OutputConsumer { + private OutputStream out; + + StreamingConsumer(OutputStream out) { + this.out = out; + } + + @Override + public ByteArrayOutputStream getAccumulatedOut() { + return CommandResult.NO_OUTPUT_COLLECTED; + } + + @Override + public void logConsumptionStrategy() { + log.finer("Output will be sent to streams provided by client"); + } + + @Override protected Runnable createConsumingAndClosingSink(InputStream in, + boolean closeConsumer) { + return new ClosingSink(in, out, closeConsumer); + } + } + + /** + * This consumer sends the input to a {@link ByteArrayOutputStream} + * while consuming it. This accumulated stream can be obtained by + * calling {@link #getAccumulatedOut()}. + */ + private static class AccumulatingConsumer extends FutureConsumption + implements OutputConsumer { + private ByteArrayOutputStream out = new ByteArrayOutputStream(); + + @Override + public ByteArrayOutputStream getAccumulatedOut() { + return out; + } + + @Override + public void logConsumptionStrategy() { + log.finer("Output will be accumulated (promptly read off) and returned"); + } + + @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) { + return new ClosingSink(in, out); + } + } + + /** + * This consumer just discards whatever it reads. + */ + private static class DiscardingConsumer extends FutureConsumption + implements OutputConsumer { + private DiscardingConsumer() { + } + + @Override + public ByteArrayOutputStream getAccumulatedOut() { + return CommandResult.NO_OUTPUT_COLLECTED; + } + + @Override + public void logConsumptionStrategy() { + log.finer("Output will be ignored"); + } + + @Override public Runnable createConsumingAndClosingSink(InputStream in, boolean closeConsumer) { + return new ClosingSink(in); + } + } + + /** + * A mixin that makes consumers active - this is where we kick of + * multithreading ({@link #registerInput(InputStream, boolean)}), cancel actions + * and wait for the consumers to complete. + */ + private abstract static class FutureConsumption implements OutputConsumer { + + private Future future; + + @Override + public void registerInput(InputStream in, boolean closeConsumer){ + Runnable sink = createConsumingAndClosingSink(in, closeConsumer); + future = pool.submit(sink); + } + + protected abstract Runnable createConsumingAndClosingSink(InputStream in, boolean close); + + @Override + public void cancel() { + future.cancel(true); + } + + @Override + public void waitForCompletion() throws IOException { + boolean wasInterrupted = false; + try { + while (true) { + try { + future.get(); + break; + } catch (InterruptedException ie) { + wasInterrupted = true; + // continue waiting + } catch (ExecutionException ee) { + // Runnable threw a RuntimeException + Throwable nested = ee.getCause(); + if (nested instanceof RuntimeException) { + final RuntimeException re = (RuntimeException) nested; + // The stream sink classes, unfortunately, tunnel IOExceptions + // out of run() in a RuntimeException. If that's the case, + // unpack and re-throw the IOException. Otherwise, re-throw + // this unexpected RuntimeException + final Throwable cause = re.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw re; + } + } else if (nested instanceof OutOfMemoryError) { + // OutOfMemoryError does not support exception chaining. + throw (OutOfMemoryError) nested; + } else if (nested instanceof Error) { + throw new Error("unhandled Error in worker thread", ee); + } else { + throw new RuntimeException("unknown execution problem", ee); + } + } + } + } finally { + // Read this for detailed explanation: + // http://www-128.ibm.com/developerworks/java/library/j-jtp05236.html + if (wasInterrupted) { + Thread.currentThread().interrupt(); // preserve interrupted status + } + } + } + } + + /** + * Factory which produces threads with a 32K stack size. + */ + private static class AccumulatorThreadFactory implements ThreadFactory { + + private static final int THREAD_STACK_SIZE = 32 * 1024; + + private static int threadInitNumber; + + private static synchronized int nextThreadNum() { + return threadInitNumber++; + } + + @Override + public Thread newThread(final Runnable runnable) { + final Thread t = + new Thread(null, + runnable, + "Command-Accumulator-Thread-" + nextThreadNum(), + THREAD_STACK_SIZE); + // Don't let this thread hold up JVM exit + t.setDaemon(true); + return t; + } + + } + + /** + * A sink that closes its input stream once its done. + */ + private static class ClosingSink implements Runnable { + + private final InputStream in; + private final OutputStream out; + private final Runnable sink; + private final boolean close; + + /** + * Creates a sink that will pump InputStream in + * into OutputStream out. + */ + ClosingSink(final InputStream in, OutputStream out) { + this(in, out, false); + } + + /** + * Creates a sink that will read in and discard it. + */ + ClosingSink(final InputStream in) { + this.sink = InputStreamSink.newRunnableSink(in); + this.in = in; + this.close = false; + this.out = null; + } + + ClosingSink(final InputStream in, OutputStream out, boolean close){ + this.sink = InputStreamSink.newRunnableSink(in, out); + this.in = in; + this.out = out; + this.close = close; + } + + + @Override + public void run() { + try { + sink.run(); + } finally { + silentClose(in); + if (close && out != null) { + silentClose(out); + } + } + } + + } + + /** + * Close the in stream and log a warning if anything happens. + */ + private static void silentClose(final Closeable closeable) { + try { + closeable.close(); + } catch (IOException ioe) { + String message = "Unexpected exception while closing input stream"; + log.log(Level.WARNING, message, ioe); + } + } + +} -- cgit v1.2.3