aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/runtime/SynchronizedOutputStream.java
blob: 6f7562c247f67d4863e9daefc1513ff7bbc2e3a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.runtime;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.io.OutputStream;

/**
 * {@link OutputStream} suitably synchronized for producer-consumer use cases.
 * The method {@link #readAndReset()} allows to read the bytes accumulated so far
 * and simultaneously truncate precisely the bytes read. Moreover, upon such a reset
 * the amount of memory retained is reset to a small constant. This is a difference
 * with resecpt to the behaviour of the standard classes {@link ByteArrayOutputStream}
 * which only resets the index but keeps the array. This difference matters, as we need
 * to support output peeks without retaining this ammount of memory for the rest of the
 * build.
 *
 * <p>This class is expected to be used with the {@link BuildEventStreamer}.
 */
public class SynchronizedOutputStream extends OutputStream {

  // The maximal amount of bytes we intend to store in the buffer. However,
  // the requirement that a single write be written in one go is more important,
  // so the actual size we store in this buffer can be the maximum (not the sum)
  // of this value and the amount of bytes written in a single call to the
  // {@link write(byte[] buffer, int offset, int count)} method.
  private final long maxBufferedLength;

  private byte[] buf;
  private long count;
  private boolean discardAll;

  // The event streamer that is supposed to flush stdout/stderr.
  private BuildEventStreamer streamer;

  public SynchronizedOutputStream(long maxBufferedLength) {
    buf = new byte[64];
    count = 0;
    discardAll = false;
    this.maxBufferedLength = maxBufferedLength;
  }

  public void registerStreamer(BuildEventStreamer streamer) {
    this.streamer = streamer;
  }

  public synchronized void setDiscardAll() {
    discardAll = true;
    count = 0;
    buf = null;
  }

  /**
   * Read the contents of the stream and simultaneously clear them. Also, reset the amount of
   * memory retained to a constant amount.
   */
  public synchronized String readAndReset() {
    String content = new String(buf, 0, (int) count, UTF_8);
    buf = new byte[64];
    count = 0;
    return content;
  }

  @Override
  public void write(int oneByte) throws IOException {
    if (discardAll) {
      return;
    }
    // We change the dependency with respect to that of the super class: write(int)
    // now calls write(int[], int, int) which is implemented without any dependencies.
    write(new byte[] {(byte) oneByte}, 0, 1);
  }

  @Override
  public void write(byte[] buffer, int offset, int count) throws IOException {
    // As we base the less common write(int) on this method, we may not depend not call write(int)
    // directly or indirectly (e.g., by calling super.write(int[], int, int)).
    synchronized (this) {
      if (discardAll) {
        return;
      }
    }
    boolean shouldFlush = false;
    // As we have to do the flushing outside the synchronized block, we have to expect
    // other writes to come immediately after flushing, so we have to do the check inside
    // a while loop.
    boolean didWrite = false;
    while (!didWrite) {
      synchronized (this) {
        if (this.count + (long) count < maxBufferedLength || this.count == 0) {
          if (this.count + (long) count >= (long) buf.length) {
            // We need to increase the buffer; if within the permissible range range for array
            // sizes, we at least double it, otherwise we only increase as far as needed.
            long newsize;
            if (2 * (long) buf.length + count < (long) Integer.MAX_VALUE) {
              newsize = 2 * (long) buf.length + count;
            } else {
              newsize = this.count + count;
            }
            byte[] newbuf = new byte[(int) newsize];
            System.arraycopy(buf, 0, newbuf, 0, (int) this.count);
            this.buf = newbuf;
          }
          System.arraycopy(buffer, offset, buf, (int) this.count, count);
          this.count += (long) count;
          didWrite = true;
        } else {
          shouldFlush = true;
        }
        if (this.count >= maxBufferedLength) {
          shouldFlush = true;
        }
      }
      if (shouldFlush && streamer != null) {
        streamer.flush();
        shouldFlush = false;
      }
    }
  }
}