diff options
author | jmmv <jmmv@google.com> | 2018-06-11 13:30:38 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-06-11 13:32:59 -0700 |
commit | 4915e823976adf6738ec1c89abda8ed8f1f5e368 (patch) | |
tree | b9faefc694537c68e06b00e560ea19f4b89c7326 /src/main/java/com/google/devtools/build/lib/profiler/Profiler.java | |
parent | c7eef96da80ace358e1d19c7b090765747281cfd (diff) |
Automated rollback of commit 15b8c259db111012b4642287172cb4d1d82151f3.
*** Reason for rollback ***
Breaks internal performance tests.
PiperOrigin-RevId: 200103033
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/profiler/Profiler.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/profiler/Profiler.java | 294 |
1 file changed, 134 insertions, 160 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java index 652756b58c..0bf0e4117b 100644 --- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java +++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java @@ -37,11 +37,11 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.logging.Logger; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -130,6 +130,10 @@ public final class Profiler { // EOF marker. Must be < 0. public static final int EOF_MARKER = -1; + // Profiler will check for gathered data and persist all of it in the + // separate thread every SAVE_DELAY ms. + private static final int SAVE_DELAY = 2000; // ms + /** * The profiler (a static singleton instance). Inactive by default. */ @@ -137,13 +141,6 @@ public final class Profiler { private static final int HISTOGRAM_BUCKETS = 20; - private static final TaskData POISON_PILL = new TaskData(0, 0, null, null, "poison pill"); - - /** File format enum. */ - public static enum Format { - BINARY_BAZEL_FORMAT; - } - /** A task that was very slow. */ public static final class SlowTask implements Comparable<SlowTask> { final long durationNanos; @@ -189,7 +186,7 @@ public final class Profiler { * methods is. 
*/ @ThreadCompatible - private static final class TaskData { + private final class TaskData { final long threadId; final long startTimeNanos; final int id; @@ -201,11 +198,12 @@ public final class Profiler { int[] counts; // number of invocations per ProfilerTask type long[] durations; // time spend in the task per ProfilerTask type - TaskData( - int id, long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) { - this.id = id; - this.threadId = Thread.currentThread().getId(); - this.parentId = (parent == null ? 0 : parent.id); + TaskData(long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) { + threadId = Thread.currentThread().getId(); + counts = null; + durations = null; + id = taskId.incrementAndGet(); + parentId = (parent == null ? 0 : parent.id); this.startTimeNanos = startTimeNanos; this.type = eventType; this.description = Preconditions.checkNotNull(description); @@ -240,6 +238,7 @@ public final class Profiler { */ @ThreadSafe private final class TaskStack extends ThreadLocal<List<TaskData>> { + @Override public List<TaskData> initialValue() { return new ArrayList<>(); @@ -267,7 +266,7 @@ public final class Profiler { } public TaskData create(long startTimeNanos, ProfilerTask eventType, String description) { - return new TaskData(taskId.incrementAndGet(), startTimeNanos, peek(), eventType, description); + return new TaskData(startTimeNanos, peek(), eventType, description); } @Override @@ -433,13 +432,15 @@ public final class Profiler { private volatile boolean recordAllDurations = false; private AtomicInteger taskId = new AtomicInteger(); - private Thread writerThread; private TaskStack taskStack; - private BlockingQueue<TaskData> taskQueue; + private Queue<TaskData> taskQueue; + private DataOutputStream out; + private Timer timer; private IOException saveException; + private ObjectDescriber describer; @SuppressWarnings("unchecked") private final SlowestTaskAggregator[] slowestTasks = - new 
SlowestTaskAggregator[ProfilerTask.values().length]; + new SlowestTaskAggregator[ProfilerTask.values().length]; private final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length]; @@ -507,15 +508,15 @@ public final class Profiler { public synchronized void start( ProfiledTaskKinds profiledTaskKinds, OutputStream stream, - Format format, String comment, boolean recordAllDurations, Clock clock, - long execStartTimeNanos) { + long execStartTimeNanos) + throws IOException { Preconditions.checkState(!isActive(), "Profiler already active"); taskStack = new TaskStack(); - taskQueue = new LinkedBlockingDeque<>(); - initHistograms(); + taskQueue = new ConcurrentLinkedQueue<>(); + describer = new ObjectDescriber(); this.profiledTaskKinds = profiledTaskKinds; this.clock = clock; @@ -529,27 +530,37 @@ public final class Profiler { taskId.set(0); this.recordAllDurations = recordAllDurations; this.saveException = null; - this.writerThread = null; if (stream != null) { - if (format == Format.BINARY_BAZEL_FORMAT) { - BinaryFormatWriter writer = - new BinaryFormatWriter( - stream, taskQueue, profileStartTime, comment, (IOException e) -> abortWriting(e)); - // Start save thread - writerThread = new Thread(writer); - writerThread.start(); + this.timer = new Timer("ProfilerTimer", true); + // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by + // the save() method. Values for buffer sizes were chosen by running small amount of tests + // and identifying point of diminishing returns - but I have not really tried to optimize + // them. 
+ this.out = new DataOutputStream(new BufferedOutputStream(new DeflaterOutputStream( + stream, new Deflater(Deflater.BEST_SPEED, false), 65536), 262144)); + + this.out.writeInt(MAGIC); // magic + this.out.writeInt(VERSION); // protocol_version + this.out.writeUTF(comment); + // ProfileTask.values() method sorts enums using their ordinal() value, so + // there there is no need to store ordinal() value for each entry. + this.out.writeInt(TASK_COUNT); + for (ProfilerTask type : ProfilerTask.values()) { + this.out.writeUTF(type.toString()); } + + // Start save thread + timer.schedule(new TimerTask() { + @Override public void run() { save(); } + }, SAVE_DELAY, SAVE_DELAY); + } else { + this.out = null; } // activate profiler profileStartTime = execStartTimeNanos; } - private synchronized void abortWriting(IOException e) { - saveException = e; - writerThread = null; - } - public synchronized Iterable<SlowTask> getSlowestTasks() { List<Iterable<SlowTask>> slowestTasksByType = new ArrayList<>(); @@ -576,21 +587,8 @@ public final class Profiler { } // Log a final event to update the duration of ProfilePhase.FINISH. logEvent(ProfilerTask.INFO, "Finishing"); - if (writerThread != null) { - // Add poison pill to queue and then wait for writer thread to shut down. - taskQueue.add(POISON_PILL); - try { - writerThread.join(); - } catch (InterruptedException e) { - writerThread.interrupt(); - Thread.currentThread().interrupt(); - } - writerThread = null; - } - initHistograms(); - profileStartTime = 0L; - taskStack = null; - taskQueue = null; + save(); + clear(); for (SlowestTaskAggregator aggregator : slowestTasks) { if (aggregator != null) { @@ -601,6 +599,11 @@ public final class Profiler { if (saveException != null) { throw saveException; } + if (out != null) { + out.writeInt(EOF_MARKER); + out.close(); + out = null; + } } /** @@ -615,6 +618,81 @@ public final class Profiler { } /** + * Saves all gathered information from taskQueue queue to the file. 
+ * Method is invoked internally by the Timer-based thread and at the end of + * profiling session. + */ + private synchronized void save() { + if (out == null) { + return; + } + try { + // Allocate the sink once to avoid GC + ByteBuffer sink = ByteBuffer.allocate(1024); + TaskData data; + while ((data = taskQueue.poll()) != null) { + sink.clear(); + + VarInt.putVarLong(data.threadId, sink); + VarInt.putVarInt(data.id, sink); + VarInt.putVarInt(data.parentId, sink); + VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink); + VarInt.putVarLong(data.duration, sink); + + // To save space (and improve performance), convert all description + // strings to the canonical object and use IdentityHashMap to assign + // unique numbers for each string. + int descIndex = describer.getDescriptionIndex(data.description); + VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values. + + // Save types using their ordinal() value + sink.put((byte) data.type.ordinal()); + + // Save aggregated data stats. 
+ if (data.counts != null) { + for (int i = 0; i < TASK_COUNT; i++) { + if (data.counts[i] > 0) { + sink.put((byte) i); // aggregated type ordinal value + VarInt.putVarInt(data.counts[i], sink); + VarInt.putVarLong(data.durations[i], sink); + } + } + } + + this.out.writeInt(sink.position()); + this.out.write(sink.array(), 0, sink.position()); + if (describer.isUnassigned(descIndex)) { + this.out.writeUTF(describer.memoizeDescription(data.description)); + } + } + this.out.flush(); + } catch (IOException e) { + saveException = e; + clear(); + try { + out.close(); + } catch (IOException e2) { + // ignore it + } + } + } + + private synchronized void clear() { + initHistograms(); + profileStartTime = 0L; + if (timer != null) { + timer.cancel(); + timer = null; + } + taskStack = null; + taskQueue = null; + describer = null; + + // Note that slowest task aggregator are not cleared here because clearing happens + // periodically over the course of a command invocation. + } + + /** * Unless --record_full_profiler_data is given we drop small tasks and add their time to the * parents duration. */ @@ -657,7 +735,9 @@ public final class Profiler { if (wasTaskSlowEnoughToRecord(type, duration)) { TaskData data = localStack.create(startTimeNanos, type, description); data.duration = duration; - localQueue.add(data); + if (out != null) { + localQueue.add(data); + } SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()]; @@ -783,7 +863,7 @@ public final class Profiler { taskStack.peek().aggregateChild(data.type, data.duration); } boolean shouldRecordTask = wasTaskSlowEnoughToRecord(type, data.duration); - if (shouldRecordTask || data.counts != null) { + if (out != null && (shouldRecordTask || data.counts != null)) { taskQueue.add(data); } @@ -805,110 +885,4 @@ public final class Profiler { logEvent(ProfilerTask.PHASE, phase.description); } } - - /** Writes the profile in the binary Bazel profile format. 
*/ - private static class BinaryFormatWriter implements Runnable { - private final DataOutputStream out; - private final BlockingQueue<TaskData> queue; - private final long profileStartTime; - private final String comment; - private final Consumer<? super IOException> failureHandler; - - BinaryFormatWriter( - OutputStream out, - BlockingQueue<TaskData> queue, - long profileStartTime, - String comment, - Consumer<? super IOException> failureHandler) { - // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by - // the write() method. Values for buffer sizes were chosen by running small amount of tests - // and identifying point of diminishing returns - but I have not really tried to optimize - // them. - this.out = - new DataOutputStream( - new BufferedOutputStream( - new DeflaterOutputStream( - // the DeflaterOutputStream has its own output buffer of 65k, chosen at random - out, new Deflater(Deflater.BEST_SPEED, false), 65536), - 262144)); // buffer size, basically chosen at random - this.queue = queue; - this.profileStartTime = profileStartTime; - this.comment = comment; - this.failureHandler = failureHandler; - } - - private void writeHeader() throws IOException { - out.writeInt(MAGIC); // magic - out.writeInt(VERSION); // protocol_version - out.writeUTF(comment); - // ProfileTask.values() method sorts enums using their ordinal() value, so - // there there is no need to store ordinal() value for each entry. - out.writeInt(TASK_COUNT); - for (ProfilerTask type : ProfilerTask.values()) { - out.writeUTF(type.toString()); - } - } - - /** - * Saves all gathered information from taskQueue queue to the file. - * Method is invoked internally by the Timer-based thread and at the end of - * profiling session. 
- */ - @Override - public void run() { - try { - writeHeader(); - // Allocate the sink once to avoid GC - ByteBuffer sink = ByteBuffer.allocate(1024); - ObjectDescriber describer = new ObjectDescriber(); - TaskData data; - while ((data = queue.take()) != POISON_PILL) { - sink.clear(); - - VarInt.putVarLong(data.threadId, sink); - VarInt.putVarInt(data.id, sink); - VarInt.putVarInt(data.parentId, sink); - VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink); - VarInt.putVarLong(data.duration, sink); - - // To save space (and improve performance), convert all description - // strings to the canonical object and use IdentityHashMap to assign - // unique numbers for each string. - int descIndex = describer.getDescriptionIndex(data.description); - VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values. - - // Save types using their ordinal() value - sink.put((byte) data.type.ordinal()); - - // Save aggregated data stats. - if (data.counts != null) { - for (int i = 0; i < TASK_COUNT; i++) { - if (data.counts[i] > 0) { - sink.put((byte) i); // aggregated type ordinal value - VarInt.putVarInt(data.counts[i], sink); - VarInt.putVarLong(data.durations[i], sink); - } - } - } - - out.writeInt(sink.position()); - out.write(sink.array(), 0, sink.position()); - if (describer.isUnassigned(descIndex)) { - out.writeUTF(describer.memoizeDescription(data.description)); - } - } - out.writeInt(EOF_MARKER); - out.close(); - } catch (IOException e) { - failureHandler.accept(e); - try { - out.close(); - } catch (IOException e2) { - // ignore it - } - } catch (InterruptedException e) { - // Exit silently. - } - } - } } |