// Copyright 2014 Google Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.util.io; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import java.io.IOException; import java.io.OutputStream; /** * Instances of this class are multiplexers, which redirect multiple * output streams into a single output stream with tagging so it can be * de-multiplexed into multiple streams as needed. This allows us to * use one connection for multiple streams, but more importantly it avoids * multiple threads or select etc. on the receiving side: A client on the other * end of a networking connection can simply read the tagged lines and then act * on them within a sigle thread. * * The format of the tagged output stream is as follows: * *
 * combined :: = [ control_line payload ... ]+
 * control_line :: = '@' marker '@'? '\n'
 * payload :: = r'^[^\n]*\n'
* * So basically: * * * This format is optimized for easy interpretation by a Python client, but it's * also a compromise in that it's still easy to interpret by a human (let's say * you have to read the traffic over a wire for some reason). */ @ThreadSafe public final class StreamMultiplexer { public static final byte STDOUT_MARKER = '1'; public static final byte STDERR_MARKER = '2'; public static final byte CONTROL_MARKER = '3'; private static final byte AT = '@'; private final Object mutex = new Object(); private final OutputStream multiplexed; public StreamMultiplexer(OutputStream multiplexed) { this.multiplexed = multiplexed; } private class MarkingStream extends LineFlushingOutputStream { private final byte markerByte; MarkingStream(byte markerByte) { this.markerByte = markerByte; } @Override protected void flushingHook() throws IOException { synchronized (mutex) { if (len == 0) { multiplexed.flush(); return; } byte lastByte = buffer[len - 1]; boolean lineIsIncomplete = lastByte != NEWLINE; multiplexed.write(AT); multiplexed.write(markerByte); if (lineIsIncomplete) { multiplexed.write(AT); } multiplexed.write(NEWLINE); multiplexed.write(buffer, 0, len); if (lineIsIncomplete) { multiplexed.write(NEWLINE); } multiplexed.flush(); } len = 0; } } /** * Create a stream that will tag its contributions into the multiplexed stream * with the marker '1', which means 'stdout'. Each newline byte leads * to a forced automatic flush. Also, this stream never closes the underlying * stream it delegates to - calling its {@code close()} method is equivalent * to calling {@code flush}. */ public OutputStream createStdout() { return new MarkingStream(STDOUT_MARKER); } /** * Like {@link #createStdout()}, except it tags with the marker '2' to * indicate 'stderr'. */ public OutputStream createStderr() { return new MarkingStream(STDERR_MARKER); } /** * Like {@link #createStdout()}, except it tags with the marker '3' to * indicate control flow.. */ public OutputStream createControl() { return new MarkingStream(CONTROL_MARKER); } }