diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 25 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.h | 5 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel_index.cc | 17 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc | 16 | ||||
-rw-r--r-- | src/core/lib/iomgr/cfstream_handle.cc | 97 | ||||
-rw-r--r-- | src/core/lib/iomgr/cfstream_handle.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.cc | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.cc | 94 | ||||
-rw-r--r-- | src/core/lib/surface/channel_init.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/server.cc | 104 | ||||
-rw-r--r-- | src/core/lib/surface/version.cc | 2 | ||||
-rw-r--r-- | src/core/lib/transport/metadata.cc | 1 |
12 files changed, 186 insertions, 183 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 9077aa9753..dff213efc6 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -64,18 +64,6 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -namespace { -struct state_watcher { - grpc_closure closure; - grpc_subchannel* subchannel; - grpc_connectivity_state connectivity_state; - grpc_connectivity_state last_connectivity_state; - grpc_core::OrphanablePtr<grpc_core::HealthCheckClient> health_check_client; - grpc_closure health_check_closure; - grpc_connectivity_state health_state; -}; -} // namespace - typedef struct external_state_watcher { grpc_subchannel* subchannel; grpc_pollset_set* pollset_set; @@ -101,9 +89,6 @@ struct grpc_subchannel { keep the subchannel open */ gpr_atm ref_pair; - /** non-transport related channel filters */ - const grpc_channel_filter** filters; - size_t num_filters; /** channel arguments */ grpc_channel_args* args; @@ -384,7 +369,6 @@ static void subchannel_destroy(void* arg, grpc_error* error) { c->channelz_subchannel->MarkSubchannelDestroyed(); c->channelz_subchannel.reset(); } - gpr_free((void*)c->filters); c->health_check_service_name.reset(); grpc_channel_args_destroy(c->args); grpc_connectivity_state_destroy(&c->state_tracker); @@ -567,15 +551,6 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); - c->num_filters = args->filter_count; - if (c->num_filters > 0) { - c->filters = static_cast<const grpc_channel_filter**>( - gpr_malloc(sizeof(grpc_channel_filter*) * c->num_filters)); - memcpy((void*)c->filters, args->filters, - sizeof(grpc_channel_filter*) * c->num_filters); - } else { - c->filters = nullptr; - } c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr))); diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 14f87f2c68..d0c0a672fa 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -189,11 +189,6 @@ grpc_call_stack* grpc_subchannel_call_get_call_stack( struct grpc_subchannel_args { /* When updating this struct, also update subchannel_index.c */ - /** Channel filters for this channel - wrapped factories will likely - want to mutate this */ - const grpc_channel_filter** filters; - /** The number of filters in the above array */ - size_t filter_count; /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args* args; }; diff --git a/src/core/ext/filters/client_channel/subchannel_index.cc b/src/core/ext/filters/client_channel/subchannel_index.cc index aa8441f17b..0ae7898c5a 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.cc +++ b/src/core/ext/filters/client_channel/subchannel_index.cc @@ -49,15 +49,6 @@ static grpc_subchannel_key* create_key( grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) { grpc_subchannel_key* k = static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k))); - k->args.filter_count = args->filter_count; - if (k->args.filter_count > 0) { - k->args.filters = static_cast<const grpc_channel_filter**>( - gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count)); - memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters), - args->filters, sizeof(*k->args.filters) * k->args.filter_count); - } else { - k->args.filters = nullptr; - } k->args.args = copy_channel_args(args->args); return k; } @@ -75,18 +66,10 @@ int grpc_subchannel_key_compare(const grpc_subchannel_key* a, const grpc_subchannel_key* b) { // To pretend the keys are different, return a non-zero value. if (GPR_UNLIKELY(g_force_creation)) return 1; - int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); - if (c != 0) return c; - if (a->args.filter_count > 0) { - c = memcmp(a->args.filters, b->args.filters, - a->args.filter_count * sizeof(*a->args.filters)); - if (c != 0) return c; - } return grpc_channel_args_compare(a->args.args, b->args.args); } void grpc_subchannel_key_destroy(grpc_subchannel_key* k) { - gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters)); grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args)); gpr_free(k); } diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc index 40a30e4a31..6e08d27b21 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.cc @@ -46,9 +46,21 @@ GRPCAPI grpc_channel* grpc_cronet_secure_channel_create( "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, target); + // Disable client authority filter when using Cronet + grpc_arg disable_client_authority_filter_arg; + disable_client_authority_filter_arg.key = + const_cast<char*>(GRPC_ARG_DISABLE_CLIENT_AUTHORITY_FILTER); + disable_client_authority_filter_arg.type = GRPC_ARG_INTEGER; + disable_client_authority_filter_arg.value.integer = 1; + grpc_channel_args* new_args = grpc_channel_args_copy_and_add( + args, &disable_client_authority_filter_arg, 1); + grpc_transport* ct = - grpc_create_cronet_transport(engine, target, args, reserved); + grpc_create_cronet_transport(engine, target, new_args, reserved); grpc_core::ExecCtx exec_ctx; - return grpc_channel_create(target, args, GRPC_CLIENT_DIRECT_CHANNEL, ct); + grpc_channel* channel = + grpc_channel_create(target, new_args, GRPC_CLIENT_DIRECT_CHANNEL, ct); + grpc_channel_args_destroy(new_args); + return channel; } diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 827fd24831..6cb9ca1a0d 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -52,62 +52,52 @@ CFStreamHandle* CFStreamHandle::CreateStreamHandle( void CFStreamHandle::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, void* client_callback_info) { + grpc_core::ExecCtx exec_ctx; CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info); - CFSTREAM_HANDLE_REF(handle, "read callback"); - dispatch_async( - dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, - stream, type, client_callback_info); - } - switch (type) { - case kCFStreamEventOpenCompleted: - handle->open_event_.SetReady(); - break; - case kCFStreamEventHasBytesAvailable: - case kCFStreamEventEndEncountered: - handle->read_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - handle->open_event_.SetReady(); - handle->read_event_.SetReady(); - break; - default: - GPR_UNREACHABLE_CODE(return ); - } - CFSTREAM_HANDLE_UNREF(handle, "read callback"); - }); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + handle->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->read_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } } void CFStreamHandle::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, void* clientCallBackInfo) { + grpc_core::ExecCtx exec_ctx; CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo); - CFSTREAM_HANDLE_REF(handle, "write callback"); - dispatch_async( - dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, - stream, type, clientCallBackInfo); - } - switch (type) { - case kCFStreamEventOpenCompleted: - handle->open_event_.SetReady(); - break; - case kCFStreamEventCanAcceptBytes: - case kCFStreamEventEndEncountered: - handle->write_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - handle->open_event_.SetReady(); - handle->write_event_.SetReady(); - break; - default: - GPR_UNREACHABLE_CODE(return ); - } - CFSTREAM_HANDLE_UNREF(handle, "write callback"); - }); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + handle->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->write_event_.SetReady(); + break; + default: + GPR_UNREACHABLE_CODE(return ); + } } CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, @@ -116,6 +106,7 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); + dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL); CFStreamClientContext ctx = {0, static_cast<void*>(this), CFStreamHandle::Retain, CFStreamHandle::Release, nil}; @@ -129,10 +120,8 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, CFStreamHandle::WriteCallback, &ctx); - CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); - CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); + CFReadStreamSetDispatchQueue(read_stream, dispatch_queue_); + CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue_); } CFStreamHandle::~CFStreamHandle() { diff --git a/src/core/lib/iomgr/cfstream_handle.h b/src/core/lib/iomgr/cfstream_handle.h index 4258e72431..93ec5f044b 100644 --- a/src/core/lib/iomgr/cfstream_handle.h +++ b/src/core/lib/iomgr/cfstream_handle.h @@ -62,6 +62,8 @@ class CFStreamHandle final { grpc_core::LockfreeEvent read_event_; grpc_core::LockfreeEvent write_event_; + dispatch_queue_t dispatch_queue_; + gpr_refcount refcount_; }; diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 7e4b3c9b2f..61c366098e 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -665,6 +665,7 @@ void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) { GPR_ASSERT(resource_quota->num_threads_allocated == 0); GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota"); gpr_free(resource_quota->name); + gpr_mu_destroy(&resource_quota->thread_count_mu); gpr_free(resource_quota); } } diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 4b5250803d..86ee1010cf 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -42,6 +42,7 @@ #include "src/core/lib/iomgr/tcp_windows.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #if defined(__MSYS__) && defined(GPR_ARCH_64) /* Nasty workaround for nasty bug when using the 64 bits msys compiler @@ -112,7 +113,10 @@ typedef struct grpc_tcp { grpc_closure* read_cb; grpc_closure* write_cb; - grpc_slice read_slice; + + /* garbage after the last read */ + grpc_slice_buffer last_read_buffer; + grpc_slice_buffer* write_slices; grpc_slice_buffer* read_slices; @@ -131,6 +135,7 @@ static void tcp_free(grpc_tcp* tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); gpr_free(tcp->peer_string); + grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error); gpr_free(tcp); @@ -179,9 +184,12 @@ static void on_read(void* tcpp, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)tcpp; grpc_closure* cb = tcp->read_cb; grpc_winsocket* socket = tcp->socket; - grpc_slice sub; grpc_winsocket_callback_info* info = &socket->read_info; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p on_read", tcp); + } + GRPC_ERROR_REF(error); if (error == GRPC_ERROR_NONE) { @@ -189,13 +197,35 @@ static void on_read(void* tcpp, grpc_error* error) { char* utf8_message = gpr_format_message(info->wsa_error); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(utf8_message); gpr_free(utf8_message); - grpc_slice_unref_internal(tcp->read_slice); + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); } else { if (info->bytes_transfered != 0 && !tcp->shutting_down) { - sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered); - grpc_slice_buffer_add(tcp->read_slices, sub); + GPR_ASSERT((size_t)info->bytes_transfered <= tcp->read_slices->length); + if (static_cast<size_t>(info->bytes_transfered) != + tcp->read_slices->length) { + grpc_slice_buffer_trim_end( + tcp->read_slices, + tcp->read_slices->length - + static_cast<size_t>(info->bytes_transfered), + &tcp->last_read_buffer); + } + GPR_ASSERT((size_t)info->bytes_transfered == tcp->read_slices->length); + + if (grpc_tcp_trace.enabled()) { + size_t i; + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "READ %p (peer=%s): %s", tcp, tcp->peer_string, + dump); + gpr_free(dump); + } + } } else { - grpc_slice_unref_internal(tcp->read_slice); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p unref read_slice", tcp); + } + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); error = tcp->shutting_down ? GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "TCP stream shutting down", &tcp->shutdown_error, 1) @@ -209,6 +239,8 @@ static void on_read(void* tcpp, grpc_error* error) { GRPC_CLOSURE_SCHED(cb, error); } +#define DEFAULT_TARGET_READ_SIZE 8192 +#define MAX_WSABUF_COUNT 16 static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; @@ -217,7 +249,12 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, int status; DWORD bytes_read = 0; DWORD flags = 0; - WSABUF buffer; + WSABUF buffers[MAX_WSABUF_COUNT]; + size_t i; + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p win_read", tcp); + } if (tcp->shutting_down) { GRPC_CLOSURE_SCHED( @@ -229,18 +266,27 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, tcp->read_cb = cb; tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(read_slices); + grpc_slice_buffer_swap(read_slices, &tcp->last_read_buffer); - tcp->read_slice = GRPC_SLICE_MALLOC(8192); + if (tcp->read_slices->length < DEFAULT_TARGET_READ_SIZE / 2 && + tcp->read_slices->count < MAX_WSABUF_COUNT) { + // TODO(jtattermusch): slice should be allocated using resource quota + grpc_slice_buffer_add(tcp->read_slices, + GRPC_SLICE_MALLOC(DEFAULT_TARGET_READ_SIZE)); + } - buffer.len = (ULONG)GRPC_SLICE_LENGTH( - tcp->read_slice); // we know slice size fits in 32bit. - buffer.buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slice); + GPR_ASSERT(tcp->read_slices->count <= MAX_WSABUF_COUNT); + for (i = 0; i < tcp->read_slices->count; i++) { + buffers[i].len = (ULONG)GRPC_SLICE_LENGTH( + tcp->read_slices->slices[i]); // we know slice size fits in 32bit. + buffers[i].buf = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[i]); + } TCP_REF(tcp, "read"); /* First let's try a synchronous, non-blocking read. */ - status = - WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, NULL, NULL); + status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count, + &bytes_read, &flags, NULL, NULL); info->wsa_error = status == 0 ? 0 : WSAGetLastError(); /* Did we get data immediately ? Yay. */ @@ -252,8 +298,8 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices, /* Otherwise, let's retry, by queuing a read. */ memset(&tcp->socket->read_info.overlapped, 0, sizeof(OVERLAPPED)); - status = WSARecv(tcp->socket->socket, &buffer, 1, &bytes_read, &flags, - &info->overlapped, NULL); + status = WSARecv(tcp->socket->socket, buffers, (DWORD)tcp->read_slices->count, + &bytes_read, &flags, &info->overlapped, NULL); if (status != 0) { int wsa_error = WSAGetLastError(); @@ -275,6 +321,10 @@ static void on_write(void* tcpp, grpc_error* error) { grpc_winsocket_callback_info* info = &handle->write_info; grpc_closure* cb; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p on_write", tcp); + } + GRPC_ERROR_REF(error); gpr_mu_lock(&tcp->mu); @@ -303,11 +353,21 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, unsigned i; DWORD bytes_sent; int status; - WSABUF local_buffers[16]; + WSABUF local_buffers[MAX_WSABUF_COUNT]; WSABUF* allocated = NULL; WSABUF* buffers = local_buffers; size_t len; + if (grpc_tcp_trace.enabled()) { + size_t i; + for (i = 0; i < slices->count; i++) { + char* data = + grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); + gpr_free(data); + } + } + if (tcp->shutting_down) { GRPC_CLOSURE_SCHED( cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -412,6 +472,7 @@ static void win_shutdown(grpc_endpoint* ep, grpc_error* why) { static void win_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = (grpc_tcp*)ep; + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); TCP_UNREF(tcp, "destroy"); } @@ -463,6 +524,7 @@ grpc_endpoint* grpc_tcp_create(grpc_winsocket* socket, GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx); tcp->peer_string = gpr_strdup(peer_string); + grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index f01852473b..d17a721606 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -45,6 +45,11 @@ void grpc_channel_init_init(void); /// registration order (in the case of a tie). /// Stages are registered against one of the pre-determined channel stack /// types. +/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should +/// ensure that subchannels with different filter lists will always have +/// different channel args. This requires setting a channel arg in case the +/// registration function relies on some condition other than channel args to +/// decide whether to add a filter or not. void grpc_channel_init_register_stage(grpc_channel_stack_type type, int priority, grpc_channel_init_stage stage_fn, diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 67b38e6f0c..7ae6e51a5f 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -194,13 +194,10 @@ struct call_data { }; struct request_matcher { - request_matcher(grpc_server* server); - ~request_matcher(); - grpc_server* server; - std::atomic<call_data*> pending_head{nullptr}; - call_data* pending_tail = nullptr; - gpr_locked_mpscq* requests_per_cq = nullptr; + call_data* pending_head; + call_data* pending_tail; + gpr_locked_mpscq* requests_per_cq; }; struct registered_method { @@ -349,30 +346,22 @@ static void channel_broadcaster_shutdown(channel_broadcaster* cb, * request_matcher */ -namespace { -request_matcher::request_matcher(grpc_server* server) : server(server) { - requests_per_cq = static_cast<gpr_locked_mpscq*>( - gpr_malloc(sizeof(*requests_per_cq) * server->cq_count)); - for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&requests_per_cq[i]); - } -} - -request_matcher::~request_matcher() { +static void request_matcher_init(request_matcher* rm, grpc_server* server) { + memset(rm, 0, sizeof(*rm)); + rm->server = server; + rm->requests_per_cq = static_cast<gpr_locked_mpscq*>( + gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count)); for (size_t i = 0; i < server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&requests_per_cq[i]) == nullptr); - gpr_locked_mpscq_destroy(&requests_per_cq[i]); + gpr_locked_mpscq_init(&rm->requests_per_cq[i]); } - gpr_free(requests_per_cq); -} -} // namespace - -static void request_matcher_init(request_matcher* rm, grpc_server* server) { - new (rm) request_matcher(server); } static void request_matcher_destroy(request_matcher* rm) { - rm->~request_matcher(); + for (size_t i = 0; i < rm->server->cq_count; i++) { + GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr); + gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + } + gpr_free(rm->requests_per_cq); } static void kill_zombie(void* elem, grpc_error* error) { @@ -381,10 +370,9 @@ static void kill_zombie(void* elem, grpc_error* error) { } static void request_matcher_zombify_all_pending_calls(request_matcher* rm) { - call_data* calld; - while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != - nullptr) { - rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); + while (rm->pending_head) { + call_data* calld = rm->pending_head; + rm->pending_head = calld->pending_next; gpr_atm_no_barrier_store(&calld->state, ZOMBIED); GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, @@ -582,9 +570,8 @@ static void publish_new_rpc(void* arg, grpc_error* error) { } gpr_atm_no_barrier_store(&calld->state, PENDING); - if (rm->pending_head.load(std::memory_order_relaxed) == nullptr) { - rm->pending_head.store(calld, std::memory_order_relaxed); - rm->pending_tail = calld; + if (rm->pending_head == nullptr) { + rm->pending_tail = rm->pending_head = calld; } else { rm->pending_tail->pending_next = calld; rm->pending_tail = calld; @@ -1448,39 +1435,30 @@ static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx, rm = &rc->data.registered.method->matcher; break; } - - // Fast path: if there is no pending request to be processed, immediately - // return. - if (!gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link) || - // Note: We are reading the pending_head without holding the server's call - // mutex. Even if we read a non-null value here due to reordering, - // we will check it below again after grabbing the lock. - rm->pending_head.load(std::memory_order_relaxed) == nullptr) { - return GRPC_CALL_OK; - } - // Slow path: This was the first queued request and there are pendings: - // We need to lock and start matching calls. - gpr_mu_lock(&server->mu_call); - while ((calld = rm->pending_head.load(std::memory_order_relaxed)) != - nullptr) { - rc = reinterpret_cast<requested_call*>( - gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); - if (rc == nullptr) break; - rm->pending_head.store(calld->pending_next, std::memory_order_relaxed); - gpr_mu_unlock(&server->mu_call); - if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { - // Zombied Call - GRPC_CLOSURE_INIT( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); - } else { - publish_call(server, calld, cq_idx, rc); - } + if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + /* this was the first queued request: we need to lock and start + matching calls */ gpr_mu_lock(&server->mu_call); + while ((calld = rm->pending_head) != nullptr) { + rc = reinterpret_cast<requested_call*>( + gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx])); + if (rc == nullptr) break; + rm->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) { + // Zombied Call + GRPC_CLOSURE_INIT( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE); + } else { + publish_call(server, calld, cq_idx, rc); + } + gpr_mu_lock(&server->mu_call); + } + gpr_mu_unlock(&server->mu_call); } - gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index 4829cc80a5..70d7580bec 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "7.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "goose"; } +const char* grpc_g_stands_for(void) { return "gold"; } diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 60af22393e..30482a1b3b 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -187,6 +187,7 @@ static void gc_mdtab(mdtab_shard* shard) { ((destroy_user_data_func)gpr_atm_no_barrier_load( &md->destroy_user_data))(user_data); } + gpr_mu_destroy(&md->mu_user_data); gpr_free(md); *prev_next = next; num_freed++; |