aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-03 11:03:48 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-03 11:03:48 -0800
commit61ead3e061f685f87e284bf41f7ed1cb44f347b4 (patch)
tree612acee4fed0ca0e594843c9b3260cd10eeecb8b /src/core
parent4b65b1dec305edf96b05375155dc918c974a626a (diff)
Lower latency profiling
Current latency profiles have their tails dominated by writing latency logs, which is hugely undesirable. Now when a thread log fills up, push it to a background thread to write to disk. At shutdown, wait for all latency traces to be flushed.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/profiling/basic_timers.c185
-rw-r--r--src/core/profiling/timers.h2
-rw-r--r--src/core/support/thd_posix.c9
-rw-r--r--src/core/transport/chttp2/writing.c2
4 files changed, 172 insertions, 26 deletions
diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c
index 527a160101..f0fce7858d 100644
--- a/src/core/profiling/basic_timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -53,50 +53,186 @@ typedef struct gpr_timer_entry {
short line;
char type;
gpr_uint8 important;
+ int thd;
} gpr_timer_entry;
-#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry))
+#define MAX_COUNT 1000000
-static __thread gpr_timer_entry g_log[MAX_COUNT];
-static __thread int g_count;
+typedef struct gpr_timer_log {
+ size_t num_entries;
+ struct gpr_timer_log *next;
+ struct gpr_timer_log *prev;
+ gpr_timer_entry log[MAX_COUNT];
+} gpr_timer_log;
+
+typedef struct gpr_timer_log_list {
+ gpr_timer_log *head;
+ /* valid iff head!=NULL */
+ gpr_timer_log *tail;
+} gpr_timer_log_list;
+
+static __thread gpr_timer_log *g_thread_log;
static gpr_once g_once_init = GPR_ONCE_INIT;
static FILE *output_file;
+static const char *output_filename = "latency_trace.txt";
+static pthread_mutex_t g_mu;
+static pthread_cond_t g_cv;
+static gpr_timer_log_list g_in_progress_logs;
+static gpr_timer_log_list g_done_logs;
+static int g_shutdown;
+static gpr_thd_id g_writing_thread;
+static __thread int g_thread_id;
+static int g_next_thread_id;
-static void close_output() { fclose(output_file); }
+static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) {
+ if (list->head == NULL) {
+ list->head = list->tail = log;
+ log->next = log->prev = NULL;
+ return 1;
+ } else {
+ log->prev = list->tail;
+ log->next = NULL;
+ list->tail->next = log;
+ list->tail = log;
+ return 0;
+ }
+}
-static void init_output() {
- output_file = fopen("latency_trace.txt", "w");
- GPR_ASSERT(output_file);
- atexit(close_output);
+static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) {
+ gpr_timer_log *out = list->head;
+ if (out != NULL) {
+ list->head = out->next;
+ if (list->head != NULL) {
+ list->head->prev = NULL;
+ } else {
+ list->tail = NULL;
+ }
+ }
+ return out;
}
-static void log_report() {
- int i;
- gpr_once_init(&g_once_init, init_output);
- for (i = 0; i < g_count; i++) {
- gpr_timer_entry *entry = &(g_log[i]);
+static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) {
+ if (log->prev == NULL) {
+ list->head = log->next;
+ if (list->head != NULL) {
+ list->head->prev = NULL;
+ }
+ } else {
+ log->prev->next = log->next;
+ }
+ if (log->next == NULL) {
+ list->tail = log->prev;
+ if (list->tail != NULL) {
+ list->tail->next = NULL;
+ }
+ } else {
+ log->next->prev = log->prev;
+ }
+}
+
+static void write_log(gpr_timer_log *log) {
+ size_t i;
+ if (output_file == NULL) {
+ output_file = fopen(output_filename, "w");
+ }
+ for (i = 0; i < log->num_entries; i++) {
+ gpr_timer_entry *entry = &(log->log[i]);
+ if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) {
+ entry->tm = gpr_time_0(entry->tm.clock_type);
+ }
fprintf(output_file,
- "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": "
+ "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": "
"\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n",
- entry->tm.tv_sec, entry->tm.tv_nsec,
- (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr,
- entry->file, entry->line, entry->important);
+ entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type,
+ entry->tagstr, entry->file, entry->line, entry->important);
+ }
+}
+
+static void writing_thread(void *unused) {
+ gpr_timer_log *log;
+ pthread_mutex_lock(&g_mu);
+ for (;;) {
+ while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) {
+ pthread_cond_wait(&g_cv, &g_mu);
+ }
+ if (log != NULL) {
+ pthread_mutex_unlock(&g_mu);
+ write_log(log);
+ free(log);
+ pthread_mutex_lock(&g_mu);
+ }
+ if (g_shutdown) {
+ pthread_mutex_unlock(&g_mu);
+ return;
+ }
}
+}
- /* Now clear out the log */
- g_count = 0;
+static void flush_logs(gpr_timer_log_list *list) {
+ gpr_timer_log *log;
+ while ((log = timer_log_pop_front(list)) != NULL) {
+ write_log(log);
+ free(log);
+ }
+}
+
+static void finish_writing() {
+ pthread_mutex_lock(&g_mu);
+ g_shutdown = 1;
+ pthread_cond_signal(&g_cv);
+ pthread_mutex_unlock(&g_mu);
+ gpr_thd_join(g_writing_thread);
+
+ gpr_log(GPR_INFO, "flushing logs");
+
+ pthread_mutex_lock(&g_mu);
+ flush_logs(&g_done_logs);
+ flush_logs(&g_in_progress_logs);
+ pthread_mutex_unlock(&g_mu);
+
+ if (output_file) {
+ fclose(output_file);
+ }
+}
+
+void gpr_timers_set_log_filename(const char *filename) {
+ output_filename = filename;
+}
+
+static void init_output() {
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+ gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options);
+ atexit(finish_writing);
+}
+
+static void rotate_log() {
+ gpr_timer_log *new = malloc(sizeof(*new));
+ gpr_once_init(&g_once_init, init_output);
+ new->num_entries = 0;
+ pthread_mutex_lock(&g_mu);
+ if (g_thread_log != NULL) {
+ timer_log_remove(&g_in_progress_logs, g_thread_log);
+ if (timer_log_push_back(&g_done_logs, g_thread_log)) {
+ pthread_cond_signal(&g_cv);
+ }
+ } else {
+ g_thread_id = g_next_thread_id++;
+ }
+ timer_log_push_back(&g_in_progress_logs, new);
+ pthread_mutex_unlock(&g_mu);
+ g_thread_log = new;
}
static void gpr_timers_log_add(const char *tagstr, marker_type type,
int important, const char *file, int line) {
gpr_timer_entry *entry;
- /* TODO (vpai) : Improve concurrency */
- if (g_count == MAX_COUNT) {
- log_report();
+ if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) {
+ rotate_log();
}
- entry = &g_log[g_count++];
+ entry = &g_thread_log->log[g_thread_log->num_entries++];
entry->tm = gpr_now(GPR_CLOCK_PRECISE);
entry->tagstr = tagstr;
@@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type,
entry->file = file;
entry->line = (short)line;
entry->important = important != 0;
+ entry->thd = g_thread_id;
}
/* Latency profiler API implementation. */
@@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {}
void gpr_timers_global_init(void) {}
void gpr_timers_global_destroy(void) {}
+
+void gpr_timers_set_log_filename(const char *filename) {}
#endif /* GRPC_BASIC_PROFILER */
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 0d112e7248..6a188dc566 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file,
void gpr_timer_end(const char *tagstr, int important, const char *file,
int line);
+void gpr_timers_set_log_filename(const char *filename);
+
#if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER))
/* No profiling. No-op all the things. */
#define GPR_TIMER_MARK(tag, important) \
diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c
index c36d94d044..653a1c88c1 100644
--- a/src/core/support/thd_posix.c
+++ b/src/core/support/thd_posix.c
@@ -53,7 +53,7 @@ struct thd_arg {
/* Body of every thread started via gpr_thd_new. */
static void *thread_body(void *v) {
struct thd_arg a = *(struct thd_arg *)v;
- gpr_free(v);
+ free(v);
(*a.body)(a.arg);
return NULL;
}
@@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
int thread_started;
pthread_attr_t attr;
pthread_t p;
- struct thd_arg *a = gpr_malloc(sizeof(*a));
+ /* don't use gpr_malloc as we may cause an infinite recursion with
+ * the profiling code */
+ struct thd_arg *a = malloc(sizeof(*a));
+ GPR_ASSERT(a != NULL);
a->body = thd_body;
a->arg = arg;
@@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg,
thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0);
GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
if (!thread_started) {
- gpr_free(a);
+ free(a);
}
*t = (gpr_thd_id)p;
return thread_started;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 353e476e40..7fe5649e5e 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
+
+ GPR_TIMER_END("finalize_outbuf", 0);
}
void grpc_chttp2_cleanup_writing(