diff options
Diffstat (limited to 'src/core/lib')
43 files changed, 898 insertions, 432 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 106666410a..0f8e33c4be 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -23,7 +23,7 @@ #include <stdlib.h> #include <string.h> -grpc_tracer_flag grpc_trace_channel = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_channel = GRPC_TRACER_INITIALIZER(false, "channel"); /* Memory layouts. diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index 01529df80a..c369e33073 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -24,7 +24,7 @@ #include <grpc/support/string_util.h> grpc_tracer_flag grpc_trace_channel_stack_builder = - GRPC_TRACER_INITIALIZER(false); + GRPC_TRACER_INITIALIZER(false, "channel_stack_builder"); typedef struct filter_node { struct filter_node *next; diff --git a/src/core/lib/compression/stream_compression.c b/src/core/lib/compression/stream_compression.c new file mode 100644 index 0000000000..df13d53e06 --- /dev/null +++ b/src/core/lib/compression/stream_compression.c @@ -0,0 +1,191 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/compression/stream_compression.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/slice/slice_internal.h" + +#define OUTPUT_BLOCK_SIZE (1024) + +static bool gzip_flate(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, int flush, + bool *end_of_context) { + GPR_ASSERT(flush == 0 || flush == Z_SYNC_FLUSH || flush == Z_FINISH); + /* Full flush is not allowed when inflating. */ + GPR_ASSERT(!(ctx->flate == inflate && (flush == Z_FINISH))); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + int r; + bool eoc = false; + size_t original_max_output_size = max_output_size; + while (max_output_size > 0 && (in->length > 0 || flush) && !eoc) { + size_t slice_size = max_output_size < OUTPUT_BLOCK_SIZE ? max_output_size + : OUTPUT_BLOCK_SIZE; + grpc_slice slice_out = GRPC_SLICE_MALLOC(slice_size); + ctx->zs.avail_out = (uInt)slice_size; + ctx->zs.next_out = GRPC_SLICE_START_PTR(slice_out); + while (ctx->zs.avail_out > 0 && in->length > 0 && !eoc) { + grpc_slice slice = grpc_slice_buffer_take_first(in); + ctx->zs.avail_in = (uInt)GRPC_SLICE_LENGTH(slice); + ctx->zs.next_in = GRPC_SLICE_START_PTR(slice); + r = ctx->flate(&ctx->zs, Z_NO_FLUSH); + if (r < 0 && r != Z_BUF_ERROR) { + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } else if (r == Z_STREAM_END && ctx->flate == inflate) { + eoc = true; + } + if (ctx->zs.avail_in > 0) { + grpc_slice_buffer_undo_take_first( + in, + grpc_slice_sub(slice, GRPC_SLICE_LENGTH(slice) - ctx->zs.avail_in, + GRPC_SLICE_LENGTH(slice))); + } + grpc_slice_unref_internal(&exec_ctx, slice); + } + if (flush != 0 && ctx->zs.avail_out > 0 && !eoc) { + GPR_ASSERT(in->length == 0); + r = ctx->flate(&ctx->zs, flush); + if (flush == Z_SYNC_FLUSH) { + switch (r) { + case Z_OK: + /* Maybe flush is not complete; just made some partial progress. */ + if (ctx->zs.avail_out > 0) { + flush = 0; + } + break; + case Z_BUF_ERROR: + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } else if (flush == Z_FINISH) { + switch (r) { + case Z_OK: + case Z_BUF_ERROR: + /* Wait for the next loop to assign additional output space. */ + GPR_ASSERT(ctx->zs.avail_out == 0); + break; + case Z_STREAM_END: + flush = 0; + break; + default: + gpr_log(GPR_ERROR, "zlib error (%d)", r); + grpc_slice_unref_internal(&exec_ctx, slice_out); + grpc_exec_ctx_finish(&exec_ctx); + return false; + } + } + } + + if (ctx->zs.avail_out == 0) { + grpc_slice_buffer_add(out, slice_out); + } else if (ctx->zs.avail_out < slice_size) { + slice_out.data.refcounted.length -= ctx->zs.avail_out; + grpc_slice_buffer_add(out, slice_out); + } else { + grpc_slice_unref_internal(&exec_ctx, slice_out); + } + max_output_size -= (slice_size - ctx->zs.avail_out); + } + grpc_exec_ctx_finish(&exec_ctx); + if (end_of_context) { + *end_of_context = eoc; + } + if (output_size) { + *output_size = original_max_output_size - max_output_size; + } + return true; +} + +bool grpc_stream_compress(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + grpc_stream_compression_flush flush) { + GPR_ASSERT(ctx->flate == deflate); + int gzip_flush; + switch (flush) { + case GRPC_STREAM_COMPRESSION_FLUSH_NONE: + gzip_flush = 0; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_SYNC: + gzip_flush = Z_SYNC_FLUSH; + break; + case GRPC_STREAM_COMPRESSION_FLUSH_FINISH: + gzip_flush = Z_FINISH; + break; + default: + gzip_flush = 0; + } + return gzip_flate(ctx, in, out, output_size, max_output_size, gzip_flush, + NULL); +} + +bool grpc_stream_decompress(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + bool *end_of_context) { + GPR_ASSERT(ctx->flate == inflate); + return gzip_flate(ctx, in, out, output_size, max_output_size, Z_SYNC_FLUSH, + end_of_context); +} + +grpc_stream_compression_context *grpc_stream_compression_context_create( + grpc_stream_compression_method method) { + grpc_stream_compression_context *ctx = + gpr_zalloc(sizeof(grpc_stream_compression_context)); + int r; + if (ctx == NULL) { + return NULL; + } + if (method == GRPC_STREAM_COMPRESSION_DECOMPRESS) { + r = inflateInit2(&ctx->zs, 0x1F); + ctx->flate = inflate; + } else { + r = deflateInit2(&ctx->zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 0x1F, 8, + Z_DEFAULT_STRATEGY); + ctx->flate = deflate; + } + if (r != Z_OK) { + gpr_free(ctx); + return NULL; + } + + return ctx; +} + +void grpc_stream_compression_context_destroy( + grpc_stream_compression_context *ctx) { + if (ctx->flate == inflate) { + inflateEnd(&ctx->zs); + } else { + deflateEnd(&ctx->zs); + } + gpr_free(ctx); +} diff --git a/src/core/lib/compression/stream_compression.h b/src/core/lib/compression/stream_compression.h new file mode 100644 index 0000000000..844dff81a3 --- /dev/null +++ b/src/core/lib/compression/stream_compression.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_H +#define GRPC_CORE_LIB_COMPRESSION_STREAM_COMPRESSION_H + +#include <stdbool.h> + +#include <grpc/slice_buffer.h> +#include <zlib.h> + +/* Stream compression/decompression context */ +typedef struct grpc_stream_compression_context { + z_stream zs; + int (*flate)(z_stream *zs, int flush); +} grpc_stream_compression_context; + +typedef enum grpc_stream_compression_method { + GRPC_STREAM_COMPRESSION_COMPRESS = 0, + GRPC_STREAM_COMPRESSION_DECOMPRESS, + GRPC_STREAM_COMPRESSION_METHOD_COUNT +} grpc_stream_compression_method; + +typedef enum grpc_stream_compression_flush { + GRPC_STREAM_COMPRESSION_FLUSH_NONE = 0, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC, + GRPC_STREAM_COMPRESSION_FLUSH_FINISH, + GRPC_STREAM_COMPRESSION_FLUSH_COUNT +} grpc_stream_compression_flush; + +/** + * Compress bytes provided in \a in with a given context, with an optional flush + * at the end of compression. Emits at most \a max_output_size compressed bytes + * into \a out. If all the bytes in input buffer \a in are depleted and \a flush + * is not GRPC_STREAM_COMPRESSION_FLUSH_NONE, the corresponding flush method is + * executed. The total number of bytes emitted is outputed in \a output_size. + * + * A SYNC flush indicates that the entire messages in \a in can be decompressed + * from \a out. A FINISH flush implies a SYNC flush, and that any further + * compression will not be dependent on the state of the current context and any + * previous compressed bytes. It allows corresponding decompression context to + * be dropped when reaching this boundary. + */ +bool grpc_stream_compress(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + grpc_stream_compression_flush flush); + +/** + * Decompress bytes provided in \a in with a given context. Emits at most \a + * max_output_size decompressed bytes into \a out. If decompression process + * reached the end of a gzip stream, \a end_of_context is set to true; otherwise + * it is set to false. The total number of bytes emitted is outputed in \a + * output_size. + */ +bool grpc_stream_decompress(grpc_stream_compression_context *ctx, + grpc_slice_buffer *in, grpc_slice_buffer *out, + size_t *output_size, size_t max_output_size, + bool *end_of_context); + +/** + * Creates a stream compression context. \a pending_bytes_buffer is the input + * buffer for compression/decompression operations. \a method specifies whether + * the context is for compression or decompression. + */ +grpc_stream_compression_context *grpc_stream_compression_context_create( + grpc_stream_compression_method method); + +/** + * Destroys a stream compression context. + */ +void grpc_stream_compression_context_destroy( + grpc_stream_compression_context *ctx); + +#endif diff --git a/src/core/lib/debug/trace.c b/src/core/lib/debug/trace.c index 8249b2ebd6..c6c1853e20 100644 --- a/src/core/lib/debug/trace.c +++ b/src/core/lib/debug/trace.c @@ -27,7 +27,6 @@ int grpc_tracer_set_enabled(const char *name, int enabled); typedef struct tracer { - const char *name; grpc_tracer_flag *flag; struct tracer *next; } tracer; @@ -39,9 +38,8 @@ static tracer *tracers; #define TRACER_SET(flag, on) (flag).value = (on) #endif -void grpc_register_tracer(const char *name, grpc_tracer_flag *flag) { +void grpc_register_tracer(grpc_tracer_flag *flag) { tracer *t = gpr_malloc(sizeof(*t)); - t->name = name; t->flag = flag; t->next = tracers; TRACER_SET(*flag, false); @@ -93,6 +91,14 @@ static void parse(const char *s) { gpr_free(strings); } +static void list_tracers() { + gpr_log(GPR_DEBUG, "available tracers:"); + tracer *t; + for (t = tracers; t; t = t->next) { + gpr_log(GPR_DEBUG, "\t%s", t->flag->name); + } +} + void grpc_tracer_init(const char *env_var) { char *e = gpr_getenv(env_var); if (e != NULL) { @@ -115,10 +121,18 @@ int grpc_tracer_set_enabled(const char *name, int enabled) { for (t = tracers; t; t = t->next) { TRACER_SET(*t->flag, enabled); } + } else if (0 == strcmp(name, "list_tracers")) { + list_tracers(); + } else if (0 == strcmp(name, "refcount")) { + for (t = tracers; t; t = t->next) { + if (strstr(t->flag->name, "refcount") != NULL) { + TRACER_SET(*t->flag, enabled); + } + } } else { int found = 0; for (t = tracers; t; t = t->next) { - if (0 == strcmp(name, t->name)) { + if (0 == strcmp(name, t->flag->name)) { TRACER_SET(*t->flag, enabled); found = 1; } diff --git a/src/core/lib/debug/trace.h b/src/core/lib/debug/trace.h index 7cc9fb4e41..dd9e6a30fe 100644 --- a/src/core/lib/debug/trace.h +++ b/src/core/lib/debug/trace.h @@ -35,19 +35,20 @@ typedef struct { #else bool value; #endif + char *name; } grpc_tracer_flag; #ifdef GRPC_THREADSAFE_TRACER #define GRPC_TRACER_ON(flag) (gpr_atm_no_barrier_load(&(flag).value) != 0) -#define GRPC_TRACER_INITIALIZER(on) \ - { (gpr_atm)(on) } +#define GRPC_TRACER_INITIALIZER(on, name) \ + { (gpr_atm)(on), (name) } #else #define GRPC_TRACER_ON(flag) ((flag).value) -#define GRPC_TRACER_INITIALIZER(on) \ - { (on) } +#define GRPC_TRACER_INITIALIZER(on, name) \ + { (on), (name) } #endif -void grpc_register_tracer(const char *name, grpc_tracer_flag *flag); +void grpc_register_tracer(grpc_tracer_flag *flag); void grpc_tracer_init(const char *env_var_name); void grpc_tracer_shutdown(void); diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 71d697c278..9c5e93f4e5 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -25,7 +25,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -grpc_tracer_flag grpc_http1_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_http1_trace = GRPC_TRACER_INITIALIZER(false, "http1"); static char *buf2str(void *buffer, size_t length) { char *out = gpr_malloc(length + 1); diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index e028e72ed6..26f9cbe0fa 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -25,7 +25,7 @@ #include "src/core/lib/profiling/timers.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure"); #endif #ifndef NDEBUG diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 7f9c5d837f..c72c37e2b5 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -27,7 +27,8 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" -grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_combiner_trace = + GRPC_TRACER_INITIALIZER(false, "combiner"); #define GRPC_COMBINER_TRACE(fn) \ do { \ diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index a95929a1fb..3759dda992 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -36,7 +36,8 @@ #include "src/core/lib/slice/slice_internal.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_error_refcount = + GRPC_TRACER_INITIALIZER(false, "error_refcount"); #endif static const char *error_int_name(grpc_error_ints key) { diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 9f82c480bc..27b4892d1d 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -932,24 +932,12 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason) { - bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; polling_island *unref_pi = NULL; gpr_mu_lock(&fd->po.mu); fd->on_done_closure = on_done; - /* If release_fd is not NULL, we should be relinquishing control of the file - descriptor fd->fd (but we still own the grpc_fd structure). */ - if (release_fd != NULL) { - *release_fd = fd->fd; - } else { - close(fd->fd); - is_fd_closed = true; - } - - fd->orphaned = true; - /* Remove the active status but keep referenced. We want this grpc_fd struct to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); @@ -964,13 +952,24 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, before doing this.) */ if (fd->po.pi != NULL) { polling_island *pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */, + &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; fd->po.pi = NULL; } + /* If release_fd is not NULL, we should be relinquishing control of the file + descriptor fd->fd (but we still own the grpc_fd structure). */ + if (release_fd != NULL) { + *release_fd = fd->fd; + } else { + close(fd->fd); + } + + fd->orphaned = true; + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->po.mu); diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 5574838187..5690431759 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -103,6 +103,32 @@ typedef struct pollable { grpc_pollset_worker *root_worker; } pollable; +static const char *polling_obj_type_string(polling_obj_type t) { + switch (t) { + case PO_POLLING_GROUP: + return "polling_group"; + case PO_POLLSET_SET: + return "pollset_set"; + case PO_POLLSET: + return "pollset"; + case PO_FD: + return "fd"; + case PO_EMPTY_POLLABLE: + return "empty_pollable"; + case PO_COUNT: + return "<invalid:count>"; + } + return "<invalid>"; +} + +static char *pollable_desc(pollable *p) { + char *out; + gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", + polling_obj_type_string(p->po.type), p->po.group, p->epfd, + p->wakeup.read_fd); + return out; +} + static pollable g_empty_pollable; static void pollable_init(pollable *p, polling_obj_type type); @@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { GPR_ASSERT(epfd != -1); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p); + gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } gpr_mu_lock(&fd->orphaned_mu); @@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, if (worker->pollable != &pollset->pollable) { gpr_mu_lock(&worker->pollable->po.mu); } - if (worker->initialized_cv) { + if (worker->initialized_cv && worker != pollset->root_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), "pollset_shutdown"); } @@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout = poll_deadline_to_millis_timeout(deadline, now); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); + char *desc = pollable_desc(p); + gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_free(desc); } if (timeout != 0) { @@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; if (pollset->current_pollable == &g_empty_pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); + } /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &fd->pollable; @@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); REF_BY(fd, 2, "pollset_pollable"); } else if (pollset->current_pollable == &pollset->pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); + } append_error(&error, pollable_add_fd(pollset->current_pollable, fd), err_desc); } else if (pollset->current_pollable != &fd->pollable) { grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", pollset, fd, had_fd); + } + /* Introduce a spurious completion. + If we do not, then it may be that the fd-specific epoll set consumed + a completion without being polled, leading to a missed edge going up. */ + grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure); pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 3c4ca9c7c5..0d969dccce 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -855,24 +855,12 @@ static int fd_wrapped_fd(grpc_fd *fd) { static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason) { - bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; polling_island *unref_pi = NULL; gpr_mu_lock(&fd->po.mu); fd->on_done_closure = on_done; - /* If release_fd is not NULL, we should be relinquishing control of the file - descriptor fd->fd (but we still own the grpc_fd structure). */ - if (release_fd != NULL) { - *release_fd = fd->fd; - } else { - close(fd->fd); - is_fd_closed = true; - } - - fd->orphaned = true; - /* Remove the active status but keep referenced. We want this grpc_fd struct to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); @@ -887,13 +875,24 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, before doing this.) */ if (fd->po.pi != NULL) { polling_island *pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */, + &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; fd->po.pi = NULL; } + /* If release_fd is not NULL, we should be relinquishing control of the file + descriptor fd->fd (but we still own the grpc_fd structure). */ + if (release_fd != NULL) { + *release_fd = fd->fd; + } else { + close(fd->fd); + } + + fd->orphaned = true; + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->po.mu); diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2648df393d..cff77e641c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -39,10 +39,11 @@ #include "src/core/lib/support/env.h" grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false); /* Disabled by default */ + GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif /** Default poll() function - a pointer so that it can be overridden by some @@ -120,11 +121,15 @@ void grpc_set_event_engine_test_only( g_event_engine = ev_engine; } +const grpc_event_engine_vtable *grpc_get_event_engine_test_only() { + return g_event_engine; +} + /* Call this only after calling grpc_event_engine_init() */ const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; } void grpc_event_engine_init(void) { - grpc_register_tracer("polling", &grpc_polling_trace); + grpc_register_tracer(&grpc_polling_trace); char *s = gpr_getenv("GRPC_POLL_STRATEGY"); if (s == NULL) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 54c4f2ee11..0de7333843 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -153,7 +153,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; -/* This should be used for testing purposes ONLY */ +/* WARNING: The following two functions should be used for testing purposes + * ONLY */ void grpc_set_event_engine_test_only(const grpc_event_engine_vtable *); +const grpc_event_engine_vtable *grpc_get_event_engine_test_only(); #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_windows.c b/src/core/lib/iomgr/ev_windows.c index 027609c7e8..c24dfaeaf7 100644 --- a/src/core/lib/iomgr/ev_windows.c +++ b/src/core/lib/iomgr/ev_windows.c @@ -23,6 +23,6 @@ #include "src/core/lib/debug/trace.h" grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false); /* Disabled by default */ + GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ #endif // GRPC_WINSOCK_SOCKET diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c index 43f5d0406e..f5875a247e 100644 --- a/src/core/lib/iomgr/iomgr_posix.c +++ b/src/core/lib/iomgr/iomgr_posix.c @@ -28,7 +28,7 @@ void grpc_iomgr_platform_init(void) { grpc_wakeup_fd_global_init(); grpc_event_engine_init(); - grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_register_tracer(&grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index c484a39c54..ffec6bcf76 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -31,7 +31,7 @@ gpr_thd_id grpc_init_thread; void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); - grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_register_tracer(&grpc_tcp_trace); grpc_executor_set_threading(&exec_ctx, false); grpc_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 47c8c79cc2..e59801834d 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -35,7 +35,8 @@ #include "src/core/lib/debug/trace.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif struct grpc_pollset { diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 1bfc2a22a8..ea017a6054 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -31,7 +31,8 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif gpr_mu grpc_polling_mu; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index f2cc1be74e..a31d9eef93 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -29,7 +29,8 @@ #include "src/core/lib/iomgr/combiner.h" -grpc_tracer_flag grpc_resource_quota_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_resource_quota_trace = + GRPC_TRACER_INITIALIZER(false, "resource_quota"); #define MEMORY_USAGE_ESTIMATION_MAX 65536 diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 5de2b0f4ee..b6dcd15cb0 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -59,7 +59,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; typedef size_t msg_iovlen_type; #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); typedef struct { grpc_endpoint base; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 115297634b..9b1a8db723 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -38,7 +38,7 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); typedef struct { grpc_endpoint base; diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 6704a158ce..2cbb97403b 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -48,7 +48,7 @@ #define GRPC_FIONBIO FIONBIO #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); static grpc_error *set_non_block(SOCKET sock) { int status; diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index bf73d2c685..12efce241f 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -41,44 +41,67 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); - +grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); +grpc_tracer_flag grpc_timer_check_trace = + GRPC_TRACER_INITIALIZER(false, "timer_check"); + +/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with + * deadlines earlier than 'queue_deadline" cap are maintained in the heap and + * others are maintained in the list (unordered). This helps to keep the number + * of elements in the heap low. + * + * The 'queue_deadline_cap' gets recomputed periodically based on the timer + * stats maintained in 'stats' and the relevant timers are then moved from the + * 'list' to 'heap' + */ typedef struct { gpr_mu mu; grpc_time_averaged_stats stats; /* All and only timers with deadlines <= this will be in the heap. */ gpr_atm queue_deadline_cap; + /* The deadline of the next timer due in this shard */ gpr_atm min_deadline; - /* Index in the g_shard_queue */ + /* Index of this timer_shard in the g_shard_queue */ uint32_t shard_queue_index; /* This holds all timers with deadlines < queue_deadline_cap. Timers in this list have the top bit of their deadline set to 0. */ grpc_timer_heap heap; /* This holds timers whose deadline is >= queue_deadline_cap. */ grpc_timer list; -} shard_type; +} timer_shard; + +/* Array of timer shards. Whenever a timer (grpc_timer *) is added, its address + * is hashed to select the timer shard to add the timer to */ +static timer_shard g_shards[NUM_SHARDS]; + +/* Maintains a sorted list of timer shards (sorted by their min_deadline, i.e + * the deadline of the next timer in each shard). + * Access to this is protected by g_shared_mutables.mu */ +static timer_shard *g_shard_queue[NUM_SHARDS]; + +/* Thread local variable that stores the deadline of the next timer the thread + * has last-seen. This is an optimization to prevent the thread from checking + * shared_mutables.min_timer (which requires acquiring shared_mutables.mu lock, + * an expensive operation) */ +GPR_TLS_DECL(g_last_seen_min_timer); struct shared_mutables { + /* The deadline of the next timer due across all timer shards */ gpr_atm min_timer; /* Allow only one run_some_expired_timers at once */ gpr_spinlock checker_mu; bool initialized; - /* Protects g_shard_queue */ + /* Protects g_shard_queue (and the shared_mutables struct itself) */ gpr_mu mu; } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); static struct shared_mutables g_shared_mutables = { .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false, }; + static gpr_clock_type g_clock_type; -static shard_type g_shards[NUM_SHARDS]; -/* Protected by g_shared_mutables.mu */ -static shard_type *g_shard_queue[NUM_SHARDS]; static gpr_timespec g_start_time; -GPR_TLS_DECL(g_last_seen_min_timer); - static gpr_atm saturating_add(gpr_atm a, gpr_atm b) { if (a > GPR_ATM_MAX - b) { return GPR_ATM_MAX; @@ -122,7 +145,7 @@ static gpr_timespec atm_to_timespec(gpr_atm x) { return gpr_time_add(g_start_time, dbl_to_ts((double)x / 1000.0)); } -static gpr_atm compute_min_deadline(shard_type *shard) { +static gpr_atm compute_min_deadline(timer_shard *shard) { return grpc_timer_heap_is_empty(&shard->heap) ? saturating_add(shard->queue_deadline_cap, 1) : grpc_timer_heap_top(&shard->heap)->deadline; @@ -138,11 +161,11 @@ void grpc_timer_list_init(gpr_timespec now) { g_shared_mutables.min_timer = timespec_to_atm_round_down(now); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); - grpc_register_tracer("timer", &grpc_timer_trace); - grpc_register_tracer("timer_check", &grpc_timer_check_trace); + grpc_register_tracer(&grpc_timer_trace); + grpc_register_tracer(&grpc_timer_check_trace); for (i = 0; i < NUM_SHARDS; i++) { - shard_type *shard = &g_shards[i]; + timer_shard *shard = &g_shards[i]; gpr_mu_init(&shard->mu); grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1, 0.5); @@ -161,7 +184,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { exec_ctx, GPR_ATM_MAX, NULL, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { - shard_type *shard = &g_shards[i]; + timer_shard *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); grpc_timer_heap_destroy(&shard->heap); } @@ -187,7 +210,7 @@ static void list_remove(grpc_timer *timer) { } static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { - shard_type *temp; + timer_shard *temp; temp = g_shard_queue[first_shard_queue_index]; g_shard_queue[first_shard_queue_index] = g_shard_queue[first_shard_queue_index + 1]; @@ -198,7 +221,7 @@ static void swap_adjacent_shards_in_queue(uint32_t first_shard_queue_index) { first_shard_queue_index + 1; } -static void note_deadline_change(shard_type *shard) { +static void note_deadline_change(timer_shard *shard) { while (shard->shard_queue_index > 0 && shard->min_deadline < g_shard_queue[shard->shard_queue_index - 1]->min_deadline) { @@ -215,7 +238,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_closure *closure, gpr_timespec now) { int is_first_timer = 0; - shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; + timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); timer->closure = closure; @@ -303,7 +326,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { return; } - shard_type *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; + timer_shard *shard = &g_shards[GPR_HASH_POINTER(timer, NUM_SHARDS)]; gpr_mu_lock(&shard->mu); if (GRPC_TRACER_ON(grpc_timer_trace)) { gpr_log(GPR_DEBUG, "TIMER %p: CANCEL pending=%s", timer, @@ -321,12 +344,12 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { gpr_mu_unlock(&shard->mu); } -/* This is called when the queue is empty and "now" has reached the - queue_deadline_cap. We compute a new queue deadline and then scan the map - for timers that fall at or under it. Returns true if the queue is no - longer empty. +/* Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving + all relevant timers in shard->list (i.e timers with deadlines earlier than + 'queue_deadline_cap') into into shard->heap. + Returns 'true' if shard->heap has atleast ONE element REQUIRES: shard->mu locked */ -static int refill_queue(shard_type *shard, gpr_atm now) { +static int refill_heap(timer_shard *shard, gpr_atm now) { /* Compute the new queue window width and bound by the limits: */ double computed_deadline_delta = grpc_time_averaged_stats_update_average(&shard->stats) * @@ -363,7 +386,7 @@ static int refill_queue(shard_type *shard, gpr_atm now) { /* This pops the next non-cancelled timer with deadline <= now from the queue, or returns NULL if there isn't one. REQUIRES: shard->mu locked */ -static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { +static grpc_timer *pop_one(timer_shard *shard, gpr_atm now) { grpc_timer *timer; for (;;) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -373,7 +396,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } if (grpc_timer_heap_is_empty(&shard->heap)) { if (now < shard->queue_deadline_cap) return NULL; - if (!refill_queue(shard, now)) return NULL; + if (!refill_heap(shard, now)) return NULL; } timer = grpc_timer_heap_top(&shard->heap); if (GRPC_TRACER_ON(grpc_timer_check_trace)) { @@ -393,7 +416,7 @@ static grpc_timer *pop_one(shard_type *shard, gpr_atm now) { } /* REQUIRES: shard->mu unlocked */ -static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, +static size_t pop_timers(grpc_exec_ctx *exec_ctx, timer_shard *shard, gpr_atm now, gpr_atm *new_min_deadline, grpc_error *error) { size_t n = 0; diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index cb7998db97..631f7935d9 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -56,7 +56,7 @@ static gpr_timespec g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; -static void timer_thread(void *unused); +static void timer_thread(void *completed_thread_ptr); static void gc_completed_threads(void) { if (g_completed_threads != NULL) { @@ -81,10 +81,17 @@ static void start_timer_thread_and_unlock(void) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } - gpr_thd_id thd; gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&thd, timer_thread, NULL, &opt); + completed_thread *ct = gpr_malloc(sizeof(*ct)); + // The call to gpr_thd_new() has to be under the same lock used by + // gc_completed_threads(), particularly due to ct->t, which is written here + // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct + // to leak through g_completed_threads and be freed in gc_completed_threads() + // before "&ct->t" is written to, causing a use-after-free. + gpr_mu_lock(&g_mu); + gpr_thd_new(&ct->t, timer_thread, ct, &opt); + gpr_mu_unlock(&g_mu); } void grpc_timer_manager_tick() { @@ -245,7 +252,7 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { } } -static void timer_thread_cleanup(void) { +static void timer_thread_cleanup(completed_thread *ct) { gpr_mu_lock(&g_mu); // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done @@ -254,8 +261,6 @@ static void timer_thread_cleanup(void) { if (0 == g_thread_count) { gpr_cv_signal(&g_cv_shutdown); } - completed_thread *ct = gpr_malloc(sizeof(*ct)); - ct->t = gpr_thd_currentid(); ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); @@ -264,14 +269,14 @@ static void timer_thread_cleanup(void) { } } -static void timer_thread(void *unused) { +static void timer_thread(void *completed_thread_ptr) { // this threads exec_ctx: we try to run things through to completion here // since it's easy to spin up new threads grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); timer_main_loop(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); - timer_thread_cleanup(); + timer_thread_cleanup(completed_thread_ptr); } static void start_threads(void) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index e004ff5a3a..ff2570c60d 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -29,8 +29,9 @@ #include <uv.h> -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); +grpc_tracer_flag grpc_timer_check_trace = + GRPC_TRACER_INITIALIZER(false, "timer_check"); static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); } diff --git a/src/core/lib/security/context/security_context.c b/src/core/lib/security/context/security_context.c index dffe6d2e91..8fff2c92c5 100644 --- a/src/core/lib/security/context/security_context.c +++ b/src/core/lib/security/context/security_context.c @@ -31,7 +31,7 @@ #ifndef NDEBUG grpc_tracer_flag grpc_trace_auth_context_refcount = - GRPC_TRACER_INITIALIZER(false); + GRPC_TRACER_INITIALIZER(false, "auth_context_refcount"); #endif /* --- grpc_call --- */ diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c index 589a6f9407..4357657def 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.c +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c @@ -125,6 +125,13 @@ grpc_service_account_jwt_access_credentials_create_from_auth_json_key( gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &jwt_vtable; c->key = key; + gpr_timespec max_token_lifetime = grpc_max_auth_token_lifetime(); + if (gpr_time_cmp(token_lifetime, max_token_lifetime) > 0) { + gpr_log(GPR_INFO, + "Cropping token lifetime to maximum allowed value (%d secs).", + (int)max_token_lifetime.tv_sec); + token_lifetime = grpc_max_auth_token_lifetime(); + } c->jwt_lifetime = token_lifetime; gpr_mu_init(&c->cache_mu); jwt_reset_cache(exec_ctx, c); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index f4ed81db1a..5e41b94ff8 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -60,7 +60,8 @@ typedef struct { gpr_refcount ref; } secure_endpoint; -grpc_tracer_flag grpc_trace_secure_endpoint = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_secure_endpoint = + GRPC_TRACER_INITIALIZER(false, "secure_endpoint"); static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) { secure_endpoint *ep = secure_ep; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 3c0c24254b..6788126769 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -45,7 +45,7 @@ #ifndef NDEBUG grpc_tracer_flag grpc_trace_security_connector_refcount = - GRPC_TRACER_INITIALIZER(false); + GRPC_TRACER_INITIALIZER(false, "security_connector_refcount"); #endif /* -- Constants. -- */ @@ -383,8 +383,7 @@ static void fake_channel_add_handshakers( grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_adapter_handshaker( - tsi_create_fake_handshaker(true /* is_client */)), + exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base)); } @@ -394,8 +393,7 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshake_manager_add( handshake_mgr, grpc_security_handshaker_create( - exec_ctx, tsi_create_adapter_handshaker( - tsi_create_fake_handshaker(false /* is_client */)), + exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base)); } diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c index 5c512661a3..61d2346427 100644 --- a/src/core/lib/support/log_linux.c +++ b/src/core/lib/support/log_linux.c @@ -64,6 +64,8 @@ void gpr_default_log(gpr_log_func_args *args) { time_t timer; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); struct tm tm; + static __thread long tid = 0; + if (tid == 0) tid = gettid(); timer = (time_t)now.tv_sec; final_slash = strrchr(args->file, '/'); @@ -81,7 +83,7 @@ void gpr_default_log(gpr_log_func_args *args) { gpr_asprintf(&prefix, "%s%s.%09" PRId32 " %7ld %s:%d]", gpr_log_severity_string(args->severity), time_buffer, - now.tv_nsec, gettid(), display_file, args->line); + now.tv_nsec, tid, display_file, args->line); fprintf(stderr, "%-60s %s\n", prefix, args->message); gpr_free(prefix); diff --git a/src/core/lib/surface/api_trace.c b/src/core/lib/surface/api_trace.c index f88ffd57aa..56973303da 100644 --- a/src/core/lib/surface/api_trace.c +++ b/src/core/lib/surface/api_trace.c @@ -19,4 +19,4 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/debug/trace.h" -grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_api_trace = GRPC_TRACER_INITIALIZER(false, "api"); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index aac443ed06..2365d27307 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -229,8 +229,10 @@ struct grpc_call { void *saved_receiving_stream_ready_bctlp; }; -grpc_tracer_flag grpc_call_error_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_compression_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_call_error_trace = + GRPC_TRACER_INITIALIZER(false, "call_error"); +grpc_tracer_flag grpc_compression_trace = + GRPC_TRACER_INITIALIZER(false, "compression"); #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b04aee6c73..978d7b4171 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -35,10 +35,13 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/event_string.h" -grpc_tracer_flag grpc_trace_operation_failures = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_operation_failures = + GRPC_TRACER_INITIALIZER(false, "op_failure"); #ifndef NDEBUG -grpc_tracer_flag grpc_trace_pending_tags = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_pending_tags = + GRPC_TRACER_INITIALIZER(false, "pending_tags"); +grpc_tracer_flag grpc_trace_cq_refcount = + GRPC_TRACER_INITIALIZER(false, "cq_refcount"); #endif typedef struct { @@ -189,16 +192,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; - size_t (*size)(); - void (*begin_op)(grpc_completion_queue *cc, void *tag); - void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + size_t data_size; + void (*init)(void *data); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*destroy)(void *data); + void (*begin_op)(grpc_completion_queue *cq, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); - grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); - grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + grpc_event (*pluck)(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); } cq_vtable; @@ -218,25 +224,28 @@ typedef struct grpc_cq_event_queue { gpr_atm num_queue_items; } grpc_cq_event_queue; -/* TODO: sreek Refactor this based on the completion_type. Put completion-type - * specific data in a different structure (and co-allocate memory for it along - * with completion queue + pollset )*/ -typedef struct cq_data { - gpr_mu *mu; +typedef struct cq_next_data { + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + /** Counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; + + /* Number of outstanding events (+1 if not shut down) */ + gpr_atm pending_events; + + int shutdown_called; +} cq_next_data; + +typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; - /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; - /** Once owning_refs drops to zero, we will destroy the cq */ - gpr_refcount owning_refs; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; @@ -245,37 +254,45 @@ typedef struct cq_data { gpr_atm shutdown; int shutdown_called; - int is_server_cq; - int num_pluckers; - int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_shutdown_done; +} cq_pluck_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + /** Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; + + gpr_mu *mu; + + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; #ifndef NDEBUG void **outstanding_tags; size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif -} cq_data; -/* Completion queue structure */ -struct grpc_completion_queue { - cq_data data; - const cq_vtable *vtable; - const cq_poller_vtable *poller_vtable; + grpc_closure pollset_shutdown_done; + int num_polls; }; /* Forward declarations */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc); - -static size_t cq_size(grpc_completion_queue *cc); - -static void cq_begin_op(grpc_completion_queue *cc, void *tag); +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); + +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -283,42 +300,56 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage); static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); +static void cq_init_next(void *data); +static void cq_init_pluck(void *data); +static void cq_destroy_next(void *data); +static void cq_destroy_pluck(void *data); + /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ - {.cq_completion_type = GRPC_CQ_NEXT, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_next_data), + .cq_completion_type = GRPC_CQ_NEXT, + .init = cq_init_next, + .shutdown = cq_shutdown_next, + .destroy = cq_destroy_next, + .begin_op = cq_begin_op_for_next, .end_op = cq_end_op_for_next, .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ - {.cq_completion_type = GRPC_CQ_PLUCK, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_pluck_data), + .cq_completion_type = GRPC_CQ_PLUCK, + .init = cq_init_pluck, + .shutdown = cq_shutdown_pluck, + .destroy = cq_destroy_pluck, + .begin_op = cq_begin_op_for_pluck, .end_op = cq_end_op_for_pluck, .next = NULL, .pluck = cq_pluck}, }; -#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) -#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) +#define DATA_FROM_CQ(cq) ((void *)(cq + 1)) +#define POLLSET_FROM_CQ(cq) \ + ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq))) -grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true); -grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); +grpc_tracer_flag grpc_cq_pluck_trace = + GRPC_TRACER_INITIALIZER(true, "queue_pluck"); +grpc_tracer_flag grpc_cq_event_timeout_trace = + GRPC_TRACER_INITIALIZER(true, "queue_timeout"); #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event) \ if (GRPC_TRACER_ON(grpc_api_trace) && \ @@ -329,7 +360,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); gpr_free(_ev); \ } -static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); static void cq_event_queue_init(grpc_cq_event_queue *q) { @@ -342,9 +373,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) { gpr_mpscq_destroy(&q->queue); } -static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { +static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); - gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); + return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; } static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { @@ -367,16 +398,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } -static size_t cq_size(grpc_completion_queue *cc) { - /* Size of the completion queue and the size of the pollset whose memory is - allocated right after that of completion queue */ - return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); -} - grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { - grpc_completion_queue *cc; + grpc_completion_queue *cq; GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0); @@ -389,158 +414,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - cq_data *cqd = &cc->data; + cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + + poller_vtable->size()); - cc->vtable = vtable; - cc->poller_vtable = poller_vtable; + cq->vtable = vtable; + cq->poller_vtable = poller_vtable; - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + /* One for destroy(), one for pollset_shutdown */ + gpr_ref_init(&cq->owning_refs, 2); -#ifndef NDEBUG - cqd->outstanding_tags = NULL; - cqd->outstanding_tag_capacity = 0; -#endif + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); + vtable->init(DATA_FROM_CQ(cq)); + + GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, + grpc_schedule_on_exec_ctx); + + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); + + return cq; +} +static void cq_init_next(void *ptr) { + cq_next_data *cqd = ptr; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + cq_event_queue_init(&cqd->queue); +} + +static void cq_destroy_next(void *ptr) { + cq_next_data *cqd = ptr; + GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); + cq_event_queue_destroy(&cqd->queue); +} + +static void cq_init_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cqd->pending_events, 1); - /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cqd->owning_refs, 2); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); cqd->shutdown_called = 0; - cqd->is_server_cq = 0; cqd->num_pluckers = 0; - cqd->num_polls = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); -#ifndef NDEBUG - cqd->outstanding_tag_count = 0; -#endif - cq_event_queue_init(&cqd->queue); - GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, - grpc_schedule_on_exec_ctx); - - GPR_TIMER_END("grpc_completion_queue_create_internal", 0); +} - return cc; +static void cq_destroy_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } -grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->vtable->cq_completion_type; +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) { + return cq->vtable->cq_completion_type; } -int grpc_get_cq_poll_num(grpc_completion_queue *cc) { +int grpc_get_cq_poll_num(grpc_completion_queue *cq) { int cur_num_polls; - gpr_mu_lock(cc->data.mu); - cur_num_polls = cc->data.num_polls; - gpr_mu_unlock(cc->data.mu); + gpr_mu_lock(cq->mu); + cur_num_polls = cq->num_polls; + gpr_mu_unlock(cq->mu); return cur_num_polls; } #ifndef NDEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, +void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1, + "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1, reason); } #else -void grpc_cq_internal_ref(grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +void grpc_cq_internal_ref(grpc_completion_queue *cq) { #endif - gpr_ref(&cqd->owning_refs); + gpr_ref(&cq->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_completion_queue *cc = arg; - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy"); + grpc_completion_queue *cq = arg; + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1, + "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1, reason); } #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; + grpc_completion_queue *cq) { #endif - if (gpr_unref(&cqd->owning_refs)) { - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); - cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); - cq_event_queue_destroy(&cqd->queue); + if (gpr_unref(&cq->owning_refs)) { + cq->vtable->destroy(DATA_FROM_CQ(cq)); + cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); #ifndef NDEBUG - gpr_free(cqd->outstanding_tags); + gpr_free(cq->outstanding_tags); #endif - gpr_free(cc); + gpr_free(cq); } } -static void cq_begin_op(grpc_completion_queue *cc, void *tag) { - cq_data *cqd = &cc->data; -#ifndef NDEBUG - gpr_mu_lock(cqd->mu); +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); +} + +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { - cqd->outstanding_tag_capacity = - GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); - cqd->outstanding_tags = - gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * - cqd->outstanding_tag_capacity); - } - cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; - gpr_mu_unlock(cqd->mu); -#endif gpr_ref(&cqd->pending_events); } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { - cc->vtable->begin_op(cc, tag); +void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + cq->vtable->begin_op(cq, tag); } #ifndef NDEBUG -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { - cq_data *cqd = &cc->data; +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; if (lock_cq) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); } - for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { - if (cqd->outstanding_tags[i] == tag) { - cqd->outstanding_tag_count--; - GPR_SWAP(void *, cqd->outstanding_tags[i], - cqd->outstanding_tags[cqd->outstanding_tag_count]); + for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { + if (cq->outstanding_tags[i] == tag) { + cq->outstanding_tag_count--; + GPR_SWAP(void *, cq->outstanding_tags[i], + cq->outstanding_tags[cq->outstanding_tag_count]); found = 1; break; } } if (lock_cq) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } GPR_ASSERT(found); } #else -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_NEXT) */ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -553,16 +593,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -570,28 +610,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = (uintptr_t)(is_success); - cq_check_tag(cc, tag, true); /* Used in debug builds only */ + cq_check_tag(cq, tag, true); /* Used in debug builds only */ /* Add the completion to the queue */ - cq_event_queue_push(&cqd->queue, storage); + bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - gpr_mu_lock(cqd->mu); - - int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { - grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); - gpr_mu_unlock(cqd->mu); - - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - - GRPC_ERROR_UNREF(kick_error); + bool will_definitely_shutdown = + gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } + } + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -599,16 +653,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_PLUCK) */ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -618,9 +673,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); @@ -632,8 +687,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); - gpr_mu_lock(cqd->mu); - cq_check_tag(cc, tag, false); /* Used in debug builds only */ + gpr_mu_lock(cq->mu); + cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); @@ -652,9 +707,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -663,8 +718,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -672,12 +727,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); + cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); } typedef struct { @@ -692,7 +747,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -703,7 +758,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { gpr_atm_no_barrier_load(&cqd->things_queued_ever); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but that + * might return NULL in some cases even if the queue is not empty; but + * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ a->stolen_completion = cq_event_queue_pop(&cqd->queue); @@ -716,58 +772,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { } #ifndef NDEBUG -static void dump_pending_tags(grpc_completion_queue *cc) { +static void dump_pending_tags(grpc_completion_queue *cq) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; - cq_data *cqd = &cc->data; - gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cqd->mu); - for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { + gpr_mu_lock(cq->mu); + for (size_t i = 0; i < cq->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cq->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); gpr_free(out); } #else -static void dump_pending_tags(grpc_completion_queue *cc) {} +static void dump_pending_tags(grpc_completion_queue *cq) {} #endif -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( "grpc_completion_queue_next(" - "cc=%p, " + "cq=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "next"); + GRPC_CQ_INTERNAL_REF(cq, "next"); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = NULL, @@ -800,21 +854,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, /* If c == NULL it means either the queue is empty OR in an transient inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second - attempt at popping. Not doing this can potentially deadlock this thread + attempt at popping. Not doing this can potentially deadlock this + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because - (cc->shutdown == true) is only possible when there is no pending work - (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + (cq->shutdown == true) is only possible when there is no pending + work + (i.e cq->pending_events == 0) and any outstanding + grpc_cq_completion events are already queued on this cq */ continue; } @@ -828,16 +885,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } /* The main polling work happens in grpc_pollset_work */ - gpr_mu_lock(cqd->mu); - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + gpr_mu_lock(cq->mu); + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), NULL, now, iteration_deadline); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); @@ -846,30 +903,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + if (cq_event_queue_num_items(&cqd->queue) > 0 && + gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_mu_lock(cq->mu); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + } + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; } -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_next(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); +} + +grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { - return cc->vtable->next(cc, deadline, reserved); + return cq->vtable->next(cq, deadline, reserved); } -static int add_plucker(grpc_completion_queue *cc, void *tag, +static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -879,9 +980,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, return 1; } -static void del_plucker(grpc_completion_queue *cc, void *tag, +static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -895,13 +996,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; @@ -913,51 +1014,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" - "cc=%p, tag=%p, " + "cq=%p, tag=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec, + 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); } GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "pluck"); + gpr_mu_lock(cq->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = tag, @@ -966,7 +1067,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -983,7 +1084,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -993,54 +1094,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (gpr_atm_no_barrier_load(&cqd->shutdown)) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!add_plucker(cc, tag, &worker)) { + if (!add_plucker(cq, tag, &worker)) { gpr_log(GPR_DEBUG, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; - del_plucker(cc, tag, &worker); + del_plucker(cq, tag, &worker); } done: - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -1049,85 +1150,66 @@ done: return ret; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { - return cc->vtable->pluck(cc, tag, deadline, reserved); + return cq->vtable->pluck(cq, tag, deadline, reserved); } -/* Finishes the completion queue shutdown. This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); } -/* Shutdown simply drops a ref that we reserved at creation time; if we drop - to zero here, then enter shutdown mode and wake up any waiters */ -void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); - GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - cq_data *cqd = &cc->data; +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = 1; if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown(&exec_ctx, cc); + cq_finish_shutdown_pluck(exec_ctx, cq); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); + GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); + cq->vtable->shutdown(&exec_ctx, cq); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } -void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); +void grpc_completion_queue_destroy(grpc_completion_queue *cq) { + GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); - grpc_completion_queue_shutdown(cc); - - /* TODO (sreek): This should not ideally be here. Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); - } + grpc_completion_queue_shutdown(cq); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } -grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; -} - -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { - return CQ_FROM_POLLSET(ps); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { - cc->data.is_server_cq = 1; -} - -bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->data.is_server_cq; +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) { + return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL; } -bool grpc_cq_can_listen(grpc_completion_queue *cc) { - return cc->poller_vtable->can_listen; +bool grpc_cq_can_listen(grpc_completion_queue *cq) { + return cq->poller_vtable->can_listen; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 97ea9cae20..af44482513 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_server_cq(grpc_completion_queue *cc); bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 14a86bfa0a..db111e597f 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -120,29 +120,27 @@ void grpc_init(void) { grpc_slice_intern_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_register_tracer("api", &grpc_api_trace); - grpc_register_tracer("channel", &grpc_trace_channel); - grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); - grpc_register_tracer("channel_stack_builder", - &grpc_trace_channel_stack_builder); - grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace); // default on - grpc_register_tracer("combiner", &grpc_combiner_trace); - grpc_register_tracer("server_channel", &grpc_server_channel_trace); - grpc_register_tracer("bdp_estimator", &grpc_bdp_estimator_trace); - grpc_register_tracer("queue_timeout", - &grpc_cq_event_timeout_trace); // default on - grpc_register_tracer("op_failure", &grpc_trace_operation_failures); - grpc_register_tracer("resource_quota", &grpc_resource_quota_trace); - grpc_register_tracer("call_error", &grpc_call_error_trace); + grpc_register_tracer(&grpc_api_trace); + grpc_register_tracer(&grpc_trace_channel); + grpc_register_tracer(&grpc_connectivity_state_trace); + grpc_register_tracer(&grpc_trace_channel_stack_builder); + grpc_register_tracer(&grpc_http1_trace); + grpc_register_tracer(&grpc_cq_pluck_trace); // default on + grpc_register_tracer(&grpc_combiner_trace); + grpc_register_tracer(&grpc_server_channel_trace); + grpc_register_tracer(&grpc_bdp_estimator_trace); + grpc_register_tracer(&grpc_cq_event_timeout_trace); // default on + grpc_register_tracer(&grpc_trace_operation_failures); + grpc_register_tracer(&grpc_resource_quota_trace); + grpc_register_tracer(&grpc_call_error_trace); #ifndef NDEBUG - grpc_register_tracer("pending_tags", &grpc_trace_pending_tags); - grpc_register_tracer("queue_refcount", &grpc_trace_cq_refcount); - grpc_register_tracer("closure", &grpc_trace_closure); - grpc_register_tracer("error_refcount", &grpc_trace_error_refcount); - grpc_register_tracer("stream_refcount", &grpc_trace_stream_refcount); - grpc_register_tracer("fd_refcount", &grpc_trace_fd_refcount); - grpc_register_tracer("metadata", &grpc_trace_metadata); + grpc_register_tracer(&grpc_trace_pending_tags); + grpc_register_tracer(&grpc_trace_cq_refcount); + grpc_register_tracer(&grpc_trace_closure); + grpc_register_tracer(&grpc_trace_error_refcount); + grpc_register_tracer(&grpc_trace_stream_refcount); + grpc_register_tracer(&grpc_trace_fd_refcount); + grpc_register_tracer(&grpc_trace_metadata); #endif grpc_security_pre_init(); grpc_iomgr_init(&exec_ctx); diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 7dbea581d0..2366c24910 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -37,13 +37,11 @@ #endif void grpc_security_pre_init(void) { - grpc_register_tracer("secure_endpoint", &grpc_trace_secure_endpoint); - grpc_register_tracer("transport_security", &tsi_tracing_enabled); + grpc_register_tracer(&grpc_trace_secure_endpoint); + grpc_register_tracer(&tsi_tracing_enabled); #ifndef NDEBUG - grpc_register_tracer("auth_context_refcount", - &grpc_trace_auth_context_refcount); - grpc_register_tracer("security_connector_refcount", - &grpc_trace_security_connector_refcount); + grpc_register_tracer(&grpc_trace_auth_context_refcount); + grpc_register_tracer(&grpc_trace_security_connector_refcount); #endif } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 0cd436883a..fce7f8dca1 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -58,7 +58,8 @@ typedef struct registered_method registered_method; typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; -grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_server_channel_trace = + GRPC_TRACER_INITIALIZER(false, "server_channel"); typedef struct requested_call { requested_call_type type; @@ -975,8 +976,6 @@ static void register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } - grpc_cq_mark_server_cq(cq); - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1156,9 +1155,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->channel = channel; size_t cq_idx; - grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { - if (s->cqs[cq_idx] == accepting_cq) break; + if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break; } if (cq_idx == s->cq_count) { /* completion queue not found: pick a random one to publish new calls to */ diff --git a/src/core/lib/transport/bdp_estimator.c b/src/core/lib/transport/bdp_estimator.c index d33e3a48ab..311ae6390d 100644 --- a/src/core/lib/transport/bdp_estimator.c +++ b/src/core/lib/transport/bdp_estimator.c @@ -23,7 +23,8 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -grpc_tracer_flag grpc_bdp_estimator_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_bdp_estimator_trace = + GRPC_TRACER_INITIALIZER(false, "bdp_estimator"); void grpc_bdp_estimator_init(grpc_bdp_estimator *estimator, const char *name) { estimator->estimate = 65536; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 6fe40af3b2..73a9178ae2 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -24,7 +24,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -grpc_tracer_flag grpc_connectivity_state_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_connectivity_state_trace = + GRPC_TRACER_INITIALIZER(false, "connectivity_state"); const char *grpc_connectivity_state_name(grpc_connectivity_state state) { switch (state) { diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 87a2abf344..2fea366072 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -48,7 +48,8 @@ */ #ifndef NDEBUG -grpc_tracer_flag grpc_trace_metadata = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_metadata = + GRPC_TRACER_INITIALIZER(false, "metadata"); #define DEBUG_ARGS , const char *file, int line #define FWD_DEBUG_ARGS , file, line #define REF_MD_LOCKED(shard, s) ref_md_locked((shard), (s), __FILE__, __LINE__) diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 6a9eba110d..7281602d66 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -32,7 +32,8 @@ #include "src/core/lib/transport/transport_impl.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_stream_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_stream_refcount = + GRPC_TRACER_INITIALIZER(false, "stream_refcount"); #endif #ifndef NDEBUG |