aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
diff options
context:
space:
mode:
author: ulfjack <ulfjack@google.com> 2018-06-11 09:46:50 -0700
committer: Copybara-Service <copybara-piper@google.com> 2018-06-11 09:48:24 -0700
commit: 15b8c259db111012b4642287172cb4d1d82151f3 (patch)
tree: 0d46f77b5b25bfd67440c102de54c7de5ff05add /src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
parent: 6841a748109250f65448627bc5695d537990b686 (diff)
Refactor profiler
- move the save method to an inner class
- don't use a timer, use a blocking queue instead
- add a format enum (in anticipation of adding a json output format)
- update the test to use an in memory buffer, and avoid FoundationTestCase

PiperOrigin-RevId: 200065404
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/profiler/Profiler.java')
-rw-r--r-- src/main/java/com/google/devtools/build/lib/profiler/Profiler.java 294
1 files changed, 160 insertions, 134 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index 0bf0e4117b..652756b58c 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -37,11 +37,11 @@ import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.logging.Logger;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
@@ -130,10 +130,6 @@ public final class Profiler {
// EOF marker. Must be < 0.
public static final int EOF_MARKER = -1;
- // Profiler will check for gathered data and persist all of it in the
- // separate thread every SAVE_DELAY ms.
- private static final int SAVE_DELAY = 2000; // ms
-
/**
* The profiler (a static singleton instance). Inactive by default.
*/
@@ -141,6 +137,13 @@ public final class Profiler {
private static final int HISTOGRAM_BUCKETS = 20;
+ private static final TaskData POISON_PILL = new TaskData(0, 0, null, null, "poison pill");
+
+ /** File format enum. */
+ public static enum Format {
+ BINARY_BAZEL_FORMAT;
+ }
+
/** A task that was very slow. */
public static final class SlowTask implements Comparable<SlowTask> {
final long durationNanos;
@@ -186,7 +189,7 @@ public final class Profiler {
* methods is.
*/
@ThreadCompatible
- private final class TaskData {
+ private static final class TaskData {
final long threadId;
final long startTimeNanos;
final int id;
@@ -198,12 +201,11 @@ public final class Profiler {
int[] counts; // number of invocations per ProfilerTask type
long[] durations; // time spend in the task per ProfilerTask type
- TaskData(long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) {
- threadId = Thread.currentThread().getId();
- counts = null;
- durations = null;
- id = taskId.incrementAndGet();
- parentId = (parent == null ? 0 : parent.id);
+ TaskData(
+ int id, long startTimeNanos, TaskData parent, ProfilerTask eventType, String description) {
+ this.id = id;
+ this.threadId = Thread.currentThread().getId();
+ this.parentId = (parent == null ? 0 : parent.id);
this.startTimeNanos = startTimeNanos;
this.type = eventType;
this.description = Preconditions.checkNotNull(description);
@@ -238,7 +240,6 @@ public final class Profiler {
*/
@ThreadSafe
private final class TaskStack extends ThreadLocal<List<TaskData>> {
-
@Override
public List<TaskData> initialValue() {
return new ArrayList<>();
@@ -266,7 +267,7 @@ public final class Profiler {
}
public TaskData create(long startTimeNanos, ProfilerTask eventType, String description) {
- return new TaskData(startTimeNanos, peek(), eventType, description);
+ return new TaskData(taskId.incrementAndGet(), startTimeNanos, peek(), eventType, description);
}
@Override
@@ -432,15 +433,13 @@ public final class Profiler {
private volatile boolean recordAllDurations = false;
private AtomicInteger taskId = new AtomicInteger();
+ private Thread writerThread;
private TaskStack taskStack;
- private Queue<TaskData> taskQueue;
- private DataOutputStream out;
- private Timer timer;
+ private BlockingQueue<TaskData> taskQueue;
private IOException saveException;
- private ObjectDescriber describer;
@SuppressWarnings("unchecked")
private final SlowestTaskAggregator[] slowestTasks =
- new SlowestTaskAggregator[ProfilerTask.values().length];
+ new SlowestTaskAggregator[ProfilerTask.values().length];
private final StatRecorder[] tasksHistograms = new StatRecorder[ProfilerTask.values().length];
@@ -508,15 +507,15 @@ public final class Profiler {
public synchronized void start(
ProfiledTaskKinds profiledTaskKinds,
OutputStream stream,
+ Format format,
String comment,
boolean recordAllDurations,
Clock clock,
- long execStartTimeNanos)
- throws IOException {
+ long execStartTimeNanos) {
Preconditions.checkState(!isActive(), "Profiler already active");
taskStack = new TaskStack();
- taskQueue = new ConcurrentLinkedQueue<>();
- describer = new ObjectDescriber();
+ taskQueue = new LinkedBlockingDeque<>();
+ initHistograms();
this.profiledTaskKinds = profiledTaskKinds;
this.clock = clock;
@@ -530,37 +529,27 @@ public final class Profiler {
taskId.set(0);
this.recordAllDurations = recordAllDurations;
this.saveException = null;
+ this.writerThread = null;
if (stream != null) {
- this.timer = new Timer("ProfilerTimer", true);
- // Wrapping deflater stream in the buffered stream proved to reduce CPU consumption caused by
- // the save() method. Values for buffer sizes were chosen by running small amount of tests
- // and identifying point of diminishing returns - but I have not really tried to optimize
- // them.
- this.out = new DataOutputStream(new BufferedOutputStream(new DeflaterOutputStream(
- stream, new Deflater(Deflater.BEST_SPEED, false), 65536), 262144));
-
- this.out.writeInt(MAGIC); // magic
- this.out.writeInt(VERSION); // protocol_version
- this.out.writeUTF(comment);
- // ProfileTask.values() method sorts enums using their ordinal() value, so
- // there there is no need to store ordinal() value for each entry.
- this.out.writeInt(TASK_COUNT);
- for (ProfilerTask type : ProfilerTask.values()) {
- this.out.writeUTF(type.toString());
+ if (format == Format.BINARY_BAZEL_FORMAT) {
+ BinaryFormatWriter writer =
+ new BinaryFormatWriter(
+ stream, taskQueue, profileStartTime, comment, (IOException e) -> abortWriting(e));
+ // Start save thread
+ writerThread = new Thread(writer);
+ writerThread.start();
}
-
- // Start save thread
- timer.schedule(new TimerTask() {
- @Override public void run() { save(); }
- }, SAVE_DELAY, SAVE_DELAY);
- } else {
- this.out = null;
}
// activate profiler
profileStartTime = execStartTimeNanos;
}
+ private synchronized void abortWriting(IOException e) {
+ saveException = e;
+ writerThread = null;
+ }
+
public synchronized Iterable<SlowTask> getSlowestTasks() {
List<Iterable<SlowTask>> slowestTasksByType = new ArrayList<>();
@@ -587,8 +576,21 @@ public final class Profiler {
}
// Log a final event to update the duration of ProfilePhase.FINISH.
logEvent(ProfilerTask.INFO, "Finishing");
- save();
- clear();
+ if (writerThread != null) {
+ // Add poison pill to queue and then wait for writer thread to shut down.
+ taskQueue.add(POISON_PILL);
+ try {
+ writerThread.join();
+ } catch (InterruptedException e) {
+ writerThread.interrupt();
+ Thread.currentThread().interrupt();
+ }
+ writerThread = null;
+ }
+ initHistograms();
+ profileStartTime = 0L;
+ taskStack = null;
+ taskQueue = null;
for (SlowestTaskAggregator aggregator : slowestTasks) {
if (aggregator != null) {
@@ -599,11 +601,6 @@ public final class Profiler {
if (saveException != null) {
throw saveException;
}
- if (out != null) {
- out.writeInt(EOF_MARKER);
- out.close();
- out = null;
- }
}
/**
@@ -618,81 +615,6 @@ public final class Profiler {
}
/**
- * Saves all gathered information from taskQueue queue to the file.
- * Method is invoked internally by the Timer-based thread and at the end of
- * profiling session.
- */
- private synchronized void save() {
- if (out == null) {
- return;
- }
- try {
- // Allocate the sink once to avoid GC
- ByteBuffer sink = ByteBuffer.allocate(1024);
- TaskData data;
- while ((data = taskQueue.poll()) != null) {
- sink.clear();
-
- VarInt.putVarLong(data.threadId, sink);
- VarInt.putVarInt(data.id, sink);
- VarInt.putVarInt(data.parentId, sink);
- VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink);
- VarInt.putVarLong(data.duration, sink);
-
- // To save space (and improve performance), convert all description
- // strings to the canonical object and use IdentityHashMap to assign
- // unique numbers for each string.
- int descIndex = describer.getDescriptionIndex(data.description);
- VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values.
-
- // Save types using their ordinal() value
- sink.put((byte) data.type.ordinal());
-
- // Save aggregated data stats.
- if (data.counts != null) {
- for (int i = 0; i < TASK_COUNT; i++) {
- if (data.counts[i] > 0) {
- sink.put((byte) i); // aggregated type ordinal value
- VarInt.putVarInt(data.counts[i], sink);
- VarInt.putVarLong(data.durations[i], sink);
- }
- }
- }
-
- this.out.writeInt(sink.position());
- this.out.write(sink.array(), 0, sink.position());
- if (describer.isUnassigned(descIndex)) {
- this.out.writeUTF(describer.memoizeDescription(data.description));
- }
- }
- this.out.flush();
- } catch (IOException e) {
- saveException = e;
- clear();
- try {
- out.close();
- } catch (IOException e2) {
- // ignore it
- }
- }
- }
-
- private synchronized void clear() {
- initHistograms();
- profileStartTime = 0L;
- if (timer != null) {
- timer.cancel();
- timer = null;
- }
- taskStack = null;
- taskQueue = null;
- describer = null;
-
- // Note that slowest task aggregator are not cleared here because clearing happens
- // periodically over the course of a command invocation.
- }
-
- /**
* Unless --record_full_profiler_data is given we drop small tasks and add their time to the
* parents duration.
*/
@@ -735,9 +657,7 @@ public final class Profiler {
if (wasTaskSlowEnoughToRecord(type, duration)) {
TaskData data = localStack.create(startTimeNanos, type, description);
data.duration = duration;
- if (out != null) {
- localQueue.add(data);
- }
+ localQueue.add(data);
SlowestTaskAggregator aggregator = slowestTasks[type.ordinal()];
@@ -863,7 +783,7 @@ public final class Profiler {
taskStack.peek().aggregateChild(data.type, data.duration);
}
boolean shouldRecordTask = wasTaskSlowEnoughToRecord(type, data.duration);
- if (out != null && (shouldRecordTask || data.counts != null)) {
+ if (shouldRecordTask || data.counts != null) {
taskQueue.add(data);
}
@@ -885,4 +805,110 @@ public final class Profiler {
logEvent(ProfilerTask.PHASE, phase.description);
}
}
+
+ /** Writes the profile in the binary Bazel profile format. */
+ private static class BinaryFormatWriter implements Runnable {
+ private final DataOutputStream out;
+ private final BlockingQueue<TaskData> queue;
+ private final long profileStartTime;
+ private final String comment;
+ private final Consumer<? super IOException> failureHandler;
+
+ BinaryFormatWriter(
+ OutputStream out,
+ BlockingQueue<TaskData> queue,
+ long profileStartTime,
+ String comment,
+ Consumer<? super IOException> failureHandler) {
+ // Wrapping the deflater stream in a buffered stream proved to reduce CPU consumption caused
+ // by the write() method. The buffer sizes were chosen by running a small number of tests and
+ // identifying the point of diminishing returns - but I have not really tried to optimize
+ // them.
+ this.out =
+ new DataOutputStream(
+ new BufferedOutputStream(
+ new DeflaterOutputStream(
+ // the DeflaterOutputStream has its own output buffer of 65k, chosen at random
+ out, new Deflater(Deflater.BEST_SPEED, false), 65536),
+ 262144)); // buffer size, basically chosen at random
+ this.queue = queue;
+ this.profileStartTime = profileStartTime;
+ this.comment = comment;
+ this.failureHandler = failureHandler;
+ }
+
+ private void writeHeader() throws IOException {
+ out.writeInt(MAGIC); // magic
+ out.writeInt(VERSION); // protocol_version
+ out.writeUTF(comment);
+ // ProfilerTask.values() returns the constants in ordinal() order, so there
+ // is no need to store the ordinal() value for each entry.
+ out.writeInt(TASK_COUNT);
+ for (ProfilerTask type : ProfilerTask.values()) {
+ out.writeUTF(type.toString());
+ }
+ }
+
+ /**
+ * Writes the header, then serializes tasks taken from the queue until the poison pill
+ * arrives, and finally writes the EOF marker and closes the stream. Runs on the writer
+ * thread; an IOException is reported to the failure handler.
+ */
+ @Override
+ public void run() {
+ try {
+ writeHeader();
+ // Allocate the sink once to avoid GC
+ ByteBuffer sink = ByteBuffer.allocate(1024);
+ ObjectDescriber describer = new ObjectDescriber();
+ TaskData data;
+ while ((data = queue.take()) != POISON_PILL) {
+ sink.clear();
+
+ VarInt.putVarLong(data.threadId, sink);
+ VarInt.putVarInt(data.id, sink);
+ VarInt.putVarInt(data.parentId, sink);
+ VarInt.putVarLong(data.startTimeNanos - profileStartTime, sink);
+ VarInt.putVarLong(data.duration, sink);
+
+ // To save space (and improve performance), convert all description
+ // strings to the canonical object and use IdentityHashMap to assign
+ // unique numbers for each string.
+ int descIndex = describer.getDescriptionIndex(data.description);
+ VarInt.putVarInt(descIndex + 1, sink); // Add 1 to avoid encoding negative values.
+
+ // Save types using their ordinal() value
+ sink.put((byte) data.type.ordinal());
+
+ // Save aggregated data stats.
+ if (data.counts != null) {
+ for (int i = 0; i < TASK_COUNT; i++) {
+ if (data.counts[i] > 0) {
+ sink.put((byte) i); // aggregated type ordinal value
+ VarInt.putVarInt(data.counts[i], sink);
+ VarInt.putVarLong(data.durations[i], sink);
+ }
+ }
+ }
+
+ out.writeInt(sink.position());
+ out.write(sink.array(), 0, sink.position());
+ if (describer.isUnassigned(descIndex)) {
+ out.writeUTF(describer.memoizeDescription(data.description));
+ }
+ }
+ out.writeInt(EOF_MARKER);
+ out.close();
+ } catch (IOException e) {
+ failureHandler.accept(e);
+ try {
+ out.close();
+ } catch (IOException e2) {
+ // ignore it
+ }
+ } catch (InterruptedException e) {
+ // Exit silently.
+ }
+ }
+ }
}