aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/util/io/StreamDemultiplexer.java
blob: 051f218515a04d1d900457e6abbcc3872929def8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util.io;

import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;

import java.io.IOException;
import java.io.OutputStream;

/**
 * The dual of {@link StreamMultiplexer}: This is an output stream into which
 * you can dump the multiplexed stream, and it delegates the de-multiplexed
 * content back into separate channels (instances of {@link OutputStream}).
 *
 * The format of the tagged output stream is as follows:
 *
 * <pre>
 * combined :: = [ control_line payload ... ]+
 * control_line :: = '@' marker '@'? '\n'
 * payload :: = r'^[^\n]*\n'
 * </pre>
 *
 * For more details, please see {@link StreamMultiplexer}.
 */
@ThreadCompatible
public final class StreamDemultiplexer extends OutputStream {

  @Override
  public void close() throws IOException {
    flush();
  }

  @Override
  public void flush() throws IOException {
    if (selectedStream != null) {
      selectedStream.flush();
    }
  }

  private static final byte AT = '@';
  private static final byte NEWLINE = '\n';

  /**
   * The output streams, conveniently in an array indexed by the marker byte.
   * Some of these will be null, most likely.
   */
  private final OutputStream[] outputStreams =
    new OutputStream[Byte.MAX_VALUE + 1];

  /**
   * Each state in this FSM corresponds to a position in the grammar, which is
   * simple enough that we can just move through it from beginning to end as we
   * parse things.
   */
  private enum State {
    EXPECT_CONTROL_STARTING_AT,
    EXPECT_MARKER_BYTE,
    EXPECT_AT_OR_NEWLINE,
    EXPECT_PAYLOAD_OR_NEWLINE
  }

  private State state = State.EXPECT_CONTROL_STARTING_AT;
  private boolean addNewlineToPayload;
  private OutputStream selectedStream;

  /**
   * Construct a new demultiplexer. The {@code smallestMarkerByte} indicates
   * the marker byte we would expect for {@code outputStreams[0]} to be used.
   * So, if this first stream is your stdout and you're using the
   * {@link StreamMultiplexer}, then you will need to set this to
   * {@code '1'}. Because {@link StreamDemultiplexer} extends
   * {@link OutputStream}, this constructor effectively creates an
   * {@link OutputStream} instance which demultiplexes the tagged data client
   * code writes to it into {@code outputStreams}.
   */
  public StreamDemultiplexer(byte smallestMarkerByte,
                             OutputStream... outputStreams) {
    for (int i = 0; i < outputStreams.length; i++) {
      this.outputStreams[smallestMarkerByte + i] = outputStreams[i];
    }
  }

  @Override
  public void write(int b) throws IOException {
    // This dispatch traverses the finite state machine / grammar.
    switch (state) {
      case EXPECT_CONTROL_STARTING_AT:
        parseControlStartingAt((byte) b);
        resetFields();
        break;
      case EXPECT_MARKER_BYTE:
        parseMarkerByte((byte) b);
        break;
      case EXPECT_AT_OR_NEWLINE:
        parseAtOrNewline((byte) b);
        break;
      case EXPECT_PAYLOAD_OR_NEWLINE:
        parsePayloadOrNewline((byte) b);
        break;
    }
  }

  /**
   * Handles {@link State#EXPECT_PAYLOAD_OR_NEWLINE}, which is the payload
   * we are actually transporting over the wire. At this point we can rely
   * on a stream having been preselected into {@link #selectedStream}, and
   * also we will add a newline if {@link #addNewlineToPayload} is set.
   * Flushes at the end of every payload segment.
   */
  private void parsePayloadOrNewline(byte b) throws IOException {
    if (b == NEWLINE) {
      if (addNewlineToPayload) {
        selectedStream.write(NEWLINE);
      }
      selectedStream.flush();
      state = State.EXPECT_CONTROL_STARTING_AT;
    } else {
      selectedStream.write(b);
      selectedStream.flush(); // slow?
    }
  }

  /**
   * Handles {@link State#EXPECT_AT_OR_NEWLINE}, which is either the
   * suppress newline indicator (at) at the end of a control line, or the end
   * of a control line.
   */
  private void parseAtOrNewline(byte b) throws IOException {
    if (b == NEWLINE) {
      state = State.EXPECT_PAYLOAD_OR_NEWLINE;
    } else if (b == AT) {
      addNewlineToPayload = false;
    } else {
      throw new IOException("Expected @ or \\n. (" + b + ")");
    }
  }

  /**
   * Reset the fields that are affected by our state.
   */
  private void resetFields() {
    selectedStream = null;
    addNewlineToPayload = true;
  }

  /**
   * Handles {@link State#EXPECT_MARKER_BYTE}. The byte determines which stream
   * we will be using, and will set {@link #selectedStream}.
   */
  private void parseMarkerByte(byte markerByte) throws IOException {
    if (markerByte < 0 || markerByte > Byte.MAX_VALUE) {
      String msg = "Illegal marker byte (" + markerByte + ")";
      throw new IllegalArgumentException(msg);
    }
    if (markerByte > outputStreams.length
        || outputStreams[markerByte] == null) {
      throw new IOException("stream " + markerByte + " not registered.");
    }
    selectedStream = outputStreams[markerByte];
    state = State.EXPECT_AT_OR_NEWLINE;
  }

  /**
   * Handles {@link State#EXPECT_CONTROL_STARTING_AT}, the very first '@' with
   * which each message starts.
   */
  private void parseControlStartingAt(byte b) throws IOException {
    if (b != AT) {
      throw new IOException("Expected control starting @. (" + b + ", "
          + (char) b + ")");
    }
    state = State.EXPECT_MARKER_BYTE;
  }

}