diff options
author | ulfjack <ulfjack@google.com> | 2018-06-11 09:46:50 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-06-11 09:48:24 -0700 |
commit | 15b8c259db111012b4642287172cb4d1d82151f3 (patch) | |
tree | 0d46f77b5b25bfd67440c102de54c7de5ff05add /src/main/java/com/google/devtools/build/lib/profiler/Profiler.java | |
parent | 6841a748109250f65448627bc5695d537990b686 (diff) |
Refactor profiler
- move the save method to an inner class
- don't use a timer, use a blocking queue instead
- add a format enum (in anticipation of adding a json output format)
- update the test to use an in memory buffer, and avoid FoundationTestCase
PiperOrigin-RevId: 200065404
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/profiler/Profiler.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/profiler/Profiler.java | 294 |
1 files changed, 160 insertions, 134 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java index 0bf0e4117b..652756b58c 100644 --- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java +++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java @@ -37,11 +37,11 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.logging.Logger; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; @@ -130,10 +130,6 @@ public final class Profiler { // EOF marker. Must be < 0. public static final int EOF_MARKER = -1; - // Profiler will check for gathered data and persist all of it in the - // separate thread every SAVE_DELAY ms. - private static final int SAVE_DELAY = 2000; // ms - /** * The profiler (a static singleton instance). Inactive by default. */ @@ -141,6 +137,13 @@ public final class Profiler { private static final int HISTOGRAM_BUCKETS = 20; + private static final TaskData POISON_PILL = new TaskData(0, 0, null, null, "poison pill"); + + /** File format enum. */ + public static enum Format { + BINARY_BAZEL_FORMAT; + } + /** A task that was very slow. */ public static final class SlowTask implements Comparable<SlowTask> { final long durationNanos; @@ -186,7 +189,7 @@ public final class Profiler { * methods is. */ @ThreadCompatible - private final class TaskData { + private static final class TaskData { final long threadId; final long startTimeNanos; final int id; @@ -198,12 +201,11 @@ public final class Profiler { int[] counts; // number of invocations per ProfilerTask type long[] durations; // time spend in the task per ProfilerTask type - TaskData(long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) { - threadId = Thread.currentThread().getId(); - counts = null; - durations = null; - id = taskId.incrementAndGet(); - parentId = (parent == null ? 0 : parent.id); + TaskData( + int id, long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) { + this.id = id; + this.threadId = Thread.currentThread().getId(); + this.parentId = (parent == null ? 0 : parent.id); this.startTimeNanos = startTimeNanos; this.type = eventType; this.description = Preconditions.checkNotNull(description); @@ -238,7 +240,6 @@ public final class Profiler { */ @ThreadSafe private final class TaskStack extends ThreadLocal<List<TaskData>> { - @Override public List<TaskData> initialValue() { return new ArrayList<>(); @@ -266,7 +267,7 @@ public final class Profiler { } public TaskData create(long startTimeNanos, ProfilerTask eventType, String description) { - return new TaskData(startTimeNanos, peek(), eventType, description); + return new TaskData(taskId.incrementAndGet(), startTimeNanos, peek(), eventType, description); } @Override @@ -432,15 +433,13 @@ public final class Profiler { private volatile boolean recordAllDurations = false; private AtomicInteger taskId = new AtomicInteger(); + private Thread writerThread; private TaskStack taskStack; - private Queue<TaskData> taskQueue; - private DataOutputStream out; - private Timer timer; + private BlockingQueue<TaskData> taskQueue; private IOException saveException; - private ObjectDescriber describer; @SuppressWarnings("unchecked") private final SlowestTaskAggregator[] slowestTasks = - new SlowestTaskAggregator[ProfilerTask.values().length]; + new SlowestTaskAggregator[ProfilerTask.values().length]; private final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length]; @@ -508,15 +507,15 @@ public final class Profiler { public synchronized void start( ProfiledTaskKinds profiledTaskKinds, OutputStream stream, + Format format, String comment, boolean recordAllDurations, Clock clock, - long execStartTimeNanos) - throws IOException { + long execStartTimeNanos) { Preconditions.checkState(!isActive(), "Profiler already active"); taskStack = new TaskStack(); - taskQueue = new ConcurrentLinkedQueue<>(); - describer = new ObjectDescriber(); + taskQueue = new LinkedBlockingDeque<>(); + initHistograms(); this.profiledTaskKinds = profiledTaskKinds; this.clock = clock; @@ -530,37 +529,27 @@ public final class Profiler { taskId.set(0); this.recordAllDurations = recordAllDurations; this.saveException = null; + this.writerThread = null; if (stream != null) { - this.timer = new Timer("ProfilerTimer", true); - // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by - // the save() method. Values for buffer sizes were chosen by running small amount of tests - // and identifying point of diminishing returns - but I have not really tried to optimize - // them. - this.out = new DataOutputStream(new BufferedOutputStream(new DeflaterOutputStream( - stream, new Deflater(Deflater.BEST_SPEED, false), 65536), 262144)); - - this.out.writeInt(MAGIC); // magic - this.out.writeInt(VERSION); // protocol_version - this.out.writeUTF(comment); - // ProfileTask.values() method sorts enums using their ordinal() value, so - // there there is no need to store ordinal() value for each entry. - this.out.writeInt(TASK_COUNT); - for (ProfilerTask type : ProfilerTask.values()) { - this.out.writeUTF(type.toString()); + if (format == Format.BINARY_BAZEL_FORMAT) { + BinaryFormatWriter writer = + new BinaryFormatWriter( + stream, taskQueue, profileStartTime, comment, (IOException e) -> abortWriting(e)); + // Start save thread + writerThread = new Thread(writer); + writerThread.start(); } - - // Start save thread - timer.schedule(new TimerTask() { - @Override public void run() { save(); } - }, SAVE_DELAY, SAVE_DELAY); - } else { - this.out = null; } // activate profiler profileStartTime = execStartTimeNanos; } + private synchronized void abortWriting(IOException e) { + saveException = e; + writerThread = null; + } + public synchronized Iterable<SlowTask> getSlowestTasks() { List<Iterable<SlowTask>> slowestTasksByType = new ArrayList<>(); @@ -587,8 +576,21 @@ public final class Profiler { } // Log a final event to update the duration of ProfilePhase.FINISH. logEvent(ProfilerTask.INFO, "Finishing"); - save(); - clear(); + if (writerThread != null) { + // Add poison pill to queue and then wait for writer thread to shut down. + taskQueue.add(POISON_PILL); + try { + writerThread.join(); + } catch (InterruptedException e) { + writerThread.interrupt(); + Thread.currentThread().interrupt(); + } + writerThread = null; + } + initHistograms(); + profileStartTime = 0L; + taskStack = null; + taskQueue = null; for (SlowestTaskAggregator aggregator : slowestTasks) { if (aggregator != null) { @@ -599,11 +601,6 @@ public final class Profiler { if (saveException != null) { throw saveException; } - if (out != null) { - out.writeInt(EOF_MARKER); - out.close(); - out = null; - } } /** @@ -618,81 +615,6 @@ public final class Profiler { } /** - * Saves all gathered information from taskQueue queue to the file. - * Method is invoked internally by the Timer-based thread and at the end of - * profiling session. - */ - private synchronized void save() { - if (out == null) { - return; - } - try { - // Allocate the sink once to avoid GC - ByteBuffer sink = ByteBuffer.allocate(1024); - TaskData data; - while ((data = taskQueue.poll()) != null) { - sink.clear(); - - VarInt.putVarLong(data.threadId, sink); - VarInt.putVarInt(data.id, sink); - VarInt.putVarInt(data.parentId, sink); - VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink); - VarInt.putVarLong(data.duration, sink); - - // To save space (and improve performance), convert all description - // strings to the canonical object and use IdentityHashMap to assign - // unique numbers for each string. - int descIndex = describer.getDescriptionIndex(data.description); - VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values. - - // Save types using their ordinal() value - sink.put((byte) data.type.ordinal()); - - // Save aggregated data stats. - if (data.counts != null) { - for (int i = 0; i < TASK_COUNT; i++) { - if (data.counts[i] > 0) { - sink.put((byte) i); // aggregated type ordinal value - VarInt.putVarInt(data.counts[i], sink); - VarInt.putVarLong(data.durations[i], sink); - } - } - } - - this.out.writeInt(sink.position()); - this.out.write(sink.array(), 0, sink.position()); - if (describer.isUnassigned(descIndex)) { - this.out.writeUTF(describer.memoizeDescription(data.description)); - } - } - this.out.flush(); - } catch (IOException e) { - saveException = e; - clear(); - try { - out.close(); - } catch (IOException e2) { - // ignore it - } - } - } - - private synchronized void clear() { - initHistograms(); - profileStartTime = 0L; - if (timer != null) { - timer.cancel(); - timer = null; - } - taskStack = null; - taskQueue = null; - describer = null; - - // Note that slowest task aggregator are not cleared here because clearing happens - // periodically over the course of a command invocation. - } - - /** * Unless --record_full_profiler_data is given we drop small tasks and add their time to the * parents duration. */ @@ -735,9 +657,7 @@ public final class Profiler { if (wasTaskSlowEnoughToRecord(type, duration)) { TaskData data = localStack.create(startTimeNanos, type, description); data.duration = duration; - if (out != null) { - localQueue.add(data); - } + localQueue.add(data); SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()]; @@ -863,7 +783,7 @@ public final class Profiler { taskStack.peek().aggregateChild(data.type, data.duration); } boolean shouldRecordTask = wasTaskSlowEnoughToRecord(type, data.duration); - if (out != null && (shouldRecordTask || data.counts != null)) { + if (shouldRecordTask || data.counts != null) { taskQueue.add(data); } @@ -885,4 +805,110 @@ public final class Profiler { logEvent(ProfilerTask.PHASE, phase.description); } } + + /** Writes the profile in the binary Bazel profile format. */ + private static class BinaryFormatWriter implements Runnable { + private final DataOutputStream out; + private final BlockingQueue<TaskData> queue; + private final long profileStartTime; + private final String comment; + private final Consumer<? super IOException> failureHandler; + + BinaryFormatWriter( + OutputStream out, + BlockingQueue<TaskData> queue, + long profileStartTime, + String comment, + Consumer<? super IOException> failureHandler) { + // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by + // the write() method. Values for buffer sizes were chosen by running small amount of tests + // and identifying point of diminishing returns - but I have not really tried to optimize + // them. + this.out = + new DataOutputStream( + new BufferedOutputStream( + new DeflaterOutputStream( + // the DeflaterOutputStream has its own output buffer of 65k, chosen at random + out, new Deflater(Deflater.BEST_SPEED, false), 65536), + 262144)); // buffer size, basically chosen at random + this.queue = queue; + this.profileStartTime = profileStartTime; + this.comment = comment; + this.failureHandler = failureHandler; + } + + private void writeHeader() throws IOException { + out.writeInt(MAGIC); // magic + out.writeInt(VERSION); // protocol_version + out.writeUTF(comment); + // ProfileTask.values() method sorts enums using their ordinal() value, so + // there there is no need to store ordinal() value for each entry. + out.writeInt(TASK_COUNT); + for (ProfilerTask type : ProfilerTask.values()) { + out.writeUTF(type.toString()); + } + } + + /** + * Saves all gathered information from taskQueue queue to the file. + * Method is invoked internally by the Timer-based thread and at the end of + * profiling session. + */ + @Override + public void run() { + try { + writeHeader(); + // Allocate the sink once to avoid GC + ByteBuffer sink = ByteBuffer.allocate(1024); + ObjectDescriber describer = new ObjectDescriber(); + TaskData data; + while ((data = queue.take()) != POISON_PILL) { + sink.clear(); + + VarInt.putVarLong(data.threadId, sink); + VarInt.putVarInt(data.id, sink); + VarInt.putVarInt(data.parentId, sink); + VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink); + VarInt.putVarLong(data.duration, sink); + + // To save space (and improve performance), convert all description + // strings to the canonical object and use IdentityHashMap to assign + // unique numbers for each string. + int descIndex = describer.getDescriptionIndex(data.description); + VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values. + + // Save types using their ordinal() value + sink.put((byte) data.type.ordinal()); + + // Save aggregated data stats. + if (data.counts != null) { + for (int i = 0; i < TASK_COUNT; i++) { + if (data.counts[i] > 0) { + sink.put((byte) i); // aggregated type ordinal value + VarInt.putVarInt(data.counts[i], sink); + VarInt.putVarLong(data.durations[i], sink); + } + } + } + + out.writeInt(sink.position()); + out.write(sink.array(), 0, sink.position()); + if (describer.isUnassigned(descIndex)) { + out.writeUTF(describer.memoizeDescription(data.description)); + } + } + out.writeInt(EOF_MARKER); + out.close(); + } catch (IOException e) { + failureHandler.accept(e); + try { + out.close(); + } catch (IOException e2) { + // ignore it + } + } catch (InterruptedException e) { + // Exit silently. + } + } + } } |