aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
blob: 11f28965b3fbe890aaa0f155db3a78773bff9cf5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.profiler;

import static com.google.devtools.build.lib.profiler.ProfilerTask.TASK_COUNT;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.collect.Extrema;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.profiler.PredicateBasedStatRecorder.RecorderAndPredicate;
import com.google.devtools.build.lib.profiler.StatRecorder.VfsHeuristics;
import com.google.devtools.build.lib.util.VarInt;
import com.google.gson.stream.JsonWriter;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Blaze internal profiler. Provides facility to report various Blaze tasks and store them
 * (asynchronously) in the file for future analysis.
 *
 * <p>Implemented as singleton so any caller should use Profiler.instance() to obtain reference.
 *
 * <p>Internally, profiler uses two data structures - ThreadLocal task stack to track nested tasks
 * and single ConcurrentLinkedQueue to gather all completed tasks.
 *
 * <p>Also, due to the nature of the provided functionality (instrumentation of all Blaze
 * components), build.lib.profiler package will be used by almost every other Blaze package, so
 * special attention should be paid to avoid any dependencies on the rest of the Blaze code,
 * including build.lib.util and build.lib.vfs. This is important because build.lib.util and
 * build.lib.vfs contain Profiler invocations and any dependency on those two packages would create
 * circular relationship.
 *
 * <p>All gathered instrumentation data will be stored in the file. Please note that while the file
 * format is described here, it is considered internal and can change at any time. For scripting,
 * using blaze analyze-profile --dump=raw is a more robust and stable solution.
 *
 * <p>
 *
 * <pre>
 * Profiler file consists of the deflated stream with following overall structure:
 *   HEADER
 *   TASK_TYPE_TABLE
 *   TASK_RECORD...
 *   EOF_MARKER
 *
 * HEADER:
 *   int32: magic token (Profiler.MAGIC)
 *   int32: version format (Profiler.VERSION)
 *   string: file comment
 *
 * TASK_TYPE_TABLE:
 *   int32: number of type names below
 *   string... : type names. Each of the type names is assigned id according to
 *               their position in this table starting from 0.
 *
 * TASK_RECORD:
 *   int32 size: size of the encoded task record
 *   byte[size] encoded_task_record:
 *     varint64: thread id - as was returned by Thread.getId()
 *     varint32: task id - starting from 1.
 *     varint32: parent task id for subtasks or 0 for root tasks
 *     varint64: start time in ns, relative to the Profiler.start() invocation
 *     varint64: task duration in ns
 *     byte:     task type id (see TASK_TYPE_TABLE)
 *     varint32: description string index incremented by 1 (>0), or 0 if this
 *               is the first occurrence of the description string
 *     AGGREGATED_STAT...: remainder of the field (if present) represents
 *                         aggregated stats for that task
 *   string: *optional* description string, will appear only if description
 *           string index above was 0. In that case this string will be
 *           assigned next sequential id so every unique description string
 *           will appear in the file only once - after that it will be
 *           referenced by id.
 *
 * AGGREGATED_STAT:
 *   byte:     stat type
 *   varint32: total number of subtask invocations
 *   varint64: cumulative duration of subtask invocations in ns.
 *
 * EOF_MARKER:
 *   int64: -1 - please note that this corresponds to the thread id in the
 *               TASK_RECORD which is always > 0
 * </pre>
 *
 * @see ProfilerTask enum for recognized task types.
 */
@ThreadSafe
public final class Profiler {
  // Logger for profiler diagnostics; logTask() uses it to report an unexpectedly missing stack.
  private static final Logger logger = Logger.getLogger(Profiler.class.getName());

  // Magic token written as the first int32 of the profile file HEADER (see the class comment).
  public static final int MAGIC = 0x11223344;

  // File version number. Note that merely adding new record types in
  // the ProfilerTask does not require bumping version number as long as original
  // enum values are not renamed or deleted.
  public static final int VERSION = 0x03;

  // EOF marker. Must be < 0 so that it cannot be mistaken for the thread id that starts every
  // TASK_RECORD; per the file format description thread ids are always > 0.
  public static final int EOF_MARKER = -1;

  /** The profiler (a static singleton instance). Inactive by default. */
  private static final Profiler instance = new Profiler();

  // Bucket count handed to every SingleStatRecorder created by initHistograms().
  private static final int HISTOGRAM_BUCKETS = 20;

  // Sentinel record: id 0, start time 0, no parent and a null task type.
  // NOTE(review): presumably enqueued to tell the writer to terminate; the consumer is not visible
  // in this part of the file - confirm.
  private static final TaskData POISON_PILL = new TaskData(0, 0, null, null, "poison pill");

  /** Output formats that {@code start()} can be asked to write the profile in. */
  public enum Format {
    /** Binary profile; {@code start()} creates a {@code BinaryFormatWriter} for it. */
    BINARY_BAZEL_FORMAT,

    /** JSON trace; {@code start()} creates a {@code JsonTraceFileWriter} on the given stream. */
    JSON_TRACE_FILE_FORMAT,

    /** JSON trace whose output stream is additionally wrapped in a {@code GZIPOutputStream}. */
    JSON_TRACE_FILE_COMPRESSED_FORMAT
  }

  /**
   * A task that was very slow.
   *
   * <p>Immutable snapshot of the duration, description and type of a {@code TaskData}. The natural
   * ordering compares durations only; since {@code equals} is not overridden, the ordering is not
   * consistent with {@code equals}.
   */
  public static final class SlowTask implements Comparable<SlowTask> {
    final long durationNanos;
    final String description;
    final ProfilerTask type;

    /** Copies duration, description and type out of the given task record. */
    private SlowTask(TaskData taskData) {
      this.durationNanos = taskData.duration;
      this.description = taskData.description;
      this.type = taskData.type;
    }

    /**
     * Orders tasks by ascending duration.
     *
     * @param other the task to compare against
     * @return a negative value, zero or a positive value if this task's duration is smaller than,
     *     equal to or greater than the duration of {@code other}
     */
    @Override
    public int compareTo(SlowTask other) {
      // Long.compare cannot overflow, unlike deriving the sign from a subtraction of two longs.
      return Long.compare(durationNanos, other.durationNanos);
    }

    /** Returns the task duration in nanoseconds. */
    public long getDurationNanos() {
      return durationNanos;
    }

    /** Returns the description the task was recorded with. */
    public String getDescription() {
      return description;
    }

    /** Returns the type the task was recorded with. */
    public ProfilerTask getType() {
      return type;
    }
  }

  /**
   * Record describing a single profiled task.
   *
   * <p>Instances should be obtained through TaskStack.create() rather than by invoking the
   * constructor directly.
   *
   * <p>The class performs no synchronization itself, but all access to it from Profiler methods
   * is thread safe.
   */
  @ThreadCompatible
  private static final class TaskData {
    final long threadId;
    final long startTimeNanos;
    final int id;
    final int parentId;
    final ProfilerTask type;
    final String description;

    long duration;
    // Lazily allocated by aggregateChild(); both arrays have one slot per ProfilerTask type.
    int[] counts; // number of invocations per ProfilerTask type
    long[] durations; // time spent in the task per ProfilerTask type

    /**
     * Creates a record owned by the calling thread.
     *
     * @param id id of the task
     * @param startTimeNanos start time of the task
     * @param parent enclosing task, or null for a root task (stored as parent id 0)
     * @param eventType type of the task
     * @param description task description; must not be null
     */
    TaskData(
        int id, long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) {
      this.id = id;
      this.threadId = Thread.currentThread().getId();
      if (parent != null) {
        this.parentId = parent.id;
      } else {
        this.parentId = 0;
      }
      this.startTimeNanos = startTimeNanos;
      this.type = eventType;
      this.description = Preconditions.checkNotNull(description);
    }

    /** Aggregates information about an *immediate* subtask. */
    public void aggregateChild(ProfilerTask type, long duration) {
      int slot = type.ordinal();
      if (counts == null) {
        // First child seen: create the per-type accumulators.
        counts = new int[TASK_COUNT];
        durations = new long[TASK_COUNT];
      }
      counts[slot] += 1;
      durations[slot] += duration;
    }

    @Override
    public String toString() {
      return new StringBuilder()
          .append("Thread ")
          .append(threadId)
          .append(", task ")
          .append(id)
          .append(", type ")
          .append(type)
          .append(", ")
          .append(description)
          .toString();
    }
  }

  /**
   * Tracks nested tasks for each thread.
   *
   * <p>Every thread gets its own {@link ArrayList}, whose last element is the innermost open task.
   * The list is used strictly for LIFO operations.
   */
  @ThreadSafe
  private final class TaskStack extends ThreadLocal<List<TaskData>> {
    @Override
    public List<TaskData> initialValue() {
      return new ArrayList<>();
    }

    /** Returns the innermost open task of the calling thread, or null if there is none. */
    public TaskData peek() {
      List<TaskData> stack = get();
      return stack.isEmpty() ? null : stack.get(stack.size() - 1);
    }

    /** Removes and returns the innermost open task of the calling thread. */
    public TaskData pop() {
      List<TaskData> stack = get();
      int top = stack.size() - 1;
      return stack.remove(top);
    }

    /** Returns true if the calling thread has no open task. */
    public boolean isEmpty() {
      List<TaskData> stack = get();
      return stack.isEmpty();
    }

    /** Opens a new task, started now, on top of the calling thread's stack. */
    public void push(ProfilerTask eventType, String description) {
      List<TaskData> stack = get();
      stack.add(create(clock.nanoTime(), eventType, description));
    }

    /**
     * Builds a task record with a fresh id whose parent is the calling thread's innermost open
     * task. The record is not pushed onto the stack.
     */
    public TaskData create(long startTimeNanos, ProfilerTask eventType, String description) {
      int freshId = taskId.incrementAndGet();
      TaskData parent = peek();
      return new TaskData(freshId, startTimeNanos, parent, eventType, description);
    }

    /** Lists the calling thread's open tasks, innermost first, one per line. */
    @Override
    public String toString() {
      StringBuilder out = new StringBuilder();
      out.append("Current task stack for thread ")
          .append(Thread.currentThread().getName())
          .append(":\n");
      List<TaskData> stack = get();
      for (int i = stack.size(); i-- > 0; ) {
        out.append(stack.get(i)).append("\n");
      }
      return out.toString();
    }
  }

  /**
   * Implements datastore for object description indices. Intended to be used only by the
   * Profiler.save() method.
   */
  @ThreadCompatible
  private static final class ObjectDescriber {
    // Keyed by object identity, not by equals(): two distinct String objects with the same text
    // receive distinct indices.
    private final Map<Object, Integer> descMap = new IdentityHashMap<>(2000);
    private int indexCounter = 0;

    ObjectDescriber() { }

    /**
     * Looks up the index previously assigned to the given description object.
     *
     * @param description the description object to look up (compared by identity)
     * @return the assigned index, or -1 if this object has not been memoized yet
     */
    int getDescriptionIndex(String description) {
      Integer index = descMap.get(description);
      return (index != null) ? index : -1;
    }

    /**
     * Assigns the next sequential index to the given description object.
     *
     * @param description the description object to register
     * @return the description, cut down to its first 20000 characters if it is longer than that
     * @throws IllegalStateException if this description object already had an index
     */
    String memoizeDescription(String description) {
      int newIndex = indexCounter++;
      Integer oldIndex = descMap.put(description, newIndex);
      // Do not use Preconditions class below due to the rather expensive
      // toString() calls used in the message.
      if (oldIndex != null) {
        throw new IllegalStateException(
            "'"
                + description
                + "' @ "
                + System.identityHashCode(description)
                + " already had description index "
                + oldIndex
                + " while assigning index "
                + newIndex);
      } else if (description.length() > 20000) {
        // Note size 64k byte limitation in DataOutputStream#writeUTF().
        description = description.substring(0, 20000);
      }
      return description;
    }

    /** Returns true if the given index is the "not memoized" value, i.e. negative. */
    boolean isUnassigned(int index) {
      return (index < 0);
    }
  }

  /**
   * Aggregator class that keeps track of the slowest tasks of the specified type.
   *
   * <p><code>extremaAggregators</code> is sharded so that all threads need not compete for the
   * same lock if they do the same operation at the same time. Access to an individual {@link
   * Extrema} is synchronized on the {@link Extrema} instance itself.
   */
  private static final class SlowestTaskAggregator {
    private static final int SHARDS = 16;
    private final int size;

    @SuppressWarnings({"unchecked", "rawtypes"})
    private final Extrema<SlowTask>[] extremaAggregators = new Extrema[SHARDS];

    /** Creates {@code SHARDS} shards, each built with {@code Extrema.max(size)}. */
    SlowestTaskAggregator(int size) {
      this.size = size;

      int shard = 0;
      while (shard < SHARDS) {
        extremaAggregators[shard] = Extrema.max(size);
        shard++;
      }
    }

    // @ThreadSafe
    /** Offers the task to the shard selected by the calling thread's id. */
    void add(TaskData taskData) {
      int shard = (int) (Thread.currentThread().getId() % SHARDS);
      Extrema<SlowTask> target = extremaAggregators[shard];
      synchronized (target) {
        target.aggregate(new SlowTask(taskData));
      }
    }

    // @ThreadSafe
    /** Clears every shard. */
    void clear() {
      for (Extrema<SlowTask> shard : extremaAggregators) {
        synchronized (shard) {
          shard.clear();
        }
      }
    }

    // @ThreadSafe
    /** Merges the elements of all shards into a single {@code Extrema.max(size)} result. */
    Iterable<SlowTask> getSlowestTasks() {
      // This is slow, but since it only happens during the end of the invocation, it's OK
      Extrema<SlowTask> merged = Extrema.max(size);
      for (Extrema<SlowTask> shard : extremaAggregators) {
        synchronized (shard) {
          for (SlowTask candidate : shard.getExtremeElements()) {
            merged.aggregate(candidate);
          }
        }
      }
      return merged.getExtremeElements();
    }
  }

  /** Selects which {@link ProfilerTask} types the profiler collects data for. */
  public enum ProfiledTaskKinds {
    /**
     * Do not profile anything.
     *
     * <p>Performance is best with this case, but we lose critical path analysis and slowest
     * operation tracking.
     */
    NONE {
      @Override
      boolean isProfiling(ProfilerTask type) {
        return false;
      }
    },

    /**
     * Profile on a few, known-to-be-slow tasks.
     *
     * <p>Performance is somewhat decreased in comparison to {@link #NONE}, but we still track the
     * slowest operations (VFS).
     */
    SLOWEST {
      @Override
      boolean isProfiling(ProfilerTask type) {
        boolean tracked = type.collectsSlowestInstances();
        return tracked;
      }
    },

    /** A set of tasks that's useful for the Json trace output. */
    ALL_FOR_TRACE {
      @Override
      boolean isProfiling(ProfilerTask type) {
        if (type.isVfs()) {
          return false;
        }
        boolean excludedByType =
            // CRITICAL_PATH corresponds to writing the file.
            type == ProfilerTask.CRITICAL_PATH
                || type == ProfilerTask.SKYFUNCTION
                || type == ProfilerTask.ACTION_EXECUTE
                || type == ProfilerTask.ACTION_COMPLETE;
        if (excludedByType) {
          return false;
        }
        return !type.isSkylark();
      }
    },

    /**
     * Profile all tasks.
     *
     * <p>This is in use when {@code --profile} is specified.
     */
    ALL {
      @Override
      boolean isProfiling(ProfilerTask type) {
        return true;
      }
    };

    /**
     * Tells whether the profiler collects data for the given task type under this setting.
     *
     * @param type the task type in question
     * @return true if tasks of that type are to be recorded
     */
    abstract boolean isProfiling(ProfilerTask type);
  }

  // Time source of the current session; assigned in start().
  private Clock clock;
  // Filter deciding which task types get recorded; assigned in start().
  private ProfiledTaskKinds profiledTaskKinds;
  // Set to the execStartTimeNanos passed to start() and back to 0 by stop(); isActive() reports
  // true whenever this is non-zero.
  private volatile long profileStartTime;
  // When true, wasTaskSlowEnoughToRecord() accepts every task regardless of its duration.
  private volatile boolean recordAllDurations = false;

  /** This counter provides a unique id for every task, used to provide a parent/child relation. */
  private AtomicInteger taskId = new AtomicInteger();

  /**
   * The reference to the current writer, if any. If the referenced writer is null, then disk writes
   * are disabled. This can happen when slowest task recording is enabled.
   */
  private AtomicReference<FileWriter> writerRef = new AtomicReference<>();

  /**
   * This is a per-thread data structure that's used to track the current stack of open tasks, the
   * purpose of which is to track the parent id of every task. This is also used to ensure that
   * {@link #profile} and {@link #completeTask} calls always occur in pairs.
   */
  // TODO(ulfjack): We can infer the parent/child relationship after the fact instead of tracking it
  // at runtime. That would allow us to remove this data structure entirely.
  private TaskStack taskStack;

  // Indexed by ProfilerTask ordinal. The constructor leaves the entry null for every task type
  // whose slowestInstancesCount is 0.
  private final SlowestTaskAggregator[] slowestTasks =
      new SlowestTaskAggregator[ProfilerTask.values().length];

  // Indexed by ProfilerTask ordinal; every entry is (re)created by initHistograms().
  private final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length];

  /**
   * Creates the (inactive) profiler: builds the histograms and one slowest-task aggregator for
   * every task type that asks for a non-zero number of slowest instances.
   */
  private Profiler() {
    initHistograms();
    for (ProfilerTask type : ProfilerTask.values()) {
      if (type.slowestInstancesCount == 0) {
        // Nothing to track for this type; its slot stays null.
        continue;
      }
      slowestTasks[type.ordinal()] = new SlowestTaskAggregator(type.slowestInstancesCount);
    }
  }

  /**
   * Replaces every entry of {@code tasksHistograms} with a fresh recorder. VFS task types get a
   * {@code PredicateBasedStatRecorder} with one recorder per VFS heuristic; all other types get a
   * single {@code SingleStatRecorder}.
   */
  private void initHistograms() {
    for (ProfilerTask task : ProfilerTask.values()) {
      if (!task.isVfs()) {
        tasksHistograms[task.ordinal()] = new SingleStatRecorder(task, HISTOGRAM_BUCKETS);
        continue;
      }
      Map<String, ? extends Predicate<? super String>> heuristics =
          VfsHeuristics.vfsTypeHeuristics;
      List<RecorderAndPredicate> perHeuristic = new ArrayList<>(heuristics.size());
      for (Map.Entry<String, ? extends Predicate<? super String>> entry : heuristics.entrySet()) {
        // Each recorder is labelled "<task> <heuristic name>".
        SingleStatRecorder recorder =
            new SingleStatRecorder(task + " " + entry.getKey(), HISTOGRAM_BUCKETS);
        perHeuristic.add(new RecorderAndPredicate(recorder, entry.getValue()));
      }
      tasksHistograms[task.ordinal()] = new PredicateBasedStatRecorder(perHeuristic);
    }
  }

  /**
   * Returns the task histograms as an immutable list indexed by {@code ProfilerTask#ordinal}.
   *
   * <p>This must be called between calls to {@link #start} and {@link #stop}, or the returned
   * recorders are all empty. Note that the returned recorders may still be modified concurrently
   * (but at least they are thread-safe, so that's good).
   */
  // TODO(ulfjack): This returns incomplete data by design. Maybe we should return the histograms on
  // stop instead? However, this is currently only called from one location in a module, and that
  // can't call stop itself. What to do?
  public ImmutableList<StatRecorder> getTasksHistograms() {
    ImmutableList<StatRecorder> snapshot = ImmutableList.copyOf(tasksHistograms);
    return snapshot;
  }

  /** Returns the singleton profiler. */
  public static Profiler instance() {
    return Profiler.instance;
  }

  /**
   * Returns the current {@code nanoTime()} of the singleton profiler's clock while the profiler is
   * active, and the constant -1 otherwise.
   */
  public static long nanoTimeMaybe() {
    return instance.isActive() ? instance.clock.nanoTime() : -1;
  }

  /**
   * Enable profiling.
   *
   * <p>Subsequent calls to beginTask/endTask will be recorded in the provided output stream. Please
   * note that stream performance is extremely important and buffered streams should be utilized.
   *
   * <p>A writer is only created when both {@code stream} and {@code format} are non-null; otherwise
   * the writer reference is set to null.
   *
   * @param profiledTaskKinds which kinds of {@link ProfilerTask}s to track
   * @param stream output stream to store profile data. Note: passing unbuffered stream object
   *     reference may result in significant performance penalties
   * @param format the file format to write; selects the writer implementation
   * @param comment a comment to insert in the profile data; only handed to the writer of {@link
   *     Format#BINARY_BAZEL_FORMAT}
   * @param recordAllDurations iff true, record all tasks regardless of their duration; otherwise
   *     some tasks may get aggregated if they finished quick enough
   * @param clock a {@code BlazeClock.instance()}
   * @param execStartTimeNanos execution start time in nanos obtained from {@code clock.nanoTime()}.
   *     This value becomes the profile start time; since {@link #isActive} tests it against 0, a
   *     value of 0 leaves the profiler inactive
   * @throws IllegalStateException if the profiler is already active
   * @throws IOException declared for setting up the output; at least the {@code GZIPOutputStream}
   *     constructor can throw it
   */
  public synchronized void start(
      ProfiledTaskKinds profiledTaskKinds,
      OutputStream stream,
      Format format,
      String comment,
      boolean recordAllDurations,
      Clock clock,
      long execStartTimeNanos)
      throws IOException {
    Preconditions.checkState(!isActive(), "Profiler already active");
    initHistograms();

    this.profiledTaskKinds = profiledTaskKinds;
    this.clock = clock;

    // sanity check for current limitation on the number of supported types due
    // to using enum.ordinal() to store them instead of EnumSet for performance reasons.
    Preconditions.checkState(TASK_COUNT < 256,
        "The profiler implementation supports only up to 255 different ProfilerTask values.");

    // reset state for the new profiling session
    taskId.set(0);
    this.recordAllDurations = recordAllDurations;
    this.taskStack = new TaskStack();
    FileWriter writer = null;
    if (stream != null && format != null) {
      // Pick the writer implementation for the requested format.
      switch (format) {
        case BINARY_BAZEL_FORMAT:
          writer = new BinaryFormatWriter(stream, execStartTimeNanos, comment);
          break;
        case JSON_TRACE_FILE_FORMAT:
          writer = new JsonTraceFileWriter(stream, execStartTimeNanos);
          break;
        case JSON_TRACE_FILE_COMPRESSED_FORMAT:
          // Same JSON writer, but the output is gzip-compressed on the way out.
          writer = new JsonTraceFileWriter(new GZIPOutputStream(stream), execStartTimeNanos);
      }
      writer.start();
    }
    this.writerRef.set(writer);

    // activate profiler
    // This volatile write must stay last: it is what makes isActive() return true, so everything
    // assigned above is already in place when other code observes the profiler as active.
    profileStartTime = execStartTimeNanos;
  }

  /**
   * Returns the slowest tasks collected so far, as the concatenation of the results of every
   * per-type aggregator. This must be called between calls to {@link #start} and {@link #stop},
   * or the result is empty.
   */
  // TODO(ulfjack): This returns incomplete data by design. Also see getTasksHistograms.
  public synchronized Iterable<SlowTask> getSlowestTasks() {
    List<Iterable<SlowTask>> perType = new ArrayList<>();

    for (SlowestTaskAggregator perTypeAggregator : slowestTasks) {
      if (perTypeAggregator == null) {
        // Task types that do not track slowest instances have no aggregator.
        continue;
      }
      perType.add(perTypeAggregator.getSlowestTasks());
    }

    return Iterables.concat(perType);
  }

  /**
   * Disable profiling and complete profile file creation. Does nothing if the profiler is not
   * active. Subsequent calls to beginTask/endTask will no longer be recorded in the profile.
   *
   * @throws IOException declared for shutting down the current writer
   */
  public synchronized void stop() throws IOException {
    if (!isActive()) {
      return;
    }
    // Log a final event to update the duration of ProfilePhase.FINISH.
    logEvent(ProfilerTask.INFO, "Finishing");

    // Detach the writer first so that no further records are enqueued, then shut it down.
    FileWriter detachedWriter = writerRef.getAndSet(null);
    if (detachedWriter != null) {
      detachedWriter.shutdown();
    }

    taskStack = null;
    initHistograms();
    // Deactivates the profiler: isActive() is false from here on.
    profileStartTime = 0L;

    for (SlowestTaskAggregator perTypeAggregator : slowestTasks) {
      if (perTypeAggregator == null) {
        continue;
      }
      perTypeAggregator.clear();
    }
  }

  /**
   * Returns true iff profiling is currently enabled, i.e. the recorded profile start time is
   * non-zero.
   */
  public boolean isActive() {
    long startedAt = profileStartTime;
    return startedAt != 0L;
  }

  /**
   * Returns whether the currently configured {@link ProfiledTaskKinds} collects data for the given
   * task type.
   *
   * <p>The configuration is assigned by {@link #start}; every caller in this class checks {@link
   * #isActive} first.
   */
  public boolean isProfiling(ProfilerTask type) {
    ProfiledTaskKinds configuredKinds = profiledTaskKinds;
    return configuredKinds.isProfiling(type);
  }

  /**
   * Unless --record_full_profiler_data is given we drop small tasks and add their time to the
   * parents duration.
   *
   * @return true if all durations are recorded, or if {@code duration} is at least the minimum
   *     duration of the task type
   */
  private boolean wasTaskSlowEnoughToRecord(ProfilerTask type, long duration) {
    if (recordAllDurations) {
      return true;
    }
    return duration >= type.minDuration;
  }

  /**
   * Adds task directly to the main queue bypassing task stack. Used for simple tasks that are known
   * to not have any subtasks.
   *
   * <p>The task always goes into the histogram of its type and into the aggregated statistics of
   * the calling thread's innermost open task, if any. Only tasks accepted by {@code
   * wasTaskSlowEnoughToRecord} get a record of their own, which is handed to the current writer
   * (if any) and to the slowest-task aggregator of the type (if any).
   *
   * @param startTimeNanos task start time (obtained through {@link Profiler#nanoTimeMaybe()});
   *     must be positive
   * @param duration task duration; negative values are treated as 0
   * @param type task type
   * @param description task description; must be neither null nor empty. May be stored until end
   *     of build.
   */
  private void logTask(long startTimeNanos, long duration, ProfilerTask type, String description) {
    Preconditions.checkNotNull(description);
    Preconditions.checkState(startTimeNanos > 0, "startTime was %s", startTimeNanos);
    Preconditions.checkState(!description.isEmpty(), "No description -> not helpful");
    // See note in Clock#nanoTime, which is used by Profiler#nanoTimeMaybe.
    long elapsedNanos = Math.max(duration, 0L);
    int typeIndex = type.ordinal();

    int elapsedMillis = (int) TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    tasksHistograms[typeIndex].addStat(elapsedMillis, description);

    // Take snapshots of the instance fields so they are not nulled out from under us by #stop.
    TaskStack stackSnapshot = taskStack;
    FileWriter writerSnapshot = writerRef.get();
    if (stackSnapshot == null) {
      // The stack has been nulled out in between the check the caller made and this point in the
      // code. Probably due to an asynchronous crash.
      logger.severe("Variables null in profiler for " + type + ", probably due to async crash");
      return;
    }

    TaskData enclosing = stackSnapshot.peek();
    if (enclosing != null) {
      enclosing.aggregateChild(type, elapsedNanos);
    }

    if (!wasTaskSlowEnoughToRecord(type, elapsedNanos)) {
      // Too quick for its own record; it only lives on in the aggregated numbers above.
      return;
    }

    TaskData finished = stackSnapshot.create(startTimeNanos, type, description);
    finished.duration = elapsedNanos;
    if (writerSnapshot != null) {
      writerSnapshot.enqueue(finished);
    }

    SlowestTaskAggregator slowestOfType = slowestTasks[typeIndex];
    if (slowestOfType != null) {
      slowestOfType.add(finished);
    }
  }

  /**
   * Used externally to submit simple task (one that does not have any subtasks). The task ends
   * now, according to the profiler's clock. Depending on the minDuration attribute of the task
   * type, task may be just aggregated into the parent task and not stored directly.
   *
   * <p>Does nothing unless the profiler is active and profiling the given type.
   *
   * @param startTime task start time (obtained through {@link Profiler#nanoTimeMaybe()})
   * @param type task type
   * @param description task description. May be stored until the end of the build.
   */
  public void logSimpleTask(long startTime, ProfilerTask type, String description) {
    if (!isActive() || !isProfiling(type)) {
      return;
    }
    long elapsedNanos = clock.nanoTime() - startTime;
    logTask(startTime, elapsedNanos, type, description);
  }

  /**
   * Used externally to submit simple task (one that does not have any subtasks) whose start and
   * stop times are both known. Depending on the minDuration attribute of the task type, task may
   * be just aggregated into the parent task and not stored directly.
   *
   * <p>Note that start and stop time must both be acquired from the same clock instance.
   *
   * <p>Does nothing unless the profiler is active and profiling the given type.
   *
   * @param startTimeNanos task start time
   * @param stopTimeNanos task stop time
   * @param type task type
   * @param description task description. May be stored until the end of the build.
   */
  public void logSimpleTask(
      long startTimeNanos, long stopTimeNanos, ProfilerTask type, String description) {
    if (!isActive() || !isProfiling(type)) {
      return;
    }
    long elapsedNanos = stopTimeNanos - startTimeNanos;
    logTask(startTimeNanos, elapsedNanos, type, description);
  }

  /**
   * Used externally to submit simple task (one that does not have any subtasks) whose duration is
   * already known. Depending on the minDuration attribute of the task type, task may be just
   * aggregated into the parent task and not stored directly.
   *
   * <p>Does nothing unless the profiler is active and profiling the given type.
   *
   * @param startTimeNanos task start time (obtained through {@link Profiler#nanoTimeMaybe()})
   * @param duration the duration of the task
   * @param type task type
   * @param description task description. May be stored until the end of the build.
   */
  public void logSimpleTaskDuration(
      long startTimeNanos, Duration duration, ProfilerTask type, String description) {
    if (!isActive() || !isProfiling(type)) {
      return;
    }
    long elapsedNanos = duration.toNanos();
    logTask(startTimeNanos, elapsedNanos, type, description);
  }

  /**
   * Used to log "events" - tasks with zero duration. The event is stamped with the current time of
   * the profiler's clock and is dropped unless the profiler is active and profiling the type.
   */
  void logEvent(ProfilerTask type, String description) {
    if (!isActive() || !isProfiling(type)) {
      return;
    }
    long nowNanos = clock.nanoTime();
    logTask(nowNanos, 0, type, description);
  }

  /**
   * Records the beginning of a task as specified, and returns a {@link SilentCloseable} instance
   * that ends the task. This lets the system do the work of ending the task, with the compiler
   * giving a warning if the returned instance is not closed.
   *
   * <p>Use of this method allows to support nested task monitoring. For tasks that are known to not
   * have any subtasks, logSimpleTask() should be used instead.
   *
   * <p>If the profiler is inactive or not profiling the given type, nothing is recorded and the
   * returned instance does nothing when closed.
   *
   * <p>Use like this:
   * <pre>
   * {@code
   * try (SilentCloseable c = Profiler.instance().profile(type, "description")) {
   *   // Your code here.
   * }
   * }
   * </pre>
   *
   * @param type predefined task type - see ProfilerTask for available types.
   * @param description task description; must not be null. May be stored until the end of the
   *     build.
   */
  public SilentCloseable profile(ProfilerTask type, String description) {
    // ProfilerInfo.allTasksById is supposed to be an id -> Task map, but it is in fact a List,
    // which means that we cannot drop tasks to which we had already assigned ids. Therefore,
    // non-leaf tasks must not have a minimum duration. However, we don't quite consistently
    // enforce this, and Blaze only works because we happen not to add child tasks to those parent
    // tasks that have a minimum duration.
    Preconditions.checkNotNull(description);
    if (!isActive() || !isProfiling(type)) {
      // Nothing was opened, so there is nothing to close.
      return () -> {};
    }
    taskStack.push(type, description);
    return () -> completeTask(type);
  }

  /**
   * Convenience overload of {@link #profile(ProfilerTask, String)} that always uses {@link
   * ProfilerTask#INFO} as the task type.
   *
   * <p>Starts the task and returns a {@link SilentCloseable} that ends it when closed; the
   * compiler warns if the returned instance is not closed. Tasks started this way may be nested.
   * For tasks that are known to not have any subtasks, logSimpleTask() should be used instead.
   *
   * <p>Use like this:
   * <pre>
   * {@code
   * try (SilentCloseable c = Profiler.instance().profile("description")) {
   *   // Your code here.
   * }
   * }
   * </pre>
   *
   * @param description task description. May be stored until the end of the build.
   */
  public SilentCloseable profile(String description) {
    ProfilerTask defaultType = ProfilerTask.INFO;
    return profile(defaultType, description);
  }

  /**
   * Records the end of the task on top of the thread-local task stack: pops it, computes its
   * duration, folds it into its parent's aggregated statistics (if it has a parent), and hands it
   * to the file writer and to the slowest-task aggregator when applicable.
   *
   * <p>Does nothing unless the profiler is active and is profiling {@code type}.
   *
   * @param type task type; must match the type of the task at the top of the stack.
   * @throws IllegalStateException if the popped task has a different type
   */
  private void completeTask(ProfilerTask type) {
    if (!isActive() || !isProfiling(type)) {
      return;
    }

    // Read the clock first so that the bookkeeping below is not counted in the task's duration.
    long endTime = clock.nanoTime();
    TaskData finished = taskStack.pop();
    Preconditions.checkState(
        finished.type == type,
        "Inconsistent Profiler.completeTask() call: should have been %s but got %s (%s, %s)",
        finished.type,
        type,
        finished,
        taskStack);
    finished.duration = endTime - finished.startTimeNanos;

    boolean hasParent = finished.parentId > 0;
    if (hasParent) {
      // After the pop above, the top of the stack is this task's parent.
      taskStack.peek().aggregateChild(finished.type, finished.duration);
    }

    boolean slowEnough = wasTaskSlowEnoughToRecord(type, finished.duration);
    FileWriter writer = writerRef.get();
    // A task that is too fast is still written if it carries aggregated child statistics.
    boolean worthWriting = slowEnough || finished.counts != null;
    if (writer != null && worthWriting) {
      writer.enqueue(finished);
    }

    if (!slowEnough) {
      return;
    }
    SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
    if (aggregator != null) {
      aggregator.add(finished);
    }
  }

  /**
   * Convenience method to log phase marker tasks. The memory profiler is always notified of the
   * phase; the phase event itself is logged only when the profiler is active and is profiling
   * {@link ProfilerTask#PHASE}.
   *
   * @throws IllegalStateException if a phase is marked while the task stack is not empty
   */
  public void markPhase(ProfilePhase phase) throws InterruptedException {
    MemoryProfiler.instance().markPhase(phase);
    if (!isActive() || !isProfiling(ProfilerTask.PHASE)) {
      return;
    }
    Preconditions.checkState(taskStack.isEmpty(), "Phase tasks must not be nested");
    logEvent(ProfilerTask.PHASE, phase.description);
  }

  /**
   * Base class for the profile writers. Each instance owns a queue of {@link TaskData} and a
   * thread that runs the subclass's {@link #run()}; subclasses consume the queue there and store
   * any I/O failure in {@link #savedException} so that {@link #shutdown()} can rethrow it.
   */
  private abstract static class FileWriter implements Runnable {
    /** Tasks waiting to be written; {@code POISON_PILL} marks the end of the stream. */
    protected final BlockingQueue<TaskData> queue = new LinkedBlockingDeque<>();

    /** Thread that executes {@link #run()}; created here but only started by {@link #start()}. */
    protected final Thread thread;

    /** I/O failure recorded by the writer thread, or null if none was recorded. */
    protected IOException savedException;

    FileWriter() {
      this.thread = new Thread(this);
    }

    /**
     * Asks the writer thread to finish by queueing the poison pill, then waits for it to exit.
     * If the wait is interrupted, the writer thread is interrupted too and the caller's interrupt
     * status is restored.
     *
     * @throws IOException the failure saved by the writer thread, if any
     */
    public void shutdown() throws IOException {
      queue.add(POISON_PILL);
      try {
        thread.join();
      } catch (InterruptedException interrupted) {
        thread.interrupt();
        Thread.currentThread().interrupt();
      }
      IOException failure = savedException;
      if (failure != null) {
        throw failure;
      }
    }

    /** Starts the writer thread. */
    public void start() {
      thread.start();
    }

    /** Queues a task for the writer thread. */
    public void enqueue(TaskData data) {
      queue.add(data);
    }
  }

  /**
   * Writes the profile in the binary Bazel profile format.
   *
   * <p>The output is a deflate-compressed stream consisting of a header (see {@code writeHeader}),
   * one length-prefixed record per queued {@link TaskData}, and a trailing {@code EOF_MARKER}.
   */
  private static class BinaryFormatWriter extends FileWriter {
    private final OutputStream outStream;
    private final long profileStartTime;
    private final String comment;

    /**
     * @param outStream destination stream; it is wrapped in compressing and buffering streams by
     *     {@link #run()}
     * @param profileStartTime value subtracted from every task's start time before it is written
     * @param comment free-form text stored in the file header
     */
    BinaryFormatWriter(OutputStream outStream, long profileStartTime, String comment) {
      this.outStream = outStream;
      this.profileStartTime = profileStartTime;
      this.comment = comment;
    }

    /**
     * Writes the file header: magic number, protocol version, the comment, the number of task
     * types, and then the name of every {@link ProfilerTask}.
     */
    private static void writeHeader(DataOutputStream out, String comment) throws IOException {
      out.writeInt(MAGIC); // magic
      out.writeInt(VERSION); // protocol_version
      out.writeUTF(comment);
      // ProfilerTask.values() returns the constants in ordinal() order, so there is no need to
      // store the ordinal() value for each entry.
      out.writeInt(TASK_COUNT);
      for (ProfilerTask type : ProfilerTask.values()) {
        out.writeUTF(type.toString());
      }
    }

    /**
     * Writes the header, then takes tasks from the queue and writes one record per task until the
     * poison pill is taken, and finally writes {@code EOF_MARKER}. This is the body of the writer
     * thread.
     *
     * <p>If an {@link IOException} occurs it is stored in {@code savedException}, and the queue
     * keeps being consumed (without writing) until the poison pill arrives. If the thread is
     * interrupted, the method returns silently.
     */
    @Override
    public void run() {
      try {
        boolean receivedPoisonPill = false;
        // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused
        // by the write() method. Values for buffer sizes were chosen by running small amount of
        // tests and identifying point of diminishing returns - but I have not really tried to
        // optimize them.
        try (DataOutputStream out =
            new DataOutputStream(
                new BufferedOutputStream(
                    new DeflaterOutputStream(
                        // the DeflaterOutputStream has its own output buffer of 65k, chosen at
                        // random
                        outStream, new Deflater(Deflater.BEST_SPEED, false), 65536),
                    // buffer size, basically chosen at random
                    262144))) {
          writeHeader(out, comment);
          // Allocate the sink once to avoid GC
          // NOTE(review): the sink is fixed at 1024 bytes and holds the whole record, including
          // up to TASK_COUNT aggregated entries; a record that did not fit would make put() throw
          // BufferOverflowException, which is not handled here - confirm the bound on TASK_COUNT.
          ByteBuffer sink = ByteBuffer.allocate(1024);
          ObjectDescriber describer = new ObjectDescriber();
          TaskData data;
          while ((data = queue.take()) != POISON_PILL) {
            // Reuse the sink for every record. The cast presumably keeps this call bound to
            // Buffer.clear() on every JDK (ByteBuffer gained its own override in Java 9).
            ((Buffer) sink).clear();

            // Fixed part of the record, in this order: thread id, task id, parent id, start time
            // relative to the start of the profile, duration.
            VarInt.putVarLong(data.threadId, sink);
            VarInt.putVarInt(data.id, sink);
            VarInt.putVarInt(data.parentId, sink);
            VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink);
            VarInt.putVarLong(data.duration, sink);

            // To save space (and improve performance), convert all description
            // strings to the canonical object and use IdentityHashMap to assign
            // unique numbers for each string.
            int descIndex = describer.getDescriptionIndex(data.description);
            VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values.

            // Save types using their ordinal() value
            sink.put((byte) data.type.ordinal());

            // Save aggregated data stats.
            // Only task types with a positive count are written, each as
            // (type ordinal, count, total duration).
            if (data.counts != null) {
              for (int i = 0; i < TASK_COUNT; i++) {
                if (data.counts[i] > 0) {
                  sink.put((byte) i); // aggregated type ordinal value
                  VarInt.putVarInt(data.counts[i], sink);
                  VarInt.putVarLong(data.durations[i], sink);
                }
              }
            }

            // Each record is prefixed with its length in bytes.
            out.writeInt(sink.position());
            out.write(sink.array(), 0, sink.position());
            // The description text follows the record only while the describer reports its index
            // as unassigned - presumably the first time this description is seen; later records
            // carry just the index.
            if (describer.isUnassigned(descIndex)) {
              out.writeUTF(describer.memoizeDescription(data.description));
            }
          }
          receivedPoisonPill = true;
          out.writeInt(EOF_MARKER);
        } catch (IOException e) {
          // Keep the failure so that shutdown() can rethrow it on the caller's thread.
          this.savedException = e;
          if (!receivedPoisonPill) {
            while (queue.take() != POISON_PILL) {
              // We keep emptying the queue, but we can't write anything.
            }
          }
        }
      } catch (InterruptedException e) {
        // Exit silently.
      }
    }
  }

  /**
   * Writes the profile in Json Trace file format.
   *
   * <p>The output is a single JSON array. It starts with a {@code thread_name} metadata event for
   * the synthetic critical-path thread, followed by one {@code thread_name} metadata event per
   * thread that enqueued tasks and one complete ("X") event per task with a non-zero duration.
   * Tasks with a zero duration are not written.
   */
  private static class JsonTraceFileWriter extends FileWriter {
    private final OutputStream outStream;
    private final long profileStartTimeNanos;
    // Tracks, per thread, whether that thread's name has already been queued as metadata.
    private final ThreadLocal<Boolean> metadataPosted =
        ThreadLocal.withInitial(() -> Boolean.FALSE);
    // The JDK never returns 0 as thread id so we use that as fake thread id for the critical path.
    private static final long CRITICAL_PATH_THREAD_ID = 0;

    /**
     * @param outStream destination stream; it is wrapped in a buffered UTF-8 writer by {@link
     *     #run()}
     * @param profileStartTimeNanos value subtracted from every task's start time to obtain its
     *     timestamp in the trace
     */
    JsonTraceFileWriter(OutputStream outStream, long profileStartTimeNanos) {
      this.outStream = outStream;
      this.profileStartTimeNanos = profileStartTimeNanos;
    }

    /**
     * Queues a task for writing. The first time a given thread calls this method, a synthetic
     * {@link ProfilerTask#THREAD_NAME} task carrying that thread's name is queued first, so that
     * the writer can emit a name for the thread.
     */
    @Override
    public void enqueue(TaskData data) {
      if (!metadataPosted.get().booleanValue()) {
        metadataPosted.set(Boolean.TRUE);
        // Create a TaskData object that is special-cased below.
        queue.add(
            new TaskData(
                /* id= */ 0,
                /* startTimeNanos= */ -1,
                /* parent= */ null,
                ProfilerTask.THREAD_NAME,
                Thread.currentThread().getName()));
      }
      queue.add(data);
    }

    /**
     * Writes one {@code thread_name} metadata event ({@code "ph": "M"}) that assigns {@code
     * threadName} to the thread with id {@code threadId}.
     */
    private static void writeThreadNameMetadata(
        JsonWriter writer, long threadId, String threadName) throws IOException {
      // Indentation is switched on only while the object is opened, so that every event starts on
      // a new, indented line while its members stay on that single line.
      writer.setIndent("  ");
      writer.beginObject();
      writer.setIndent("");
      writer.name("name").value("thread_name");
      writer.name("ph").value("M");
      writer.name("pid").value(1);
      writer.name("tid").value(threadId);
      writer.name("args");
      writer.beginObject();
      writer.name("name").value(threadName);
      writer.endObject();
      writer.endObject();
    }

    /**
     * Writes one complete event ({@code "ph": "X"}) for a task with a non-zero duration.
     * Timestamps and durations are converted from nanoseconds to microseconds. Critical path
     * components are attributed to the synthetic critical-path thread instead of the thread that
     * recorded them.
     */
    private void writeCompleteEvent(JsonWriter writer, TaskData data) throws IOException {
      writer.setIndent("  ");
      writer.beginObject();
      writer.setIndent("");
      writer.name("cat").value(data.type.description);
      writer.name("name").value(data.description);
      writer.name("ph").value("X");
      writer
          .name("ts")
          .value(TimeUnit.NANOSECONDS.toMicros(data.startTimeNanos - profileStartTimeNanos));
      writer.name("dur").value(TimeUnit.NANOSECONDS.toMicros(data.duration));
      writer.name("pid").value(1);
      long threadId =
          data.type == ProfilerTask.CRITICAL_PATH_COMPONENT
              ? CRITICAL_PATH_THREAD_ID
              : data.threadId;
      writer.name("tid").value(threadId);
      writer.endObject();
    }

    /**
     * Opens the JSON array, then takes tasks from the queue and writes them until the poison pill
     * is taken, and finally closes the array. This is the body of the writer thread.
     *
     * <p>If an {@link IOException} occurs it is stored in {@code savedException}, and the queue
     * keeps being consumed (without writing) until the poison pill arrives. If the thread is
     * interrupted, the method returns silently.
     */
    @Override
    public void run() {
      try {
        boolean receivedPoisonPill = false;
        try (JsonWriter writer =
            new JsonWriter(
                // The buffer size of 262144 is chosen at random.
                new OutputStreamWriter(
                    new BufferedOutputStream(outStream, 262144), StandardCharsets.UTF_8))) {
          writer.beginArray();

          // Generate metadata event for the critical path as thread 0 in disguise.
          writeThreadNameMetadata(writer, CRITICAL_PATH_THREAD_ID, "Critical Path");

          TaskData data;
          while ((data = queue.take()) != POISON_PILL) {
            if (data.type == ProfilerTask.THREAD_NAME) {
              // Synthetic task created by enqueue(); it is written regardless of its duration.
              writeThreadNameMetadata(writer, data.threadId, data.description);
            } else if (data.duration != 0) {
              writeCompleteEvent(writer, data);
            }
            // Any other task has a zero duration and is intentionally left out of the trace.
          }
          receivedPoisonPill = true;
          writer.setIndent("  ");
          writer.endArray();
        } catch (IOException e) {
          // Keep the failure so that shutdown() can rethrow it on the caller's thread.
          this.savedException = e;
          if (!receivedPoisonPill) {
            while (queue.take() != POISON_PILL) {
              // We keep emptying the queue, but we can't write anything.
            }
          }
        }
      } catch (InterruptedException e) {
        // Exit silently.
      }
    }
  }
}