aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java186
1 files changed, 186 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
new file mode 100644
index 0000000000..ffe0c19340
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
@@ -0,0 +1,186 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * The dual of {@link StreamMultiplexer}: This is an output stream into which
+ * you can dump the multiplexed stream, and it delegates the de-multiplexed
+ * content back into separate channels (instances of {@link OutputStream}).
+ *
+ * The format of the tagged output stream is as follows:
+ *
+ * <pre>
+ * combined :: = [ control_line payload ... ]+
+ * control_line :: = '@' marker '@'? '\n'
+ * payload :: = r'^[^\n]*\n'
+ * </pre>
+ *
+ * For more details, please see {@link StreamMultiplexer}.
+ */
+@ThreadCompatible
+public final class StreamDemultiplexer extends OutputStream {
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (selectedStream != null) {
+ selectedStream.flush();
+ }
+ }
+
+ private static final byte AT = '@';
+ private static final byte NEWLINE = '\n';
+
+ /**
+ * The output streams, conveniently in an array indexed by the marker byte.
+ * Some of these will be null, most likely.
+ */
+ private final OutputStream[] outputStreams =
+ new OutputStream[Byte.MAX_VALUE + 1];
+
+ /**
+ * Each state in this FSM corresponds to a position in the grammar, which is
+ * simple enough that we can just move through it from beginning to end as we
+ * parse things.
+ */
+ private enum State {
+ EXPECT_CONTROL_STARTING_AT,
+ EXPECT_MARKER_BYTE,
+ EXPECT_AT_OR_NEWLINE,
+ EXPECT_PAYLOAD_OR_NEWLINE
+ }
+
+ private State state = State.EXPECT_CONTROL_STARTING_AT;
+ private boolean addNewlineToPayload;
+ private OutputStream selectedStream;
+
+ /**
+ * Construct a new demultiplexer. The {@code smallestMarkerByte} indicates
+ * the marker byte we would expect for {@code outputStreams[0]} to be used.
+ * So, if this first stream is your stdout and you're using the
+ * {@link StreamMultiplexer}, then you will need to set this to
+ * {@code '1'}. Because {@link StreamDemultiplexer} extends
+ * {@link OutputStream}, this constructor effectively creates an
+ * {@link OutputStream} instance which demultiplexes the tagged data client
+ * code writes to it into {@code outputStreams}.
+ */
+ public StreamDemultiplexer(byte smallestMarkerByte,
+ OutputStream... outputStreams) {
+ for (int i = 0; i < outputStreams.length; i++) {
+ this.outputStreams[smallestMarkerByte + i] = outputStreams[i];
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ // This dispatch traverses the finite state machine / grammar.
+ switch (state) {
+ case EXPECT_CONTROL_STARTING_AT:
+ parseControlStartingAt((byte) b);
+ resetFields();
+ break;
+ case EXPECT_MARKER_BYTE:
+ parseMarkerByte((byte) b);
+ break;
+ case EXPECT_AT_OR_NEWLINE:
+ parseAtOrNewline((byte) b);
+ break;
+ case EXPECT_PAYLOAD_OR_NEWLINE:
+ parsePayloadOrNewline((byte) b);
+ break;
+ }
+ }
+
+ /**
+ * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload
+ * we are actually transporting over the wire. At this point we can rely
+ * on a stream having been preselected into {@link #selectedStream}, and
+ * also we will add a newline if {@link #addNewlineToPayload} is set.
+ * Flushes at the end of every payload segment.
+ */
+ private void parsePayloadOrNewline(byte b) throws IOException {
+ if (b == NEWLINE) {
+ if (addNewlineToPayload) {
+ selectedStream.write(NEWLINE);
+ }
+ selectedStream.flush();
+ state = State.EXPECT_CONTROL_STARTING_AT;
+ } else {
+ selectedStream.write(b);
+ selectedStream.flush(); // slow?
+ }
+ }
+
+ /**
+ * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the
+ * suppress newline indicator (at) at the end of a control line, or the end
+ * of a control line.
+ */
+ private void parseAtOrNewline(byte b) throws IOException {
+ if (b == NEWLINE) {
+ state = State.EXPECT_PAYLOAD_OR_NEWLINE;
+ } else if (b == AT) {
+ addNewlineToPayload = false;
+ } else {
+ throw new IOException("Expected @ or \\n. (" + b + ")");
+ }
+ }
+
+ /**
+ * Reset the fields that are affected by our state.
+ */
+ private void resetFields() {
+ selectedStream = null;
+ addNewlineToPayload = true;
+ }
+
+ /**
+ * Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream
+ * we will be using, and will set {@link #selectedStream}.
+ */
+ private void parseMarkerByte(byte markerByte) throws IOException {
+ if (markerByte < 0 || markerByte > Byte.MAX_VALUE) {
+ String msg = "Illegal marker byte (" + markerByte + ")";
+ throw new IllegalArgumentException(msg);
+ }
+ if (markerByte > outputStreams.length
+ || outputStreams[markerByte] == null) {
+ throw new IOException("stream " + markerByte + " not registered.");
+ }
+ selectedStream = outputStreams[markerByte];
+ state = State.EXPECT_AT_OR_NEWLINE;
+ }
+
+ /**
+ * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with
+ * which each message starts.
+ */
+ private void parseControlStartingAt(byte b) throws IOException {
+ if (b != AT) {
+ throw new IOException("Expected control starting @. (" + b + ", "
+ + (char) b + ")");
+ }
+ state = State.EXPECT_MARKER_BYTE;
+ }
+
+}