diff options
Diffstat (limited to 'src')
27 files changed, 1063 insertions, 390 deletions
diff --git a/src/compiler/config.h b/src/compiler/config.h index ba44cd8a31..fd1400cd24 100644 --- a/src/compiler/config.h +++ b/src/compiler/config.h @@ -96,4 +96,11 @@ typedef GRPC_CUSTOM_STRINGOUTPUTSTREAM StringOutputStream; } // namespace protobuf } // namespace grpc +namespace grpc_cpp_generator { + +static const char* const kCppGeneratorMessageHeaderExt = ".pb.h"; +static const char* const kCppGeneratorServiceHeaderExt = ".grpc.pb.h"; + +} // namespace grpc_cpp_generator + #endif // SRC_COMPILER_CONFIG_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index a1a0258c68..7a2c44fd46 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -40,9 +40,6 @@ namespace grpc_cpp_generator { namespace { -grpc::string message_header_ext() { return ".pb.h"; } -grpc::string service_header_ext() { return ".grpc.pb.h"; } - template <class T> grpc::string as_string(T x) { std::ostringstream out; @@ -113,7 +110,7 @@ grpc::string GetHeaderPrologue(grpc_generator::File *file, vars["filename"] = file->filename(); vars["filename_identifier"] = FilenameIdentifier(file->filename()); vars["filename_base"] = file->filename_without_ext(); - vars["message_header_ext"] = message_header_ext(); + vars["message_header_ext"] = kCppGeneratorMessageHeaderExt; printer->Print(vars, "// Generated by the gRPC C++ plugin.\n"); printer->Print(vars, @@ -128,6 +125,7 @@ grpc::string GetHeaderPrologue(grpc_generator::File *file, printer->Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n"); printer->Print(vars, "\n"); printer->Print(vars, "#include \"$filename_base$$message_header_ext$\"\n"); + printer->Print(vars, file->additional_headers().c_str()); printer->Print(vars, "\n"); } return output; @@ -1039,8 +1037,8 @@ grpc::string GetSourcePrologue(grpc_generator::File *file, vars["filename"] = file->filename(); vars["filename_base"] = file->filename_without_ext(); - vars["message_header_ext"] = message_header_ext(); - vars["service_header_ext"] = service_header_ext(); + vars["message_header_ext"] = kCppGeneratorMessageHeaderExt; + vars["service_header_ext"] = kCppGeneratorServiceHeaderExt; printer->Print(vars, "// Generated by the gRPC C++ plugin.\n"); printer->Print(vars, @@ -1049,7 +1047,6 @@ grpc::string GetSourcePrologue(grpc_generator::File *file, printer->Print(vars, "#include \"$filename_base$$message_header_ext$\"\n"); printer->Print(vars, "#include \"$filename_base$$service_header_ext$\"\n"); - printer->Print(vars, file->additional_headers().c_str()); printer->Print(vars, "\n"); } return output; @@ -1425,8 +1422,8 @@ grpc::string GetMockPrologue(grpc_generator::File *file, vars["filename"] = file->filename(); vars["filename_base"] = file->filename_without_ext(); - vars["message_header_ext"] = message_header_ext(); - vars["service_header_ext"] = service_header_ext(); + vars["message_header_ext"] = kCppGeneratorMessageHeaderExt; + vars["service_header_ext"] = kCppGeneratorServiceHeaderExt; printer->Print(vars, "// Generated by the gRPC C++ plugin.\n"); printer->Print(vars, diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c new file mode 100644 index 0000000000..7fb75e3a4f --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -0,0 +1,223 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +#include "src/core/ext/filters/workarounds/workaround_utils.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/metadata.h" + +typedef struct call_data { + // Receive closures are chained: we inject this closure as the + // recv_initial_metadata_ready up-call on transport_stream_op, and remember to + // call our next_recv_initial_metadata_ready member after handling it. + grpc_closure recv_initial_metadata_ready; + // Used by recv_initial_metadata_ready. + grpc_metadata_batch* recv_initial_metadata; + // Original recv_initial_metadata_ready callback, invoked after our own. + grpc_closure* next_recv_initial_metadata_ready; + + // Marks whether the workaround is active + bool workaround_active; +} call_data; + +// Find the user agent metadata element in the batch +static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, + grpc_mdelem* md) { + if (batch->idx.named.user_agent != NULL) { + *md = batch->idx.named.user_agent->md; + return true; + } + return false; +} + +// Callback invoked when we receive an initial metadata. +static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, + void* user_data, grpc_error* error) { + grpc_call_element* elem = user_data; + call_data* calld = elem->call_data; + + if (GRPC_ERROR_NONE == error) { + grpc_mdelem md; + if (get_user_agent_mdelem(calld->recv_initial_metadata, &md)) { + grpc_workaround_user_agent_md* user_agent_md = grpc_parse_user_agent(md); + if (user_agent_md + ->workaround_active[GRPC_WORKAROUND_ID_CRONET_COMPRESSION]) { + calld->workaround_active = true; + } + } + } + + // Invoke the next callback. + grpc_closure_run(exec_ctx, calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +// Start transport stream op. +static void start_transport_stream_op_batch( + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* op) { + call_data* calld = elem->call_data; + + // Inject callback for receiving initial metadata + if (op->recv_initial_metadata) { + calld->next_recv_initial_metadata_ready = + op->payload->recv_initial_metadata.recv_initial_metadata_ready; + op->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + calld->recv_initial_metadata = + op->payload->recv_initial_metadata.recv_initial_metadata; + } + + if (op->send_message) { + /* Send message happens after client's user-agent (initial metadata) is + * received, so workaround_active must be set already */ + if (calld->workaround_active) { + op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS; + } + } + + // Chain to the next filter. + grpc_call_next_op(exec_ctx, elem, op); +} + +// Constructor for call_data. +static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = elem->call_data; + calld->next_recv_initial_metadata_ready = NULL; + calld->workaround_active = false; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, + grpc_schedule_on_exec_ctx); + return GRPC_ERROR_NONE; +} + +// Destructor for call_data. +static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) {} + +// Constructor for channel_data. +static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; +} + +// Destructor for channel_data. +static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) {} + +// Parse the user agent +static bool parse_user_agent(grpc_mdelem md) { + const char grpc_objc_specifier[] = "grpc-objc/"; + const size_t grpc_objc_specifier_len = sizeof(grpc_objc_specifier) - 1; + const char cronet_specifier[] = "cronet_http"; + const size_t cronet_specifier_len = sizeof(cronet_specifier) - 1; + + char* user_agent_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + bool grpc_objc_specifier_seen = false; + bool cronet_specifier_seen = false; + char *major_version_str = user_agent_str, *minor_version_str; + long major_version, minor_version; + + char* head = strtok(user_agent_str, " "); + while (head != NULL) { + if (!grpc_objc_specifier_seen && + 0 == strncmp(head, grpc_objc_specifier, grpc_objc_specifier_len)) { + major_version_str = head + grpc_objc_specifier_len; + grpc_objc_specifier_seen = true; + } else if (grpc_objc_specifier_seen && + 0 == strncmp(head, cronet_specifier, cronet_specifier_len)) { + cronet_specifier_seen = true; + break; + } + + head = strtok(NULL, " "); + } + if (grpc_objc_specifier_seen) { + major_version_str = strtok(major_version_str, "."); + minor_version_str = strtok(NULL, "."); + major_version = atol(major_version_str); + minor_version = atol(minor_version_str); + } + + gpr_free(user_agent_str); + return (grpc_objc_specifier_seen && cronet_specifier_seen && + (major_version < 1 || (major_version == 1 && minor_version <= 3))); +} + +const grpc_channel_filter grpc_workaround_cronet_compression_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "workaround_cronet_compression"}; + +static bool register_workaround_cronet_compression( + grpc_exec_ctx* exec_ctx, grpc_channel_stack_builder* builder, void* arg) { + const grpc_channel_args* channel_args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg* a = grpc_channel_args_find( + channel_args, GRPC_ARG_WORKAROUND_CRONET_COMPRESSION); + if (a == NULL) { + return true; + } + if (grpc_channel_arg_get_bool(a, false) == false) { + return true; + } + return grpc_channel_stack_builder_prepend_filter( + builder, &grpc_workaround_cronet_compression_filter, NULL, NULL); +} + +void grpc_workaround_cronet_compression_filter_init(void) { + grpc_channel_init_register_stage( + GRPC_SERVER_CHANNEL, GRPC_WORKAROUND_PRIORITY_HIGH, + register_workaround_cronet_compression, NULL); + grpc_register_workaround(GRPC_WORKAROUND_ID_CRONET_COMPRESSION, + parse_user_agent); +} + +void grpc_workaround_cronet_compression_filter_shutdown(void) {} diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h new file mode 100644 index 0000000000..58c79a0c00 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.h @@ -0,0 +1,40 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H +#define GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_workaround_cronet_compression_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_CRONET_COMPRESSION_FILTER_H \ + */ diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c new file mode 100644 index 0000000000..1c565388e1 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_utils.c @@ -0,0 +1,65 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/ext/filters/workarounds/workaround_utils.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +user_agent_parser ua_parser[GRPC_MAX_WORKAROUND_ID]; + +static void destroy_user_agent_md(void *user_agent_md) { + gpr_free(user_agent_md); +} + +grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) { + grpc_workaround_user_agent_md *user_agent_md = + (grpc_workaround_user_agent_md *)grpc_mdelem_get_user_data( + md, destroy_user_agent_md); + + if (NULL != user_agent_md) { + return user_agent_md; + } + user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md)); + for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) { + if (ua_parser[i]) { + user_agent_md->workaround_active[i] = ua_parser[i](md); + } + } + grpc_mdelem_set_user_data(md, destroy_user_agent_md, (void *)user_agent_md); + + return user_agent_md; +} + +void grpc_register_workaround(uint32_t id, user_agent_parser parser) { + GPR_ASSERT(id < GRPC_MAX_WORKAROUND_ID); + ua_parser[id] = parser; +} diff --git a/src/core/ext/filters/workarounds/workaround_utils.h b/src/core/ext/filters/workarounds/workaround_utils.h new file mode 100644 index 0000000000..7cd70c12d8 --- /dev/null +++ b/src/core/ext/filters/workarounds/workaround_utils.h @@ -0,0 +1,52 @@ +// +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#ifndef GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H +#define GRPC_CORE_EXT_FILTERS_WORKAROUNDS_WORKAROUND_UTILS_H + +#include <grpc/support/workaround_list.h> + +#include "src/core/lib/transport/metadata.h" + +#define GRPC_WORKAROUND_PRIORITY_HIGH 10001 +#define GRPC_WORKAROUND_PROIRITY_LOW 9999 + +typedef struct grpc_workaround_user_agent_md { + bool workaround_active[GRPC_MAX_WORKAROUND_ID]; +} grpc_workaround_user_agent_md; + +grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md); + +typedef bool (*user_agent_parser)(grpc_mdelem); + +void grpc_register_workaround(uint32_t id, user_agent_parser parser); + +#endif diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 238d176dfa..247b134938 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -31,6 +31,8 @@ * */ +#include <grpc/support/port_platform.h> + #include <limits.h> #include <string.h> diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 5f2c989aad..685581b5cb 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -769,7 +769,7 @@ grpc_error *grpc_os_error(const char *file, int line, int err, GRPC_ERROR_INT_ERRNO, err), GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_static_string(strerror(err))), - GRPC_ERROR_STR_SYSCALL, grpc_slice_from_static_string(call_name)); + GRPC_ERROR_STR_SYSCALL, grpc_slice_from_copied_string(call_name)); } #ifdef GPR_WINDOWS diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 1015cc6776..822abd075d 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -46,11 +46,12 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); + return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -92,3 +93,25 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } + +void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { + gpr_mpscq_init(&q->queue); + q->read_lock = GPR_SPINLOCK_INITIALIZER; +} + +void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { + gpr_mpscq_destroy(&q->queue); +} + +bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { + return gpr_mpscq_push(&q->queue, n); +} + +gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { + if (gpr_spinlock_trylock(&q->read_lock)) { + gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); + gpr_spinlock_unlock(&q->read_lock); + return n; + } + return NULL; +} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 24c89f90c9..b3a171678a 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -37,6 +37,7 @@ #include <grpc/support/atm.h> #include <stdbool.h> #include <stddef.h> +#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -58,12 +59,34 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +// Thread safe - can be called from multiple threads concurrently +// Returns true if this was possibly the first node (may return true +// sporadically, will not return false sporadically) +bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) +// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); - // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); +// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing +// only one thread will succeed concurrently +typedef struct gpr_locked_mpscq { + gpr_mpscq queue; + gpr_spinlock read_lock; +} gpr_locked_mpscq; + +void gpr_locked_mpscq_init(gpr_locked_mpscq *q); +void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); +// Push a node +// Thread safe - can be called from multiple threads concurrently +// Returns true if this was possibly the first node (may return true +// sporadically, will not return false sporadically) +bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); +// Pop a node (returns NULL if no node is ready - which doesn't indicate that +// the queue is empty!!) +// Thread safe - can be called from multiple threads concurrently +gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); + #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index df5b70205c..b0a4b1fbcc 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -30,7 +30,6 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - #include "src/core/lib/surface/completion_queue.h" #include <stdio.h> @@ -45,6 +44,7 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -201,33 +201,68 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { .destroy = non_polling_poller_destroy}, }; -/* Completion queue structure */ -struct grpc_completion_queue { - /** owned by pollset */ +typedef struct cq_vtable { + grpc_cq_completion_type cq_completion_type; + size_t (*size)(); + void (*begin_op)(grpc_completion_queue *cc, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); +} cq_vtable; + +/* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue + * (a lockfree multiproducer single consumer queue). It uses a queue_lock + * to support multiple consumers. + * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */ +typedef struct grpc_cq_event_queue { + /* Spinlock to serialize consumers i.e pop() operations */ + gpr_spinlock queue_lock; + + gpr_mpscq queue; + + /* A lazy counter of number of items in the queue. This is NOT atomically + incremented/decremented along with push/pop operations and hence is only + eventually consistent */ + gpr_atm num_queue_items; +} grpc_cq_event_queue; + +/* TODO: sreek Refactor this based on the completion_type. Put completion-type + * specific data in a different structure (and co-allocate memory for it along + * with completion queue + pollset )*/ +typedef struct cq_data { gpr_mu *mu; - grpc_cq_completion_type completion_type; - - const cq_poller_vtable *poller_vtable; - - /** completed events */ + /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; + + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; + /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** counter of how many things have ever been queued on this completion queue + + /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; + /** 0 initially, 1 once we've begun shutting down */ - int shutdown; + gpr_atm shutdown; int shutdown_called; + int is_server_cq; - /** Can the server cq accept incoming channels */ - /* TODO: sreek - This will no longer be needed. Use polling_type set */ - int is_non_listening_server_cq; + int num_pluckers; + int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -236,8 +271,61 @@ struct grpc_completion_queue { size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif +} cq_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + cq_data data; + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; +}; - grpc_completion_queue *next_free; +/* Forward declarations */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc); + +static size_t cq_size(grpc_completion_queue *cc); + +static void cq_begin_op(grpc_completion_queue *cc, void *tag); + +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage); + +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved); + +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved); + +/* Completion queue vtables based on the completion-type */ +static const cq_vtable g_cq_vtable[] = { + /* GRPC_CQ_NEXT */ + {.cq_completion_type = GRPC_CQ_NEXT, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_next, + .next = cq_next, + .pluck = NULL}, + /* GRPC_CQ_PLUCK */ + {.cq_completion_type = GRPC_CQ_PLUCK, + .size = cq_size, + .begin_op = cq_begin_op, + .end_op = cq_end_op_for_pluck, + .next = NULL, + .pluck = cq_pluck}, }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) @@ -258,6 +346,47 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); +static void cq_event_queue_init(grpc_cq_event_queue *q) { + gpr_mpscq_init(&q->queue); + q->queue_lock = GPR_SPINLOCK_INITIALIZER; + gpr_atm_no_barrier_store(&q->num_queue_items, 0); +} + +static void cq_event_queue_destroy(grpc_cq_event_queue *q) { + gpr_mpscq_destroy(&q->queue); +} + +static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { + gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); +} + +static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { + grpc_cq_completion *c = NULL; + if (gpr_spinlock_trylock(&q->queue_lock)) { + c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + gpr_spinlock_unlock(&q->queue_lock); + } + + if (c) { + gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); + } + + return c; +} + +/* Note: The counter is not incremented/decremented atomically with push/pop. + * The count is only eventually consistent */ +static long cq_event_queue_num_items(grpc_cq_event_queue *q) { + return (long)gpr_atm_no_barrier_load(&q->num_queue_items); +} + +static size_t cq_size(grpc_completion_queue *cc) { + /* Size of the completion queue and the size of the pollset whose memory is + allocated right after that of completion queue */ + return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); +} + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { @@ -270,35 +399,40 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); + const cq_vtable *vtable = &g_cq_vtable[completion_type]; const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); -#ifndef NDEBUG - cc->outstanding_tags = NULL; - cc->outstanding_tag_capacity = 0; -#endif + cq_data *cqd = &cc->data; - cc->completion_type = completion_type; + cc->vtable = vtable; cc->poller_vtable = poller_vtable; + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + +#ifndef NDEBUG + cqd->outstanding_tags = NULL; + cqd->outstanding_tag_capacity = 0; +#endif + /* Initial ref is dropped by grpc_completion_queue_shutdown */ - gpr_ref_init(&cc->pending_events, 1); + gpr_ref_init(&cqd->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cc->owning_refs, 2); - cc->completed_tail = &cc->completed_head; - cc->completed_head.next = (uintptr_t)cc->completed_tail; - cc->shutdown = 0; - cc->shutdown_called = 0; - cc->is_server_cq = 0; - cc->is_non_listening_server_cq = 0; - cc->num_pluckers = 0; - gpr_atm_no_barrier_store(&cc->things_queued_ever, 0); + gpr_ref_init(&cqd->owning_refs, 2); + cqd->completed_tail = &cqd->completed_head; + cqd->completed_head.next = (uintptr_t)cqd->completed_tail; + gpr_atm_no_barrier_store(&cqd->shutdown, 0); + cqd->shutdown_called = 0; + cqd->is_server_cq = 0; + cqd->num_pluckers = 0; + cqd->num_polls = 0; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); #ifndef NDEBUG - cc->outstanding_tag_count = 0; + cqd->outstanding_tag_count = 0; #endif - grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, + cq_event_queue_init(&cqd->queue); + grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create_internal", 0); @@ -307,18 +441,28 @@ grpc_completion_queue *grpc_completion_queue_create_internal( } grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->completion_type; + return cc->vtable->cq_completion_type; +} + +int grpc_get_cq_poll_num(grpc_completion_queue *cc) { + int cur_num_polls; + gpr_mu_lock(cc->data.mu); + cur_num_polls = cc->data.num_polls; + gpr_mu_unlock(cc->data.mu); + return cur_num_polls; } #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { + cq_data *cqd = &cc->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count + 1, reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - gpr_ref(&cc->owning_refs); + gpr_ref(&cqd->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, @@ -328,63 +472,95 @@ static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - const char *reason, const char *file, int line) { +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + cq_data *cqd = &cc->data; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc, - (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason); + (int)cqd->owning_refs.count, (int)cqd->owning_refs.count - 1, reason); #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; #endif - if (gpr_unref(&cc->owning_refs)) { - GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); + if (gpr_unref(&cqd->owning_refs)) { + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); + cq_event_queue_destroy(&cqd->queue); #ifndef NDEBUG - gpr_free(cc->outstanding_tags); + gpr_free(cqd->outstanding_tags); #endif gpr_free(cc); } } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { +static void cq_begin_op(grpc_completion_queue *cc, void *tag) { + cq_data *cqd = &cc->data; #ifndef NDEBUG - gpr_mu_lock(cc->mu); - GPR_ASSERT(!cc->shutdown_called); - if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { - cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); - cc->outstanding_tags = - gpr_realloc(cc->outstanding_tags, sizeof(*cc->outstanding_tags) * - cc->outstanding_tag_capacity); + gpr_mu_lock(cqd->mu); + GPR_ASSERT(!cqd->shutdown_called); + if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { + cqd->outstanding_tag_capacity = + GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); + cqd->outstanding_tags = + gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * + cqd->outstanding_tag_capacity); } - cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(cc->mu); + cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; + gpr_mu_unlock(cqd->mu); #endif - gpr_ref(&cc->pending_events); + gpr_ref(&cqd->pending_events); +} + +void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { + cc->vtable->begin_op(cc, tag); } -/* Signal the end of an operation - if this is the last waiting-to-be-queued - event, then enter shutdown mode */ -/* Queue a GRPC_OP_COMPLETED operation */ -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, - void *tag, grpc_error *error, - void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, - grpc_cq_completion *storage), - void *done_arg, grpc_cq_completion *storage) { - int shutdown; - int i; - grpc_pollset_worker *pluck_worker; #ifndef NDEBUG +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { + cq_data *cqd = &cc->data; int found = 0; + if (lock_cq) { + gpr_mu_lock(cqd->mu); + } + + for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { + if (cqd->outstanding_tags[i] == tag) { + cqd->outstanding_tag_count--; + GPR_SWAP(void *, cqd->outstanding_tags[i], + cqd->outstanding_tags[cqd->outstanding_tag_count]); + found = 1; + break; + } + } + + if (lock_cq) { + gpr_mu_unlock(cqd->mu); + } + + GPR_ASSERT(found); +} +#else +static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} #endif - GPR_TIMER_BEGIN("grpc_cq_end_op", 0); +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_NEXT) */ +static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + GPR_TIMER_BEGIN("cq_end_op_for_next", 0); + if (GRPC_TRACER_ON(grpc_api_trace) || (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " - "done_arg=%p, storage=%p)", + "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { @@ -392,41 +568,100 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + storage->tag = tag; storage->done = done; storage->done_arg = done_arg; - storage->next = ((uintptr_t)&cc->completed_head) | - ((uintptr_t)(error == GRPC_ERROR_NONE)); + storage->next = (uintptr_t)(is_success); - gpr_mu_lock(cc->mu); -#ifndef NDEBUG - for (i = 0; i < (int)cc->outstanding_tag_count; i++) { - if (cc->outstanding_tags[i] == tag) { - cc->outstanding_tag_count--; - GPR_SWAP(void *, cc->outstanding_tags[i], - cc->outstanding_tags[cc->outstanding_tag_count]); - found = 1; - break; + cq_check_tag(cc, tag, true); /* Used in debug builds only */ + + /* Add the completion to the queue */ + cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + int shutdown = gpr_unref(&cqd->pending_events); + + gpr_mu_lock(cqd->mu); + if (!shutdown) { + grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); + gpr_mu_unlock(cqd->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + + GRPC_ERROR_UNREF(kick_error); } + } else { + cq_finish_shutdown(exec_ctx, cc); + gpr_mu_unlock(cqd->mu); } - GPR_ASSERT(found); -#endif - shutdown = gpr_unref(&cc->pending_events); - gpr_atm_no_barrier_fetch_add(&cc->things_queued_ever, 1); + + GPR_TIMER_END("cq_end_op_for_next", 0); + + GRPC_ERROR_UNREF(error); +} + +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion + * type of GRPC_CQ_PLUCK) */ +static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc, void *tag, + grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, + void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + cq_data *cqd = &cc->data; + int is_success = (error == GRPC_ERROR_NONE); + + GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); + + if (GRPC_TRACER_ON(grpc_api_trace) || + (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE)) { + const char *errmsg = grpc_error_string(error); + GRPC_API_TRACE( + "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "done=%p, done_arg=%p, storage=%p)", + 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + if (GRPC_TRACER_ON(grpc_trace_operation_failures) && + error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); + } + } + + storage->tag = tag; + storage->done = done; + storage->done_arg = done_arg; + storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); + + gpr_mu_lock(cqd->mu); + cq_check_tag(cc, tag, false); /* Used in debug builds only */ + + /* Add to the list of completions */ + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + cqd->completed_tail->next = + ((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next); + cqd->completed_tail = storage; + + int shutdown = gpr_unref(&cqd->pending_events); if (!shutdown) { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - pluck_worker = NULL; - for (i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag) { - pluck_worker = *cc->pluckers[i].worker; + grpc_pollset_worker *pluck_worker = NULL; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag) { + pluck_worker = *cqd->pluckers[i].worker; break; } } + grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); - gpr_mu_unlock(cc->mu); + + gpr_mu_unlock(cqd->mu); + if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); gpr_log(GPR_ERROR, "Kick failed: %s", msg); @@ -434,22 +669,23 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GRPC_ERROR_UNREF(kick_error); } } else { - cc->completed_tail->next = - ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); - cc->completed_tail = storage; - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); - gpr_mu_unlock(cc->mu); + cq_finish_shutdown(exec_ctx, cc); + gpr_mu_unlock(cqd->mu); } - GPR_TIMER_END("grpc_cq_end_op", 0); + GPR_TIMER_END("cq_end_op_for_pluck", 0); GRPC_ERROR_UNREF(error); } +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, + void *tag, grpc_error *error, + void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, + grpc_cq_completion *storage), + void *done_arg, grpc_cq_completion *storage) { + cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); +} + typedef struct { gpr_atm last_seen_things_queued_ever; grpc_completion_queue *cq; @@ -462,23 +698,24 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; GPR_ASSERT(a->stolen_completion == NULL); + gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); - if (cq->completed_tail != &cq->completed_head) { - a->stolen_completion = (grpc_cq_completion *)cq->completed_head.next; - cq->completed_head.next = a->stolen_completion->next & ~(uintptr_t)1; - if (a->stolen_completion == cq->completed_tail) { - cq->completed_tail = &cq->completed_head; - } - gpr_mu_unlock(cq->mu); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); + + /* Pop a cq_completion from the queue. Returns NULL if the queue is empty + * might return NULL in some cases even if the queue is not empty; but that + * is ok and doesn't affect correctness. Might effect the tail latencies a + * bit) */ + a->stolen_completion = cq_event_queue_pop(&cqd->queue); + if (a->stolen_completion != NULL) { return true; } - gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; @@ -488,16 +725,18 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { static void dump_pending_tags(grpc_completion_queue *cc) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; + cq_data *cqd = &cc->data; + gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cc->mu); - for (size_t i = 0; i < cc->outstanding_tag_count; i++) { + gpr_mu_lock(cqd->mu); + for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cc->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); @@ -507,17 +746,11 @@ static void dump_pending_tags(grpc_completion_queue *cc) { static void dump_pending_tags(grpc_completion_queue *cc) {} #endif -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, + void *reserved) { grpc_event ret; gpr_timespec now; - - if (cc->completion_type != GRPC_CQ_NEXT) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_next() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_NEXT"); - abort(); - } + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -536,10 +769,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(cc->mu); + cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -547,9 +780,11 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, .first_loop = true}; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, cq_is_next_finished, &is_finished_arg); + for (;;) { + gpr_timespec iteration_deadline = deadline; + if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); grpc_cq_completion *c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -558,37 +793,59 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, c->done(&exec_ctx, c->done_arg, c); break; } - if (cc->completed_tail != &cc->completed_head) { - grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; - cc->completed_head.next = c->next & ~(uintptr_t)1; - if (c == cc->completed_tail) { - cc->completed_tail = &cc->completed_head; - } - gpr_mu_unlock(cc->mu); + + grpc_cq_completion *c = cq_event_queue_pop(&cqd->queue); + + if (c != NULL) { ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; c->done(&exec_ctx, c->done_arg, c); break; + } else { + /* If c == NULL it means either the queue is empty OR in an transient + inconsistent state. If it is the latter, we shold do a 0-timeout poll + so that the thread comes back quickly from poll to make a second + attempt at popping. Not doing this can potentially deadlock this thread + forever (if the deadline is infinity) */ + if (cq_event_queue_num_items(&cqd->queue) > 0) { + iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); + } } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + /* Before returning, check if the queue has any items left over (since + gpr_mpscq_pop() can sometimes return NULL even if the queue is not + empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ + if (cq_event_queue_num_items(&cqd->queue) > 0) { + /* Go to the beginning of the loop. No point doing a poll because + (cc->shutdown == true) is only possible when there is no pending work + (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + events are already queued on this cq */ + continue; + } + memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } + now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); break; } + + /* The main polling work happens in grpc_pollset_work */ + gpr_mu_lock(cqd->mu); + cqd->num_polls++; grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), - NULL, now, deadline); + NULL, now, iteration_deadline); + gpr_mu_unlock(cqd->mu); + if (err != GRPC_ERROR_NONE) { - gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); @@ -600,6 +857,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } is_finished_arg.first_loop = false; } + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); grpc_exec_ctx_finish(&exec_ctx); @@ -610,24 +868,30 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, return ret; } +grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, + gpr_timespec deadline, void *reserved) { + return cc->vtable->next(cc, deadline, reserved); +} + static int add_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { + cq_data *cqd = &cc->data; + if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } - cc->pluckers[cc->num_pluckers].tag = tag; - cc->pluckers[cc->num_pluckers].worker = worker; - cc->num_pluckers++; + cqd->pluckers[cqd->num_pluckers].tag = tag; + cqd->pluckers[cqd->num_pluckers].worker = worker; + cqd->num_pluckers++; return 1; } static void del_plucker(grpc_completion_queue *cc, void *tag, grpc_pollset_worker **worker) { - int i; - for (i = 0; i < cc->num_pluckers; i++) { - if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { - cc->num_pluckers--; - GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]); + cq_data *cqd = &cc->data; + for (int i = 0; i < cqd->num_pluckers; i++) { + if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { + cqd->num_pluckers--; + GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]); return; } } @@ -637,51 +901,47 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; + cq_data *cqd = &cq->data; + GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cq->mu); + gpr_mu_lock(cqd->mu); a->last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cq->things_queued_ever); + gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; - grpc_cq_completion *prev = &cq->completed_head; + grpc_cq_completion *prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cq->completed_head) { + &cqd->completed_head) { if (c->tag == a->tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cq->completed_tail) { - cq->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cq->mu); + gpr_mu_unlock(cqd->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline, void *reserved) { +static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; + cq_data *cqd = &cc->data; GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); - if (cc->completion_type != GRPC_CQ_PLUCK) { - gpr_log(GPR_ERROR, - "grpc_completion_queue_pluck() cannot be called on this completion " - "queue since its completion type is not GRPC_CQ_PLUCK"); - abort(); - } - if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" @@ -699,10 +959,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cc->mu); + gpr_mu_lock(cqd->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = - gpr_atm_no_barrier_load(&cc->things_queued_ever), + gpr_atm_no_barrier_load(&cqd->things_queued_ever), .cq = cc, .deadline = deadline, .stolen_completion = NULL, @@ -712,7 +972,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -721,15 +981,15 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, c->done(&exec_ctx, c->done_arg, c); break; } - prev = &cc->completed_head; + prev = &cqd->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != - &cc->completed_head) { + &cqd->completed_head) { if (c->tag == tag) { prev->next = (prev->next & (uintptr_t)1) | (c->next & ~(uintptr_t)1); - if (c == cc->completed_tail) { - cc->completed_tail = prev; + if (c == cqd->completed_tail) { + cqd->completed_tail = prev; } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -738,8 +998,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, } prev = c; } - if (cc->shutdown) { - gpr_mu_unlock(cc->mu); + if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -749,7 +1009,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -759,19 +1019,21 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; dump_pending_tags(cc); break; } + + cqd->num_polls++; grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); @@ -793,26 +1055,48 @@ done: return ret; } +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline, void *reserved) { + return cc->vtable->pluck(cc, tag, deadline, reserved); +} + +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cc) { + cq_data *cqd = &cc->data; + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); + gpr_atm_no_barrier_store(&cqd->shutdown, 1); + + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cqd->pollset_shutdown_done); +} + /* Shutdown simply drops a ref that we reserved at creation time; if we drop to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(cc->mu); - if (cc->shutdown_called) { - gpr_mu_unlock(cc->mu); + cq_data *cqd = &cc->data; + + gpr_mu_lock(cqd->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cqd->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } - cc->shutdown_called = 1; - if (gpr_unref(&cc->pending_events)) { - GPR_ASSERT(!cc->shutdown); - cc->shutdown = 1; - cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cqd->shutdown_called = 1; + if (gpr_unref(&cqd->pending_events)) { + cq_finish_shutdown(&exec_ctx, cc); } - gpr_mu_unlock(cc->mu); + gpr_mu_unlock(cqd->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -821,6 +1105,13 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); grpc_completion_queue_shutdown(cc); + + /* TODO (sreek): This should not ideally be here. Refactor it into the + * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ + if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { + GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); + } + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); grpc_exec_ctx_finish(&exec_ctx); @@ -835,22 +1126,12 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { return CQ_FROM_POLLSET(ps); } -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO: sreek - use cc->polling_type field here and add a validation check - (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose - polling_type is set to GRPC_CQ_NON_LISTENING */ - cc->is_non_listening_server_cq = 1; +void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { + cc->data.is_server_cq = 1; } -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { - /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */ - return (cc->is_non_listening_server_cq == 1); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } - bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->is_server_cq; + return cc->data.is_server_cq; } bool grpc_cq_can_listen(grpc_completion_queue *cc) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 8d9ce2ec02..7963ea75e7 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -49,7 +49,13 @@ extern grpc_tracer_flag grpc_trace_operation_failures; extern grpc_tracer_flag grpc_trace_pending_tags; #endif +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_cq_completion { + gpr_mpscq_node node; + /** user supplied tag */ void *tag; /** done callback - called when this queue element is no longer @@ -101,7 +107,13 @@ bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); +int grpc_get_cq_poll_num(grpc_completion_queue *cc); + grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 560229e892..7e4ae421a0 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -47,7 +47,8 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/stack_lockfree.h" +#include "src/core/lib/support/mpscq.h" +#include "src/core/lib/support/spinlock.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -76,6 +77,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { + gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -175,7 +177,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_stack_lockfree **requests_per_cq; + gpr_locked_mpscq *requests_per_cq; }; struct registered_method { @@ -220,11 +222,6 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls_per_cq indices */ - gpr_stack_lockfree **request_freelist_per_cq; - /** requested call backing data */ - requested_call **requested_calls_per_cq; - int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -324,21 +321,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, size_t entries, - grpc_server *server) { +static void request_matcher_init(request_matcher *rm, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + gpr_locked_mpscq_init(&rm->requests_per_cq[i]); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); - gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); + gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -368,13 +364,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - int request_id; + requested_call *rc; for (size_t i = 0; i < server->cq_count; i++) { - while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != - -1) { - fail_call(exec_ctx, server, i, - &server->requested_calls_per_cq[i][request_id], - GRPC_ERROR_REF(error)); + /* Here we know: + 1. no requests are being added (since the server is shut down) + 2. no other threads are pulling (since the shut down process is single + threaded) + So, we can ignore the queue lock and just pop, with the guarantee that a + NULL returned here truly means that the queue is empty */ + while ((rc = (requested_call *)gpr_mpscq_pop( + &rm->requests_per_cq[i].queue)) != NULL) { + fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -409,13 +409,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); - if (server->started) { - gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); - gpr_free(server->requested_calls_per_cq[i]); - } } - gpr_free(server->request_freelist_per_cq); - gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -473,21 +467,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - requested_call *rc = req; - grpc_server *server = rc->server; - - if (rc >= server->requested_calls_per_cq[rc->cq_idx] && - rc < server->requested_calls_per_cq[rc->cq_idx] + - server->max_requested_calls_per_cq) { - GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); - gpr_stack_lockfree_push( - server->request_freelist_per_cq[rc->cq_idx], - (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); - } else { - gpr_free(req); - } - - server_unref(exec_ctx, server); + gpr_free(req); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -516,10 +496,6 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } - grpc_call_element *elem = - grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - channel_data *chand = elem->channel_data; - server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -547,15 +523,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) { + requested_call *rc = + (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + publish_call(exec_ctx, server, calld, cq_idx, rc); return; /* early out */ } } @@ -1029,8 +1005,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1103,29 +1077,15 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); - server->request_freelist_per_cq = - gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); - server->requested_calls_per_cq = - gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } - server->request_freelist_per_cq[i] = - gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); - for (int j = 0; j < server->max_requested_calls_per_cq; j++) { - gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); - } - server->requested_calls_per_cq[i] = - gpr_malloc((size_t)server->max_requested_calls_per_cq * - sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&server->unregistered_request_matcher, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, - (size_t)server->max_requested_calls_per_cq, server); + request_matcher_init(&rm->request_matcher, server); } server_ref(server); @@ -1379,21 +1339,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; - int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); - if (request_id == -1) { - /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, cq_idx, rc, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), - GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); - return GRPC_CALL_OK; - } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1402,15 +1352,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls_per_cq[cq_idx][request_id] = *rc; - gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { + if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); - if (request_id == -1) break; + rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); + if (rc == NULL) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1426,8 +1374,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls_per_cq[cq_idx][request_id]); + publish_call(exec_ctx, server, calld, cq_idx, rc); } gpr_mu_lock(&server->mu_call); } @@ -1534,7 +1481,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); - server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index 25bda7a262..510cf5d5a0 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -61,6 +61,8 @@ extern void grpc_max_age_filter_init(void); extern void grpc_max_age_filter_shutdown(void); extern void grpc_message_size_filter_init(void); extern void grpc_message_size_filter_shutdown(void); +extern void grpc_workaround_cronet_compression_filter_init(void); +extern void grpc_workaround_cronet_compression_filter_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_http_filters_init, @@ -91,4 +93,6 @@ void grpc_register_built_in_plugins(void) { grpc_max_age_filter_shutdown); grpc_register_plugin(grpc_message_size_filter_init, grpc_message_size_filter_shutdown); + grpc_register_plugin(grpc_workaround_cronet_compression_filter_init, + grpc_workaround_cronet_compression_filter_shutdown); } diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index 05d4771bce..e5eb68f934 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -61,6 +61,8 @@ extern void grpc_max_age_filter_init(void); extern void grpc_max_age_filter_shutdown(void); extern void grpc_message_size_filter_init(void); extern void grpc_message_size_filter_shutdown(void); +extern void grpc_workaround_cronet_compression_filter_init(void); +extern void grpc_workaround_cronet_compression_filter_shutdown(void); void grpc_register_built_in_plugins(void) { grpc_register_plugin(grpc_http_filters_init, @@ -91,4 +93,6 @@ void grpc_register_built_in_plugins(void) { grpc_max_age_filter_shutdown); grpc_register_plugin(grpc_message_size_filter_init, grpc_message_size_filter_shutdown); + grpc_register_plugin(grpc_workaround_cronet_compression_filter_init, + grpc_workaround_cronet_compression_filter_shutdown); } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 2f89aa3dce..7c93bb8683 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -686,6 +686,7 @@ bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, StringFromCopiedSlice(call_details_.method); static_cast<GenericServerContext*>(context_)->host_ = StringFromCopiedSlice(call_details_.host); + context_->deadline_ = call_details_.deadline; } grpc_slice_unref(call_details_.method); grpc_slice_unref(call_details_.host); diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 7e0f3f053d..c0865001a8 100755 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -19,27 +19,27 @@ <ItemGroup> <EmbeddedResource Include="..\..\..\etc\roots.pem" /> - <Content Include="..\nativelibs\macosx_x64\libgrpc_csharp_ext.dylib"> + <Content Include="..\nativelibs\csharp_ext_macos_x64\libgrpc_csharp_ext.dylib"> <PackagePath>runtimes/osx/native/libgrpc_csharp_ext.x64.dylib</PackagePath> <Pack>true</Pack> </Content> - <Content Include="..\nativelibs\macosx_x86\libgrpc_csharp_ext.dylib"> + <Content Include="..\nativelibs\csharp_ext_macos_x86\libgrpc_csharp_ext.dylib"> <PackagePath>runtimes/osx/native/libgrpc_csharp_ext.x86.dylib</PackagePath> <Pack>true</Pack> </Content> - <Content Include="..\nativelibs\linux_x64\libgrpc_csharp_ext.so"> + <Content Include="..\nativelibs\csharp_ext_linux_x64\libgrpc_csharp_ext.so"> <PackagePath>runtimes/linux/native/libgrpc_csharp_ext.x64.so</PackagePath> <Pack>true</Pack> </Content> - <Content Include="..\nativelibs\linux_x86\libgrpc_csharp_ext.so"> + <Content Include="..\nativelibs\csharp_ext_linux_x86\libgrpc_csharp_ext.so"> <PackagePath>runtimes/linux/native/libgrpc_csharp_ext.x86.so</PackagePath> <Pack>true</Pack> </Content> - <Content Include="..\nativelibs\windows_x64\grpc_csharp_ext.dll"> + <Content Include="..\nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll"> <PackagePath>runtimes/win/native/grpc_csharp_ext.x64.dll</PackagePath> <Pack>true</Pack> </Content> - <Content Include="..\nativelibs\windows_x86\grpc_csharp_ext.dll"> + <Content Include="..\nativelibs\csharp_ext_windows_x86\grpc_csharp_ext.dll"> <PackagePath>runtimes/win/native/grpc_csharp_ext.x86.dll</PackagePath> <Pack>true</Pack> </Content> diff --git a/src/csharp/Grpc.Tools.nuspec b/src/csharp/Grpc.Tools.nuspec index ba4e1d674c..0cae5572fd 100644 --- a/src/csharp/Grpc.Tools.nuspec +++ b/src/csharp/Grpc.Tools.nuspec @@ -17,17 +17,17 @@ </metadata> <files> <!-- forward slashes in src path enable building on Linux --> - <file src="protoc_plugins/windows_x86/protoc.exe" target="tools/windows_x86/protoc.exe" /> - <file src="protoc_plugins/windows_x86/grpc_csharp_plugin.exe" target="tools/windows_x86/grpc_csharp_plugin.exe" /> - <file src="protoc_plugins/windows_x64/protoc.exe" target="tools/windows_x64/protoc.exe" /> - <file src="protoc_plugins/windows_x64/grpc_csharp_plugin.exe" target="tools/windows_x64/grpc_csharp_plugin.exe" /> - <file src="protoc_plugins/linux_x86/protoc" target="tools/linux_x86/protoc" /> - <file src="protoc_plugins/linux_x86/grpc_csharp_plugin" target="tools/linux_x86/grpc_csharp_plugin" /> - <file src="protoc_plugins/linux_x64/protoc" target="tools/linux_x64/protoc" /> - <file src="protoc_plugins/linux_x64/grpc_csharp_plugin" target="tools/linux_x64/grpc_csharp_plugin" /> - <file src="protoc_plugins/macosx_x86/protoc" target="tools/macosx_x86/protoc" /> - <file src="protoc_plugins/macosx_x86/grpc_csharp_plugin" target="tools/macosx_x86/grpc_csharp_plugin" /> - <file src="protoc_plugins/macosx_x64/protoc" target="tools/macosx_x64/protoc" /> - <file src="protoc_plugins/macosx_x64/grpc_csharp_plugin" target="tools/macosx_x64/grpc_csharp_plugin" /> + <file src="protoc_plugins/protoc_windows_x86/protoc.exe" target="tools/windows_x86/protoc.exe" /> + <file src="protoc_plugins/protoc_windows_x86/grpc_csharp_plugin.exe" target="tools/windows_x86/grpc_csharp_plugin.exe" /> + <file src="protoc_plugins/protoc_windows_x64/protoc.exe" target="tools/windows_x64/protoc.exe" /> + <file src="protoc_plugins/protoc_windows_x64/grpc_csharp_plugin.exe" target="tools/windows_x64/grpc_csharp_plugin.exe" /> + <file src="protoc_plugins/protoc_linux_x86/protoc" target="tools/linux_x86/protoc" /> + <file src="protoc_plugins/protoc_linux_x86/grpc_csharp_plugin" target="tools/linux_x86/grpc_csharp_plugin" /> + <file src="protoc_plugins/protoc_linux_x64/protoc" target="tools/linux_x64/protoc" /> + <file src="protoc_plugins/protoc_linux_x64/grpc_csharp_plugin" target="tools/linux_x64/grpc_csharp_plugin" /> + <file src="protoc_plugins/protoc_macos_x86/protoc" target="tools/macosx_x86/protoc" /> + <file src="protoc_plugins/protoc_macos_x86/grpc_csharp_plugin" target="tools/macosx_x86/grpc_csharp_plugin" /> + <file src="protoc_plugins/protoc_macos_x64/protoc" target="tools/macosx_x64/protoc" /> + <file src="protoc_plugins/protoc_macos_x64/grpc_csharp_plugin" target="tools/macosx_x64/grpc_csharp_plugin" /> </files> </package> diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index 673642e3d8..aa8a8d3b17 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -36,29 +36,20 @@ set DOTNET=dotnet set -ex -mkdir -p ..\..\artifacts\ +mkdir ..\..\artifacts @rem Collect the artifacts built by the previous build step if running on Jenkins -@rem TODO(jtattermusch): is there a better way to do this? -xcopy /Y /I ..\..\architecture=x86,language=csharp,platform=windows\artifacts\* nativelibs\windows_x86\ -xcopy /Y /I ..\..\architecture=x64,language=csharp,platform=windows\artifacts\* nativelibs\windows_x64\ -xcopy /Y /I ..\..\architecture=x86,language=csharp,platform=linux\artifacts\* nativelibs\linux_x86\ -xcopy /Y /I ..\..\architecture=x64,language=csharp,platform=linux\artifacts\* nativelibs\linux_x64\ -xcopy /Y /I ..\..\architecture=x86,language=csharp,platform=macos\artifacts\* nativelibs\macosx_x86\ -xcopy /Y /I ..\..\architecture=x64,language=csharp,platform=macos\artifacts\* nativelibs\macosx_x64\ +mkdir nativelibs +powershell -Command "cp -r ..\..\platform=*\artifacts\csharp_ext_* nativelibs" @rem Collect protoc artifacts built by the previous build step -xcopy /Y /I ..\..\architecture=x86,language=protoc,platform=windows\artifacts\* protoc_plugins\windows_x86\ -xcopy /Y /I ..\..\architecture=x64,language=protoc,platform=windows\artifacts\* protoc_plugins\windows_x64\ -xcopy /Y /I ..\..\architecture=x86,language=protoc,platform=linux\artifacts\* protoc_plugins\linux_x86\ -xcopy /Y /I ..\..\architecture=x64,language=protoc,platform=linux\artifacts\* protoc_plugins\linux_x64\ -xcopy /Y /I ..\..\architecture=x86,language=protoc,platform=macos\artifacts\* protoc_plugins\macosx_x86\ -xcopy /Y /I ..\..\architecture=x64,language=protoc,platform=macos\artifacts\* protoc_plugins\macosx_x64\ +mkdir protoc_plugins +powershell -Command "cp -r ..\..\platform=*\artifacts\protoc_* protoc_plugins" %DOTNET% restore Grpc.sln || goto :error @rem To be able to build, we also need to put grpc_csharp_ext to its normal location -xcopy /Y /I nativelibs\windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\ +xcopy /Y /I nativelibs\csharp_ext_windows_x64\grpc_csharp_ext.dll ..\..\cmake\build\x64\Release\ %DOTNET% pack --configuration Release Grpc.Core --output ..\..\..\artifacts || goto :error %DOTNET% pack --configuration Release Grpc.Core.Testing --output ..\..\..\artifacts || goto :error diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index ee923e3d87..d33923845c 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -34,35 +34,19 @@ cd $(dirname $0) mkdir -p ../../artifacts/ -mkdir -p nativelibs/windows_x86 nativelibs/windows_x64 \ - nativelibs/linux_x86 nativelibs/linux_x64 \ - nativelibs/macosx_x86 nativelibs/macosx_x64 - -mkdir -p protoc_plugins/windows_x86 protoc_plugins/windows_x64 \ - protoc_plugins/linux_x86 protoc_plugins/linux_x64 \ - protoc_plugins/macosx_x86 protoc_plugins/macosx_x64 - -# Collect the artifacts built by the previous build step if running on Jenkins -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=csharp,platform=windows/artifacts/* nativelibs/windows_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=csharp,platform=windows/artifacts/* nativelibs/windows_x64 || true -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=csharp,platform=linux/artifacts/* nativelibs/linux_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=csharp,platform=linux/artifacts/* nativelibs/linux_x64 || true -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=csharp,platform=macos/artifacts/* nativelibs/macosx_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=csharp,platform=macos/artifacts/* nativelibs/macosx_x64 || true +# Collect the artifacts built by the previous build step +mkdir -p nativelibs +cp -r $EXTERNAL_GIT_ROOT/platform={windows,linux,macos}/artifacts/csharp_ext_* nativelibs || true # Collect protoc artifacts built by the previous build step -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=protoc,platform=windows/artifacts/* protoc_plugins/windows_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=protoc,platform=windows/artifacts/* protoc_plugins/windows_x64 || true -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=protoc,platform=linux/artifacts/* protoc_plugins/linux_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=protoc,platform=linux/artifacts/* protoc_plugins/linux_x64 || true -cp $EXTERNAL_GIT_ROOT/architecture=x86,language=protoc,platform=macos/artifacts/* protoc_plugins/macosx_x86 || true -cp $EXTERNAL_GIT_ROOT/architecture=x64,language=protoc,platform=macos/artifacts/* protoc_plugins/macosx_x64 || true +mkdir -p protoc_plugins +cp -r $EXTERNAL_GIT_ROOT/platform={windows,linux,macos}/artifacts/protoc_* protoc_plugins || true dotnet restore Grpc.sln # To be able to build, we also need to put grpc_csharp_ext to its normal location mkdir -p ../../libs/opt -cp nativelibs/linux_x64/libgrpc_csharp_ext.so ../../libs/opt +cp nativelibs/csharp_ext_linux_x64/libgrpc_csharp_ext.so ../../libs/opt dotnet pack --configuration Release Grpc.Core --output ../../../artifacts dotnet pack --configuration Release Grpc.Core.Testing --output ../../../artifacts diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD index 805988c337..5f1c005faf 100644 --- a/src/proto/grpc/testing/BUILD +++ b/src/proto/grpc/testing/BUILD @@ -42,8 +42,11 @@ grpc_proto_library( grpc_proto_library( name = "control_proto", srcs = ["control.proto"], - deps = ["payloads_proto", "stats_proto"], has_services = False, + deps = [ + "payloads_proto", + "stats_proto", + ], ) grpc_proto_library( @@ -101,5 +104,8 @@ grpc_proto_library( grpc_proto_library( name = "test_proto", srcs = ["test.proto"], - deps = ["empty_proto", "messages_proto"], + deps = [ + "empty_proto", + "messages_proto", + ], ) diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 02b156d0cd..1f4569e278 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -244,6 +244,10 @@ message ScenarioResultSummary // Number of requests that succeeded/failed double successful_requests_per_second = 13; double failed_requests_per_second = 14; + + // Number of polls called inside completion queue per request + double client_polls_per_request = 15; + double server_polls_per_request = 16; } // Results of a single benchmark scenario. diff --git a/src/proto/grpc/testing/stats.proto b/src/proto/grpc/testing/stats.proto index 80014161a1..e236cf159b 100644 --- a/src/proto/grpc/testing/stats.proto +++ b/src/proto/grpc/testing/stats.proto @@ -47,6 +47,9 @@ message ServerStats { // change in idle time of the server (data from proc/stat) uint64 idle_cpu_time = 5; + + // Number of polls called inside completion queue + uint64 cq_poll_count = 6; } // Histogram params based on grpc/support/histogram.c @@ -81,4 +84,7 @@ message ClientStats { // Number of failed requests (one row per status code seen) repeated RequestResultCount request_results = 5; + + // Number of polls called inside completion queue + uint64 cq_poll_count = 6; } diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 20ced8a8c8..44ddafc8ef 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -318,6 +318,8 @@ CORE_SOURCE_FILES = [ 'src/core/ext/census/tracing.c', 'src/core/ext/filters/max_age/max_age_filter.c', 'src/core/ext/filters/message_size/message_size_filter.c', + 'src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c', + 'src/core/ext/filters/workarounds/workaround_utils.c', 'src/core/plugin_registry/grpc_plugin_registry.c', 'src/boringssl/err_data.c', 'third_party/boringssl/crypto/aes/aes.c', diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 7ee5336a7d..658994d780 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -56,7 +56,8 @@ INSTALL_REQUIRES = ( 'grpcio>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.3.0', 'six>=1.10',) + 'oauth2client>=1.4.7', 'protobuf>=3.3.0', 'six>=1.10', 'google-auth>=1.0.0', + 'requests>=2.14.2') COMMAND_CLASS = { # Run `preprocess` *before* doing any packaging! diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 97f6843d3c..9be3ba5945 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -29,10 +29,11 @@ """The Python implementation of the GRPC interoperability test client.""" import argparse -from oauth2client import client as oauth2client_client +import os +from google import auth as google_auth +from google.auth import jwt as google_auth_jwt import grpc -from grpc.beta import implementations from src.proto.grpc.testing import test_pb2 from tests.interop import methods @@ -84,25 +85,24 @@ def _application_default_credentials(): def _stub(args): target = '{}:{}'.format(args.server_host, args.server_port) if args.test_case == 'oauth2_auth_token': - google_credentials = _application_default_credentials() - scoped_credentials = google_credentials.create_scoped( - [args.oauth_scope]) - access_token = scoped_credentials.get_access_token().access_token - call_credentials = grpc.access_token_call_credentials(access_token) + google_credentials, unused_project_id = google_auth.default( + scopes=[args.oauth_scope]) + google_credentials.refresh(google_auth.transport.requests.Request()) + call_credentials = grpc.access_token_call_credentials( + google_credentials.token) elif args.test_case == 'compute_engine_creds': - google_credentials = _application_default_credentials() - scoped_credentials = google_credentials.create_scoped( - [args.oauth_scope]) - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - scoped_credentials) + google_credentials, unused_project_id = google_auth.default( + scopes=[args.oauth_scope]) + call_credentials = grpc.metadata_call_credentials( + google_auth.transport.grpc.AuthMetadataPlugin( + credentials=google_credentials, + request=google_auth.transport.requests.Request())) elif args.test_case == 'jwt_token_creds': - google_credentials = _application_default_credentials() - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - google_credentials) + google_credentials = google_auth_jwt.OnDemandCredentials.from_service_account_file( + os.environ[google_auth.environment_vars.CREDENTIALS]) + call_credentials = grpc.metadata_call_credentials( + google_auth.transport.grpc.AuthMetadataPlugin( + credentials=google_credentials, request=None)) else: call_credentials = None if args.use_tls: diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index e1016f7c0d..354b51da25 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -33,8 +33,10 @@ import json import os import threading -from oauth2client import client as oauth2client_client - +from google import auth as google_auth +from google.auth import environment_vars as google_auth_environment_vars +from google.auth.transport import grpc as google_auth_transport_grpc +from google.auth.transport import requests as google_auth_transport_requests import grpc from grpc.beta import implementations @@ -401,8 +403,7 @@ def _compute_engine_creds(stub, args): def _oauth2_auth_token(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] response = _large_unary_common_behavior(stub, True, True, None) if wanted_email != response.username: @@ -414,8 +415,7 @@ def _oauth2_auth_token(stub, args): def _jwt_token_creds(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] response = _large_unary_common_behavior(stub, True, False, None) if wanted_email != response.username: @@ -424,15 +424,14 @@ def _jwt_token_creds(stub, args): def _per_rpc_creds(stub, args): - json_key_filename = os.environ[ - oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS] + json_key_filename = os.environ[google_auth_environment_vars.CREDENTIALS] wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] - credentials = oauth2client_client.GoogleCredentials.get_application_default() - scoped_credentials = credentials.create_scoped([args.oauth_scope]) - # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last - # remaining use of the Beta API. - call_credentials = implementations.google_call_credentials( - scoped_credentials) + google_credentials, unused_project_id = google_auth.default( + scopes=[args.oauth_scope]) + call_credentials = grpc.metadata_call_credentials( + google_auth_transport_grpc.AuthMetadataPlugin( + credentials=google_credentials, + request=google_auth_transport_requests.Request())) response = _large_unary_common_behavior(stub, True, False, call_credentials) if wanted_email != response.username: raise ValueError('expected username %s, got %s' % |