diff options
author | Craig Tiller <ctiller@google.com> | 2015-11-03 11:03:48 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-11-03 11:03:48 -0800 |
commit | 61ead3e061f685f87e284bf41f7ed1cb44f347b4 (patch) | |
tree | 612acee4fed0ca0e594843c9b3260cd10eeecb8b /src/core | |
parent | 4b65b1dec305edf96b05375155dc918c974a626a (diff) |
Lower latency profiling
Current latency profiles have their tails dominated by the time spent
writing the latency logs themselves, which is hugely undesirable.
Now, when a thread's log fills up, it is handed to a background thread
that writes it to disk. At shutdown, all pending latency traces are flushed.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/profiling/basic_timers.c | 185 | ||||
-rw-r--r-- | src/core/profiling/timers.h | 2 | ||||
-rw-r--r-- | src/core/support/thd_posix.c | 9 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 2 |
4 files changed, 172 insertions, 26 deletions
diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index 527a160101..f0fce7858d 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -53,50 +53,186 @@ typedef struct gpr_timer_entry { short line; char type; gpr_uint8 important; + int thd; } gpr_timer_entry; -#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry)) +#define MAX_COUNT 1000000 -static __thread gpr_timer_entry g_log[MAX_COUNT]; -static __thread int g_count; +typedef struct gpr_timer_log { + size_t num_entries; + struct gpr_timer_log *next; + struct gpr_timer_log *prev; + gpr_timer_entry log[MAX_COUNT]; +} gpr_timer_log; + +typedef struct gpr_timer_log_list { + gpr_timer_log *head; + /* valid iff head!=NULL */ + gpr_timer_log *tail; +} gpr_timer_log_list; + +static __thread gpr_timer_log *g_thread_log; static gpr_once g_once_init = GPR_ONCE_INIT; static FILE *output_file; +static const char *output_filename = "latency_trace.txt"; +static pthread_mutex_t g_mu; +static pthread_cond_t g_cv; +static gpr_timer_log_list g_in_progress_logs; +static gpr_timer_log_list g_done_logs; +static int g_shutdown; +static gpr_thd_id g_writing_thread; +static __thread int g_thread_id; +static int g_next_thread_id; -static void close_output() { fclose(output_file); } +static int timer_log_push_back(gpr_timer_log_list *list, gpr_timer_log *log) { + if (list->head == NULL) { + list->head = list->tail = log; + log->next = log->prev = NULL; + return 1; + } else { + log->prev = list->tail; + log->next = NULL; + list->tail->next = log; + list->tail = log; + return 0; + } +} -static void init_output() { - output_file = fopen("latency_trace.txt", "w"); - GPR_ASSERT(output_file); - atexit(close_output); +static gpr_timer_log *timer_log_pop_front(gpr_timer_log_list *list) { + gpr_timer_log *out = list->head; + if (out != NULL) { + list->head = out->next; + if (list->head != NULL) { + list->head->prev = NULL; + } else { + list->tail = NULL; + } + } + return 
out; } -static void log_report() { - int i; - gpr_once_init(&g_once_init, init_output); - for (i = 0; i < g_count; i++) { - gpr_timer_entry *entry = &(g_log[i]); +static void timer_log_remove(gpr_timer_log_list *list, gpr_timer_log *log) { + if (log->prev == NULL) { + list->head = log->next; + if (list->head != NULL) { + list->head->prev = NULL; + } + } else { + log->prev->next = log->next; + } + if (log->next == NULL) { + list->tail = log->prev; + if (list->tail != NULL) { + list->tail->next = NULL; + } + } else { + log->next->prev = log->prev; + } +} + +static void write_log(gpr_timer_log *log) { + size_t i; + if (output_file == NULL) { + output_file = fopen(output_filename, "w"); + } + for (i = 0; i < log->num_entries; i++) { + gpr_timer_entry *entry = &(log->log[i]); + if (gpr_time_cmp(entry->tm, gpr_time_0(entry->tm.clock_type)) < 0) { + entry->tm = gpr_time_0(entry->tm.clock_type); + } fprintf(output_file, - "{\"t\": %ld.%09d, \"thd\": \"%p\", \"type\": \"%c\", \"tag\": " + "{\"t\": %ld.%09d, \"thd\": \"%d\", \"type\": \"%c\", \"tag\": " "\"%s\", \"file\": \"%s\", \"line\": %d, \"imp\": %d}\n", - entry->tm.tv_sec, entry->tm.tv_nsec, - (void *)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tagstr, - entry->file, entry->line, entry->important); + entry->tm.tv_sec, entry->tm.tv_nsec, entry->thd, entry->type, + entry->tagstr, entry->file, entry->line, entry->important); + } +} + +static void writing_thread(void *unused) { + gpr_timer_log *log; + pthread_mutex_lock(&g_mu); + for (;;) { + while ((log = timer_log_pop_front(&g_done_logs)) == NULL && !g_shutdown) { + pthread_cond_wait(&g_cv, &g_mu); + } + if (log != NULL) { + pthread_mutex_unlock(&g_mu); + write_log(log); + free(log); + pthread_mutex_lock(&g_mu); + } + if (g_shutdown) { + pthread_mutex_unlock(&g_mu); + return; + } } +} - /* Now clear out the log */ - g_count = 0; +static void flush_logs(gpr_timer_log_list *list) { + gpr_timer_log *log; + while ((log = timer_log_pop_front(list)) != NULL) { + 
write_log(log); + free(log); + } +} + +static void finish_writing() { + pthread_mutex_lock(&g_mu); + g_shutdown = 1; + pthread_cond_signal(&g_cv); + pthread_mutex_unlock(&g_mu); + gpr_thd_join(g_writing_thread); + + gpr_log(GPR_INFO, "flushing logs"); + + pthread_mutex_lock(&g_mu); + flush_logs(&g_done_logs); + flush_logs(&g_in_progress_logs); + pthread_mutex_unlock(&g_mu); + + if (output_file) { + fclose(output_file); + } +} + +void gpr_timers_set_log_filename(const char *filename) { + output_filename = filename; +} + +static void init_output() { + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + gpr_thd_new(&g_writing_thread, writing_thread, NULL, &options); + atexit(finish_writing); +} + +static void rotate_log() { + gpr_timer_log *new = malloc(sizeof(*new)); + gpr_once_init(&g_once_init, init_output); + new->num_entries = 0; + pthread_mutex_lock(&g_mu); + if (g_thread_log != NULL) { + timer_log_remove(&g_in_progress_logs, g_thread_log); + if (timer_log_push_back(&g_done_logs, g_thread_log)) { + pthread_cond_signal(&g_cv); + } + } else { + g_thread_id = g_next_thread_id++; + } + timer_log_push_back(&g_in_progress_logs, new); + pthread_mutex_unlock(&g_mu); + g_thread_log = new; } static void gpr_timers_log_add(const char *tagstr, marker_type type, int important, const char *file, int line) { gpr_timer_entry *entry; - /* TODO (vpai) : Improve concurrency */ - if (g_count == MAX_COUNT) { - log_report(); + if (g_thread_log == NULL || g_thread_log->num_entries == MAX_COUNT) { + rotate_log(); } - entry = &g_log[g_count++]; + entry = &g_thread_log->log[g_thread_log->num_entries++]; entry->tm = gpr_now(GPR_CLOCK_PRECISE); entry->tagstr = tagstr; @@ -104,6 +240,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type, entry->file = file; entry->line = (short)line; entry->important = important != 0; + entry->thd = g_thread_id; } /* Latency profiler API implementation. 
*/ @@ -131,4 +268,6 @@ void gpr_timers_global_destroy(void) {} void gpr_timers_global_init(void) {} void gpr_timers_global_destroy(void) {} + +void gpr_timers_set_log_filename(const char *filename) {} #endif /* GRPC_BASIC_PROFILER */ diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index 0d112e7248..6a188dc566 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -48,6 +48,8 @@ void gpr_timer_begin(const char *tagstr, int important, const char *file, void gpr_timer_end(const char *tagstr, int important, const char *file, int line); +void gpr_timers_set_log_filename(const char *filename); + #if !(defined(GRPC_STAP_PROFILER) + defined(GRPC_BASIC_PROFILER)) /* No profiling. No-op all the things. */ #define GPR_TIMER_MARK(tag, important) \ diff --git a/src/core/support/thd_posix.c b/src/core/support/thd_posix.c index c36d94d044..653a1c88c1 100644 --- a/src/core/support/thd_posix.c +++ b/src/core/support/thd_posix.c @@ -53,7 +53,7 @@ struct thd_arg { /* Body of every thread started via gpr_thd_new. 
*/ static void *thread_body(void *v) { struct thd_arg a = *(struct thd_arg *)v; - gpr_free(v); + free(v); (*a.body)(a.arg); return NULL; } @@ -63,7 +63,10 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, int thread_started; pthread_attr_t attr; pthread_t p; - struct thd_arg *a = gpr_malloc(sizeof(*a)); + /* don't use gpr_malloc as we may cause an infinite recursion with + * the profiling code */ + struct thd_arg *a = malloc(sizeof(*a)); + GPR_ASSERT(a != NULL); a->body = thd_body; a->arg = arg; @@ -78,7 +81,7 @@ int gpr_thd_new(gpr_thd_id *t, void (*thd_body)(void *arg), void *arg, thread_started = (pthread_create(&p, &attr, &thread_body, a) == 0); GPR_ASSERT(pthread_attr_destroy(&attr) == 0); if (!thread_started) { - gpr_free(a); + free(a); } *t = (gpr_thd_id)p; return thread_started; diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index 353e476e40..7fe5649e5e 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -315,6 +315,8 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_written_stream(transport_writing, stream_writing); } } + + GPR_TIMER_END("finalize_outbuf", 0); } void grpc_chttp2_cleanup_writing( |