aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-03 11:03:48 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-03 11:03:48 -0800
commit61ead3e061f685f87e284bf41f7ed1cb44f347b4 (patch)
tree612acee4fed0ca0e594843c9b3260cd10eeecb8b
parent4b65b1dec305edf96b05375155dc918c974a626a (diff)
Lower latency profiling
Current latency profiles have their tails dominated by writing latency logs, which is hugely undesirable. Now when a thread log fills up, push it to a background thread to write to disk. At shutdown, wait for all latency traces to be flushed.
-rw-r--r--.gitignore1
-rw-r--r--src/core/profiling/basic_timers.c185
-rw-r--r--src/core/profiling/timers.h2
-rw-r--r--src/core/support/thd_posix.c9
-rw-r--r--src/core/transport/chttp2/writing.c2
-rw-r--r--test/core/fling/client.c11
-rw-r--r--test/core/fling/server.c7
-rwxr-xr-xtools/profiling/latency_profile/profile_analyzer.py3
8 files changed, 189 insertions, 31 deletions
diff --git a/.gitignore b/.gitignore
index 40f4c4734c..f059eca239 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,7 @@ cache.mk
# Temporary test reports
report.xml
latency_trace.txt
+latency_trace.*.txt
# port server log
portlog.txt
diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c
index 527a160101..f0fce7858d 100644
--- a/src/core/profiling/basic_timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -53,50 +53,186 @@ typedef struct gpr_timer_entry {
short line;
char type;
gpr_uint8 important;
+ int thd;
} gpr_timer_entry;
-#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
+#define MAX_COUNT 1000000
-static __thread gpr_timer_entry g_log[MAX_COUNT];
-static __thread int g_count;
+typedef struct gpr_timer_log {
+ size_t num_entries;
+ struct gpr_timer_log *next;
+ struct gpr_timer_log *prev;
+ gpr_timer_entry log[MAX_COUNT];
+} gpr_timer_log;
+
+typedef struct gpr_timer_log_list {
+ gpr_timer_log *head;
+ /* valid iff head!=NULL */
+ gpr_timer_log *tail;
+} gpr_timer_log_list;
+
+static __thread gpr_timer_log *g_thread_log;
static gpr_once g_once_init = GPR_ONCE_INIT;
static FILE *output_file;
+static const char *output_filename = "latency_trace.txt";
+static pthread_mutex_t g_mu;
+static pthread_cond_t g_cv;
+static gpr_timer_log_list g_in_progress_logs;
+static gpr_timer_log_list g_done_logs;
+static int g_shutdown;
+static gpr_thd_id g_writing_thread;
+static __thread int g_thread_id;
+static int g_next_thread_id;
-static void close_output() { fclose(output_file); }
+static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
+ if (list->head == NULL) {
+ list->head = list->tail = log;
+ log->next = log->prev = NULL;
+ return 1;
+ } else {
+ log->prev = list->tail;
+ log->next = NULL;
+ list->tail->next = log;
+ list->tail = log;
+ return 0;
+ }
+}
-static void init_output() {
- output_file = fopen("latency_trace.txt", "w");
- GPR_ASSERT(output_file);
- atexit(close_output);
+static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
+ gpr_timer_log *out = list->head;
+ if (out != NULL) {
+ list->head = out->next;
+ if (list->head != NULL) {
+ list->head->prev = NULL;
+ } else {
+ list->tail = NULL;
+ }
+ }
+ return out;
}
-static void log_report() {
- int i;
- gpr_once_init(&g_once_init, init_output);
- for (i = 0; i < g_count; i++) {
- gpr_timer_entry *entry = &(g_log[i]);
+static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
+ if (log->prev == NULL) {
+ list->head = log->next;
+ if (list->head != NULL) {
+ list->head->prev = NULL;
+ }
+ } else {
+ log->prev->next = log->next;
+ }
+ if (log->next == NULL) {
+ list->tail = log->prev;
+ if (list->tail != NULL) {
+ list->tail->next = NULL;
+ }
+ } else {
+ log->next->prev = log->prev;
+ }
+}
+
+static void write_log(gpr_timer_log *log) {
+ size_t i;
+ if (output_file == NULL) {
+ output_file = fopen(output_filename, "w");
+ }
+ for (i = 0; i < log->num_entries; i++) {
+ gpr_timer_entry *entry = &(log->log[i]);
+ if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
+ entry->tm = gpr_time_0(entry->tm.clock_type);
+ }
fprintf(output_file,
- "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
+ "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
"\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
- entry->tm.tv_sec, entry->tm.tv_nsec,
- (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
- entry->file, entry->line, entry->important);
+ entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
+ entry->tagstr, entry->file, entry->line, entry->important);
+ }
+}
+
+static void writing_thread(void *unused) {
+ gpr_timer_log *log;
+ pthread_mutex_lock(&g_mu);
+ for (;;) {
+ while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
+ pthread_cond_wait(&g_cv, &g_mu);
+ }
+ if (log != NULL) {
+ pthread_mutex_unlock(&g_mu);
+ write_log(log);
+ free(log);
+ pthread_mutex_lock(&g_mu);
+ }
+ if (g_shutdown) {
+ pthread_mutex_unlock(&g_mu);
+ return;
+ }
}
+}
- /* Now clear out the log */
- g_count = 0;
+static void flush_logs(gpr_timer_log_list *list) {
+ gpr_timer_log *log;
+ while ((log = timer_log_pop_front(list)) != NULL) {
+ write_log(log);
+ free(log);
+ }
+}
+
+static void finish_writing() {
+ pthread_mutex_lock(&g_mu);
+ g_shutdown = 1;
+ pthread_cond_signal(&g_cv);
+ pthread_mutex_unlock(&g_mu);
+ gpr_thd_join(g_writing_thread);
+
+ gpr_log(GPR_INFO, "flushing logs");
+
+ pthread_mutex_lock(&g_mu);
+ flush_logs(&g_done_logs);
+ flush_logs(&g_in_progress_logs);
+ pthread_mutex_unlock(&g_mu);
+
+ if (output_file) {
+ fclose(output_file);
+ }
+}
+
+void gpr_timers_set_log_filename(const char *filename) {
+ output_filename = filename;
+}
+
+static void init_output() {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
+ atexit(finish_writing);
+}
+
+static void rotate_log() {
+ gpr_timer_log *new = malloc(sizeof(*new));
+ gpr_once_init(&g_once_init, init_output);
+ new->num_entries = 0;
+ pthread_mutex_lock(&g_mu);
+ if (g_thread_log != NULL) {
+ timer_log_remove(&g_in_progress_logs, g_thread_log);
+ if (timer_log_push_back(&g_done_logs, g_thread_log)) {
+ pthread_cond_signal(&g_cv);
+ }
+ } else {
+ g_thread_id = g_next_thread_id++;
+ }
+ timer_log_push_back(&g_in_progress_logs, new);
+ pthread_mutex_unlock(&g_mu);
+ g_thread_log = new;
}
static void gpr_timers_log_add(const char *tagstr, marker_type type,
int important, const char *file, int line) {
gpr_timer_entry *entry;
- /* TODO (vpai) : Improve concurrency */
- if (g_count == MAX_COUNT) {
- log_report();
+ if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
+ rotate_log();
}
- entry = &g_log[g_count++];
+ entry = &g_thread_log->log[g_thread_log->num_entries++];
entry->tm = gpr_now(GPR_CLOCK_PRECISE);
entry->tagstr = tagstr;
@@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
entry->file = file;
entry->line = (short)line;
entry->important = important != 0;
+ entry->thd = g_thread_id;
}
/* Latency profiler API implementation. */
@@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
void gpr_timers_global_init(void) {}
void gpr_timers_global_destroy(void) {}
+
+void gpr_timers_set_log_filename(const char *filename) {}
#endif /* GRPC_BASIC_PROFILER */
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 0d112e7248..6a188dc566 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
void gpr_timer_end(const char *tagstr, int important, const char *file,
int line);
+void gpr_timers_set_log_filename(const char *filename);
+
#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
/* No profiling. No-op all the things. */
#define GPR_TIMER_MARK(tag, important) \
diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c
index c36d94d044..653a1c88c1 100644
--- a/src/core/support/thd_posix.c
+++ b/src/core/support/thd_posix.c
@@ -53,7 +53,7 @@ struct thd_arg {
/* Body of every thread started via gpr_thd_new. */
static void *thread_body(void *v) {
struct thd_arg a = *(struct thd_arg *)v;
- gpr_free(v);
+ free(v);
(*a.body)(a.arg);
return NULL;
}
@@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
int thread_started;
pthread_attr_t attr;
pthread_t p;
- struct thd_arg *a = gpr_malloc(sizeof(*a));
+ /* don't use gpr_malloc as we may cause an infinite recursion with
+ * the profiling code */
+ struct thd_arg *a = malloc(sizeof(*a));
+ GPR_ASSERT(a != NULL);
a->body = thd_body;
a->arg = arg;
@@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
if (!thread_started) {
- gpr_free(a);
+ free(a);
}
*t = (gpr_thd_id)p;
return thread_started;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 353e476e40..7fe5649e5e 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
+
+ GPR_TIMER_END("finalize_outbuf", 0);
}
void grpc_chttp2_cleanup_writing(
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index a53411c2f5..99b30d6c4a 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -41,6 +41,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
+#include "src/core/profiling/timers.h"
#include "test/core/util/grpc_profiler.h"
#include "test/core/util/test_config.h"
@@ -89,6 +90,7 @@ static void init_ping_pong_request(void) {
}
static void step_ping_pong_request(void) {
+ GPR_TIMER_BEGIN("ping_pong", 1);
call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
"/Reflector/reflectUnary", "localhost",
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
@@ -99,6 +101,7 @@ static void step_ping_pong_request(void) {
grpc_call_destroy(call);
grpc_byte_buffer_destroy(response_payload_recv);
call = NULL;
+ GPR_TIMER_END("ping_pong", 1);
}
static void init_ping_pong_stream(void) {
@@ -122,10 +125,12 @@ static void init_ping_pong_stream(void) {
static void step_ping_pong_stream(void) {
grpc_call_error error;
+ GPR_TIMER_BEGIN("ping_pong", 1);
error = grpc_call_start_batch(call, stream_step_ops, 2, (void *)1, NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
grpc_byte_buffer_destroy(response_payload_recv);
+ GPR_TIMER_END("ping_pong", 1);
}
static double now(void) {
@@ -159,12 +164,14 @@ int main(int argc, char **argv) {
char *scenario_name = "ping-pong-request";
scenario sc = {NULL, NULL, NULL};
+ gpr_timers_set_log_filename("latency_trace.fling_client.txt");
+
+ grpc_init();
+
GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc_test_init(1, fake_argv);
- grpc_init();
-
cl = gpr_cmdline_create("fling client");
gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
&payload_size);
diff --git a/test/core/fling/server.c b/test/core/fling/server.c
index 67631e5a07..14a1a815bc 100644
--- a/test/core/fling/server.c
+++ b/test/core/fling/server.c
@@ -44,15 +44,16 @@
#include <unistd.h>
#endif
-#include "test/core/util/grpc_profiler.h"
-#include "test/core/util/test_config.h"
#include <grpc/support/alloc.h>
#include <grpc/support/cmdline.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include "src/core/profiling/timers.h"
#include "test/core/util/port.h"
#include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/grpc_profiler.h"
+#include "test/core/util/test_config.h"
static grpc_completion_queue *cq;
static grpc_server *server;
@@ -192,6 +193,8 @@ int main(int argc, char **argv) {
char *fake_argv[1];
+ gpr_timers_set_log_filename("latency_trace.fling_server.txt");
+
GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc_test_init(1, fake_argv);
diff --git a/tools/profiling/latency_profile/profile_analyzer.py b/tools/profiling/latency_profile/profile_analyzer.py
index b12e1074e5..b2a38ea60a 100755
--- a/tools/profiling/latency_profile/profile_analyzer.py
+++ b/tools/profiling/latency_profile/profile_analyzer.py
@@ -49,7 +49,7 @@ class ScopeBuilder(object):
self.call_stack_builder.lines.append(line_item)
def finish(self, line):
- assert line['tag'] == self.top_line.tag, 'expected %s, got %s' % (self.top_line.tag, line['tag'])
+ assert line['tag'] == self.top_line.tag, 'expected %s, got %s; thread=%s; t0=%f t1=%f' % (self.top_line.tag, line['tag'], line['thd'], self.top_line.start_time, line['t'])
final_time_stamp = line['t']
assert self.top_line.end_time is None
self.top_line.end_time = final_time_stamp
@@ -84,6 +84,7 @@ class CallStackBuilder(object):
self.stk.append(ScopeBuilder(self, line))
return False
elif line_type == '}':
+ assert self.stk, 'expected non-empty stack for closing %s; thread=%s; t=%f' % (line['tag'], line['thd'], line['t'])
self.stk.pop().finish(line)
if not self.stk:
self.finish()