diff options
author | David Garcia Quintas <dgq@google.com> | 2015-08-09 09:21:01 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-08-09 09:21:01 -0700 |
commit | 49a513031880a6303273f3fe3cec837225a5ad78 (patch) | |
tree | 0140ee377eb87732cdb6072576751d98f8c9d8cc /src/core/surface | |
parent | 7c0d914cce379f14a1adfae9374641967c45d7b2 (diff) | |
parent | 10494fcb61d638682fb8e5d28356a1f5125e8d0a (diff) |
Merge branch 'compression-accept-encoding' into compression-interop
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 119 | ||||
-rw-r--r-- | src/core/surface/call.h | 4 | ||||
-rw-r--r-- | src/core/surface/channel.c | 75 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 185 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 13 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 75 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 2 | ||||
-rw-r--r-- | src/core/surface/init.c | 16 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 18 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 17 | ||||
-rw-r--r-- | src/core/surface/server.c | 116 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 2 |
13 files changed, 509 insertions, 135 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 0e97110e5e..a7624fd96f 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -41,7 +41,6 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> -#include "src/core/census/grpc_context.h" #include "src/core/channel/channel_stack.h" #include "src/core/iomgr/alarm.h" #include "src/core/profiling/timers.h" @@ -145,6 +144,8 @@ typedef enum { struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; + grpc_call *parent; + grpc_call *first_child; grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -178,6 +179,8 @@ struct grpc_call { gpr_uint8 cancel_alarm; /** bitmask of allocated completion events in completions */ gpr_uint8 allocated_completions; + /** flag indicating that cancellation is inherited */ + gpr_uint8 cancellation_is_inherited; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -272,6 +275,11 @@ struct grpc_call { /** completion events - for completion queue use */ grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; + + /** siblings: children of the same parent form a list, and this list is protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -295,7 +303,9 @@ static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 propagation_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, @@ -311,9 +321,10 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; - if (cq) { + if (cq != NULL) { GRPC_CQ_INTERNAL_REF(cq, "bind"); } + call->parent = parent_call; call->is_client = server_transport_data == NULL; for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; @@ -352,7 +363,48 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, } grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr, CALL_STACK_FROM_CALL(call)); - if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) { + if (parent_call != NULL) { + GRPC_CALL_INTERNAL_REF(parent_call, "child"); + GPR_ASSERT(call->is_client); + GPR_ASSERT(!parent_call->is_client); + + gpr_mu_lock(&parent_call->mu); + + if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { + send_deadline = gpr_time_min( + gpr_convert_clock_type(send_deadline, + parent_call->send_deadline.clock_type), + parent_call->send_deadline); + } + /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with + * GRPC_PROPAGATE_STATS_CONTEXT */ + /* TODO(ctiller): This should change to use the appropriate census start_op + * call. */ + if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + parent_call->context[GRPC_CONTEXT_TRACING].value, + NULL); + } else { + GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT); + } + if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { + call->cancellation_is_inherited = 1; + } + + if (parent_call->first_child == NULL) { + parent_call->first_child = call; + call->sibling_next = call->sibling_prev = call; + } else { + call->sibling_next = parent_call->first_child; + call->sibling_prev = parent_call->first_child->sibling_prev; + call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = call; + } + + gpr_mu_unlock(&parent_call->mu); + } + if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != + 0) { set_deadline_alarm(call, send_deadline); } return call; @@ -911,6 +963,8 @@ static int add_slice_to_message(grpc_call *call, gpr_slice slice) { static void call_on_done_recv(void *pc, int success) { grpc_call *call = pc; + grpc_call *child_call; + grpc_call *next_child_call; size_t i; GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0); lock(call); @@ -944,6 +998,19 @@ static void call_on_done_recv(void *pc, int success) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; call->cancel_alarm |= call->have_alarm; + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call); + GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } GRPC_CALL_INTERNAL_UNREF(call, "closed", 0); } finish_read_ops(call); @@ -973,7 +1040,7 @@ static int prepare_application_metadata(grpc_call *call, size_t count, GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, (const gpr_uint8 *)md->value, - md->value_length); + md->value_length, 1); if (!grpc_mdstr_is_legal_header(l->md->key)) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key"); return 0; @@ -1217,6 +1284,22 @@ grpc_call_error grpc_call_start_ioreq_and_call_back( void grpc_call_destroy(grpc_call *c) { int cancel; + grpc_call *parent = c->parent; + + if (parent) { + gpr_mu_lock(&parent->mu); + if (c == parent->first_child) { + parent->first_child = c->sibling_next; + if (c == parent->first_child) { + parent->first_child = NULL; + } + c->sibling_prev->sibling_next = c->sibling_next; + c->sibling_next->sibling_prev = c->sibling_prev; + } + gpr_mu_unlock(&parent->mu); + GRPC_CALL_INTERNAL_UNREF(parent, "child", 1); + } + lock(c); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; @@ -1244,7 +1327,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status, const char *description) { grpc_mdstr *details = - description ? grpc_mdstr_from_string(c->metadata_context, description) + description ? grpc_mdstr_from_string(c->metadata_context, description, 0) : NULL; GPR_ASSERT(status != GRPC_STATUS_OK); @@ -1294,6 +1377,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { elem->filter->start_transport_stream_op(elem, op); } +char *grpc_call_get_peer(grpc_call *call) { + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + return elem->filter->get_peer(elem); +} + grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } @@ -1319,7 +1407,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call, + call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -1364,8 +1453,9 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); assert(0); } - grpc_mdelem_set_user_data(md, destroy_compression, - (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); + grpc_mdelem_set_user_data( + md, destroy_compression, + (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); } return algorithm; } @@ -1414,7 +1504,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { l->md = 0; } } - if (gpr_time_cmp(md->deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) { + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { set_deadline_alarm(call, md->deadline); } if (!is_trailing) { @@ -1502,6 +1594,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (!are_write_flags_valid(op->flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } + if (op->data.send_message == NULL) { + return GRPC_CALL_ERROR_INVALID_MESSAGE; + } req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; @@ -1537,7 +1632,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.send_status_from_server.status_details != NULL ? grpc_mdstr_from_string( call->metadata_context, - op->data.send_status_from_server.status_details) + op->data.send_status_from_server.status_details, 0) : NULL; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; @@ -1551,6 +1646,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; + req->data.recv_metadata->count = 0; req->flags = op->flags; break; case GRPC_OP_RECV_MESSAGE: @@ -1582,6 +1678,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; req->data.recv_metadata = op->data.recv_status_on_client.trailing_metadata; + req->data.recv_metadata->count = 0; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 5641561f85..00638e43b5 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -89,7 +89,9 @@ typedef struct { typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 propagation_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index cd71e03b19..b80398858d 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -36,12 +36,14 @@ #include <stdlib.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -74,6 +76,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; grpc_iomgr_closure destroy_closure; + char *target; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -86,33 +89,35 @@ struct grpc_channel { #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t num_filters, + const char *target, const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); memset(channel, 0, sizeof(*channel)); + channel->target = gpr_strdup(target); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; /* decremented by grpc_channel_destroy */ gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; - channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); + channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status", 0); channel->grpc_compression_algorithm_string = - grpc_mdstr_from_string(mdctx, "grpc-encoding"); + grpc_mdstr_from_string(mdctx, "grpc-encoding", 0); channel->grpc_encodings_accepted_by_peer_string = - grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); - channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding", 0); + channel->grpc_message_string = + grpc_mdstr_from_string(mdctx, "grpc-message", 0); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(i, buf); channel->grpc_status_elem[i] = grpc_mdelem_from_metadata_strings( mdctx, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(mdctx, buf)); + grpc_mdstr_from_string(mdctx, buf, 0)); } - channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); - channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); + channel->path_string = grpc_mdstr_from_string(mdctx, ":path", 0); + channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority", 0); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -140,32 +145,43 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } +char *grpc_channel_get_target(grpc_channel *channel) { + return gpr_strdup(channel->target); +} + static grpc_call *grpc_channel_create_call_internal( - grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, + grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_mdelem *send_metadata[2]; + int num_metadata = 0; GPR_ASSERT(channel->is_client); - send_metadata[0] = path_mdelem; - send_metadata[1] = authority_mdelem; + send_metadata[num_metadata++] = path_mdelem; + if (authority_mdelem != NULL) { + send_metadata[num_metadata++] = authority_mdelem; + } - return grpc_call_create(channel, cq, NULL, send_metadata, - GPR_ARRAY_SIZE(send_metadata), deadline); + return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL, + send_metadata, num_metadata, deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_call *parent_call, + gpr_uint32 propagation_mask, grpc_completion_queue *cq, const char *method, const char *host, gpr_timespec deadline) { return grpc_channel_create_call_internal( - channel, cq, + channel, parent_call, propagation_mask, cq, grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)), + grpc_mdstr_from_string(channel->metadata_context, method, 0)), + host ? grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host)), + grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL, deadline); } @@ -174,10 +190,10 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, registered_call *rc = gpr_malloc(sizeof(registered_call)); rc->path = grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)); - rc->authority = grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(channel->metadata_context, method, 0)); + rc->authority = host ? grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->authority_string), - grpc_mdstr_from_string(channel->metadata_context, host)); + grpc_mdstr_from_string(channel->metadata_context, host, 0)) : NULL; gpr_mu_lock(&channel->registered_call_mu); rc->next = channel->registered_calls; channel->registered_calls = rc; @@ -186,12 +202,14 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, } grpc_call *grpc_channel_create_registered_call( - grpc_channel *channel, grpc_completion_queue *completion_queue, - void *registered_call_handle, gpr_timespec deadline) { + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, + grpc_completion_queue *completion_queue, void *registered_call_handle, + gpr_timespec deadline) { registered_call *rc = registered_call_handle; return grpc_channel_create_call_internal( - channel, completion_queue, GRPC_MDELEM_REF(rc->path), - GRPC_MDELEM_REF(rc->authority), deadline); + channel, parent_call, propagation_mask, completion_queue, + GRPC_MDELEM_REF(rc->path), + rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline); } #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG @@ -221,11 +239,14 @@ static void destroy_channel(void *p, int ok) { registered_call *rc = channel->registered_calls; channel->registered_calls = rc->next; GRPC_MDELEM_UNREF(rc->path); - GRPC_MDELEM_UNREF(rc->authority); + if (rc->authority) { + GRPC_MDELEM_UNREF(rc->authority); + } gpr_free(rc); } grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); + gpr_free(channel->target); gpr_free(channel); } @@ -284,7 +305,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { gpr_ltoa(i, tmp); return grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->grpc_status_string), - grpc_mdstr_from_string(channel->metadata_context, tmp)); + grpc_mdstr_from_string(channel->metadata_context, tmp, 0)); } } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 1d1550bbe7..f271616f60 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -38,7 +38,7 @@ #include "src/core/client_config/subchannel_factory.h" grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t count, + const char *target, const grpc_channel_filter **filters, size_t count, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c new file mode 100644 index 0000000000..1223706457 --- /dev/null +++ b/src/core/surface/channel_connectivity.c @@ -0,0 +1,185 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/channel/client_channel.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/surface/completion_queue.h" + +grpc_connectivity_state grpc_channel_check_connectivity_state( + grpc_channel *channel, int try_to_connect) { + /* forward through to the underlying client channel */ + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + return GRPC_CHANNEL_FATAL_FAILURE; + } + return grpc_client_channel_check_connectivity_state(client_channel_elem, + try_to_connect); +} + +typedef enum { + WAITING, + CALLING_BACK, + CALLING_BACK_AND_FINISHED, + CALLED_BACK +} callback_phase; + +typedef struct { + gpr_mu mu; + callback_phase phase; + int success; + grpc_iomgr_closure on_complete; + grpc_alarm alarm; + grpc_connectivity_state state; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; + grpc_channel *channel; + void *tag; +} state_watcher; + +static void delete_state_watcher(state_watcher *w) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(w->channel)); + grpc_client_channel_del_interested_party(client_channel_elem, grpc_cq_pollset(w->cq)); + GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity"); + gpr_mu_destroy(&w->mu); + gpr_free(w); +} + +static void finished_completion(void *pw, grpc_cq_completion *ignored) { + int delete = 0; + state_watcher *w = pw; + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + case CALLED_BACK: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLING_BACK: + w->phase = CALLED_BACK; + break; + case CALLING_BACK_AND_FINISHED: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void partly_done(state_watcher *w, int due_to_completion) { + int delete = 0; + + if (due_to_completion) { + gpr_mu_lock(&w->mu); + w->success = 1; + gpr_mu_unlock(&w->mu); + grpc_alarm_cancel(&w->alarm); + } + + gpr_mu_lock(&w->mu); + switch (w->phase) { + case WAITING: + w->phase = CALLING_BACK; + grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w, + &w->completion_storage); + break; + case CALLING_BACK: + w->phase = CALLING_BACK_AND_FINISHED; + break; + case CALLING_BACK_AND_FINISHED: + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + break; + case CALLED_BACK: + delete = 1; + break; + } + gpr_mu_unlock(&w->mu); + + if (delete) { + delete_state_watcher(w); + } +} + +static void watch_complete(void *pw, int success) { partly_done(pw, 1); } + +static void timeout_complete(void *pw, int success) { partly_done(pw, 0); } + +void grpc_channel_watch_connectivity_state( + grpc_channel *channel, grpc_connectivity_state last_observed_state, + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) { + grpc_channel_element *client_channel_elem = + grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); + state_watcher *w = gpr_malloc(sizeof(*w)); + + grpc_cq_begin_op(cq); + + gpr_mu_init(&w->mu); + grpc_iomgr_closure_init(&w->on_complete, watch_complete, w); + w->phase = WAITING; + w->state = last_observed_state; + w->success = 0; + w->cq = cq; + w->tag = tag; + w->channel = channel; + + grpc_alarm_init( + &w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); + + if (client_channel_elem->filter != &grpc_client_channel_filter) { + gpr_log(GPR_ERROR, + "grpc_channel_watch_connectivity_state called on something that is " + "not a client channel, but '%s'", + client_channel_elem->filter->name); + grpc_iomgr_add_delayed_callback(&w->on_complete, 1); + } else { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); + grpc_client_channel_add_interested_party(client_channel_elem, grpc_cq_pollset(cq)); + grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state, + &w->on_complete); + } +} diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 91c7b35550..707d615688 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -109,6 +109,7 @@ typedef struct { gpr_refcount refs; grpc_mdctx *mdctx; grpc_channel_args *merge_args; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -119,6 +120,7 @@ static void subchannel_factory_ref(grpc_subchannel_factory *scf) { static void subchannel_factory_unref(grpc_subchannel_factory *scf) { subchannel_factory *f = (subchannel_factory *)scf; if (gpr_unref(&f->refs)) { + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -137,6 +139,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -151,8 +154,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { Asynchronously: - resolve target - connect to it (trying alternatives as presented) - perform handshakes */ -grpc_channel *grpc_channel_create(const char *target, - const grpc_channel_args *args) { +grpc_channel *grpc_insecure_channel_create(const char *target, + const grpc_channel_args *args) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; @@ -168,18 +171,22 @@ grpc_channel *grpc_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = + grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); grpc_mdctx_ref(mdctx); f->mdctx = mdctx; f->merge_args = grpc_channel_args_copy(args); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 3f60b0b0ba..36d69cfe5f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -45,6 +45,11 @@ #include <grpc/support/atm.h> #include <grpc/support/log.h> +typedef struct { + grpc_pollset_worker *worker; + void *tag; +} plucker; + /* Completion queue structure */ struct grpc_completion_queue { /** completed events */ @@ -60,6 +65,8 @@ struct grpc_completion_queue { int shutdown; int shutdown_called; int is_server_cq; + int num_pluckers; + plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; }; grpc_completion_queue *grpc_completion_queue_create(void) { @@ -107,6 +114,11 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { } void grpc_cq_begin_op(grpc_completion_queue *cc) { +#ifndef NDEBUG + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + GPR_ASSERT(!cc->shutdown_called); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); +#endif gpr_ref(&cc->pending_events); } @@ -117,6 +129,8 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, void (*done)(void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { int shutdown; + int i; + grpc_pollset_worker *pluck_worker; storage->tag = tag; storage->done = done; @@ -130,7 +144,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, cc->completed_tail->next = ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next); cc->completed_tail = storage; - grpc_pollset_kick(&cc->pollset); + pluck_worker = NULL; + for (i = 0; i < cc->num_pluckers; i++) { + if (cc->pluckers[i].tag == tag) { + pluck_worker = cc->pluckers[i].worker; + break; + } + } + grpc_pollset_kick(&cc->pollset, pluck_worker); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } else { cc->completed_tail->next = @@ -147,6 +168,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { grpc_event ret; + grpc_pollset_worker worker; deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -172,7 +194,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!grpc_pollset_work(&cc->pollset, deadline)) { + if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; @@ -184,11 +206,37 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, return ret; } +static int add_plucker(grpc_completion_queue *cc, void *tag, + grpc_pollset_worker *worker) { + if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { + return 0; + } + cc->pluckers[cc->num_pluckers].tag = tag; + cc->pluckers[cc->num_pluckers].worker = worker; + cc->num_pluckers++; + return 1; +} + +static void del_plucker(grpc_completion_queue *cc, void *tag, + grpc_pollset_worker *worker) { + int i; + for (i = 0; i < cc->num_pluckers; i++) { + if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { + cc->num_pluckers--; + GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]); + return; + } + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; + grpc_pollset_worker worker; deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -219,12 +267,24 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!grpc_pollset_work(&cc->pollset, deadline)) { + if (!add_plucker(cc, tag, &worker)) { + gpr_log(GPR_DEBUG, + "Too many outstanding grpc_completion_queue_pluck calls: maximum is %d", + GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); + /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; break; } + if (!grpc_pollset_work(&cc->pollset, &worker, deadline)) { + del_plucker(cc, tag, &worker); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } + del_plucker(cc, tag, &worker); } done: GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); @@ -261,15 +321,6 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return &cc->pollset; } -void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - grpc_pollset_kick(&cc->pollset); - grpc_pollset_work(&cc->pollset, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(100, GPR_TIMESPAN))); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index f944f48d8e..8de024aaea 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -77,8 +77,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); - void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 04e27d30ac..442bc72f21 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -39,6 +39,7 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" +#include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" @@ -46,10 +47,7 @@ #include "src/core/surface/init.h" #include "src/core/surface/surface_trace.h" #include "src/core/transport/chttp2_transport.h" - -#ifdef GPR_POSIX_SOCKET -#include "src/core/client_config/resolvers/unix_resolver_posix.h" -#endif +#include "src/core/transport/connectivity_state.h" static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; @@ -68,6 +66,8 @@ void grpc_init(void) { gpr_time_init(); grpc_resolver_registry_init("dns:///"); grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create()); + grpc_register_resolver_type("ipv4", grpc_ipv4_resolver_factory_create()); + grpc_register_resolver_type("ipv6", grpc_ipv6_resolver_factory_create()); #ifdef GPR_POSIX_SOCKET grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create()); #endif @@ -76,11 +76,15 @@ void grpc_init(void) { grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); grpc_register_tracer("batch", &grpc_trace_batch); + grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_tracer_init("GRPC_TRACE"); - if (census_initialize(CENSUS_NONE)) { - gpr_log(GPR_ERROR, "Could not initialize census."); + /* Only initialize census if noone else has. */ + if (census_enabled() == CENSUS_FEATURE_NONE) { + if (census_initialize(census_supported())) { /* enable all features. */ + gpr_log(GPR_ERROR, "Could not initialize census."); + } } grpc_timers_global_init(); } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3f2bb5c8a9..c4215a2cfb 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,7 +47,10 @@ typedef struct { grpc_linked_mdelem details; } call_data; -typedef struct { grpc_mdctx *mdctx; } channel_data; +typedef struct { + grpc_mdctx *mdctx; + grpc_channel *master; +} channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op) { @@ -82,6 +85,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, } } +static char *lame_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_channel_get_target(chand->master); +} + static void lame_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { if (op->on_connectivity_state_change) { @@ -112,6 +120,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(is_last); chand->mdctx = mdctx; + chand->master = master; } static void destroy_channel_elem(grpc_channel_element *elem) {} @@ -125,11 +134,12 @@ static const grpc_channel_filter lame_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + lame_get_peer, "lame-client", }; -grpc_channel *grpc_lame_client_channel_create(void) { +grpc_channel *grpc_lame_client_channel_create(const char *target) { static const grpc_channel_filter *filters[] = {&lame_filter}; - return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(), - 1); + return grpc_channel_create_from_filters(target, filters, 1, NULL, + grpc_mdctx_create(), 1); } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d87ec97b53..c3150250b8 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -88,8 +88,8 @@ static void on_secure_transport_setup_done(void *arg, c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); - c->result->filters[0] = &grpc_client_auth_filter; - c->result->filters[1] = &grpc_http_client_filter; + c->result->filters[0] = &grpc_http_client_filter; + c->result->filters[1] = &grpc_client_auth_filter; c->result->num_filters = 2; } notify = c->notify; @@ -134,6 +134,7 @@ typedef struct { grpc_mdctx *mdctx; grpc_channel_args *merge_args; grpc_channel_security_connector *security_connector; + grpc_channel *master; } subchannel_factory; static void subchannel_factory_ref(grpc_subchannel_factory *scf) { @@ -146,6 +147,7 @@ static void subchannel_factory_unref(grpc_subchannel_factory *scf) { if (gpr_unref(&f->refs)) { GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, "subchannel_factory"); + GRPC_CHANNEL_INTERNAL_UNREF(f->master, "subchannel_factory"); grpc_channel_args_destroy(f->merge_args); grpc_mdctx_unref(f->mdctx); gpr_free(f); @@ -165,6 +167,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_ref_init(&c->refs, 1); args->mdctx = f->mdctx; args->args = final_args; + args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); grpc_channel_args_destroy(final_args); @@ -196,13 +199,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } if (grpc_credentials_create_security_connector( creds, target, args, NULL, &connector, &new_args_from_connector) != GRPC_SECURITY_OK) { - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } mdctx = grpc_mdctx_create(); @@ -218,6 +221,9 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); + channel = + grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); + f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); @@ -226,12 +232,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); f->security_connector = connector; f->merge_args = grpc_channel_args_copy(args_copy); + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 439452aea2..cd1dc589e1 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -327,6 +327,14 @@ static void request_matcher_zombify_all_pending_calls( } } +static void request_matcher_kill_requests(grpc_server *server, + request_matcher *rm) { + int request_id; + while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { + fail_call(server, &server->requested_calls[request_id]); + } +} + /* * server proper */ @@ -400,6 +408,15 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, call_data *calld = elem->call_data; int request_id; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + gpr_mu_lock(&calld->mu_state); + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + return; + } + request_id = gpr_stack_lockfree_pop(request_matcher->requests); if (request_id == -1) { gpr_mu_lock(&server->mu_call); @@ -483,12 +500,25 @@ static int num_channels(grpc_server *server) { return n; } +static void kill_pending_work_locked(grpc_server *server) { + registered_method *rm; + request_matcher_kill_requests(server, &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls( + &server->unregistered_request_matcher); + for (rm = server->registered_methods; rm; rm = rm->next) { + request_matcher_kill_requests(server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(&rm->request_matcher); + } +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i; if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } + kill_pending_work_locked(server); + if (server->root_channel_data.next != &server->root_channel_data || server->listeners_destroyed < num_listeners(server)) { if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), @@ -530,6 +560,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; + gpr_timespec op_deadline; if (success && !calld->got_initial_metadata) { size_t i; @@ -539,12 +570,15 @@ static void server_on_recv(void *ptr, int success) { grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (0 != gpr_time_cmp(op->data.metadata.deadline, - gpr_inf_future(GPR_CLOCK_REALTIME))) { + op_deadline = op->data.metadata.deadline; + if (0 != + gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { calld->deadline = op->data.metadata.deadline; } - calld->got_initial_metadata = 1; - start_new_rpc(elem); + if (calld->host && calld->path) { + calld->got_initial_metadata = 1; + start_new_rpc(elem); + } break; } } @@ -610,8 +644,8 @@ static void accept_stream(void *cd, grpc_transport *transport, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, - gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL, + 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { @@ -677,8 +711,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(!is_last); chand->server = NULL; chand->channel = NULL; - chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); - chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); + chand->path_key = grpc_mdstr_from_string(metadata_context, ":path", 0); + chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority", 0); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -722,6 +756,7 @@ static const grpc_channel_filter server_surface_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server", }; @@ -878,8 +913,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } - channel = - grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0); + channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, + mdctx, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; @@ -899,8 +934,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, chand->registered_methods = gpr_malloc(alloc); memset(chand->registered_methods, 0, alloc); for (rm = s->registered_methods; rm; rm = rm->next) { - host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; - method = grpc_mdstr_from_string(mdctx, rm->method); + host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host, 0) : NULL; + method = grpc_mdstr_from_string(mdctx, rm->method, 0); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); for (probes = 0; chand->registered_methods[(hash + probes) % slots] .server_registered_method != NULL; @@ -933,52 +968,15 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, op.set_accept_stream_user_data = chand; op.on_connectivity_state_change = &chand->channel_connectivity_changed; op.connectivity_state = &chand->connectivity_state; + op.disconnect = gpr_atm_acq_load(&s->shutdown_flag); grpc_transport_perform_op(transport, &op); } -typedef struct { - requested_call **requests; - size_t count; - size_t capacity; -} request_killer; - -static void request_killer_init(request_killer *rk) { - memset(rk, 0, sizeof(*rk)); -} - -static void request_killer_add(request_killer *rk, requested_call *rc) { - if (rk->capacity == rk->count) { - rk->capacity = GPR_MAX(8, rk->capacity * 2); - rk->requests = - gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests)); - } - rk->requests[rk->count++] = rc; -} - -static void request_killer_add_request_matcher(request_killer *rk, - grpc_server *server, - request_matcher *rm) { - int request_id; - while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - request_killer_add(rk, &server->requested_calls[request_id]); - } -} - -static void request_killer_run(request_killer *rk, grpc_server *server) { - size_t i; - for (i = 0; i < rk->count; i++) { - fail_call(server, rk->requests[i]); - } - gpr_free(rk->requests); -} - void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; - request_killer reqkill; GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); @@ -999,27 +997,16 @@ void grpc_server_shutdown_and_notify(grpc_server *server, server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); channel_broadcaster_init(server, &broadcaster); - request_killer_init(&reqkill); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - request_killer_add_request_matcher(&reqkill, server, - &server->unregistered_request_matcher); - request_matcher_zombify_all_pending_calls( - &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(&rm->request_matcher); - } + kill_pending_work_locked(server); gpr_mu_unlock(&server->mu_call); gpr_atm_rel_store(&server->shutdown_flag, 1); maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); - /* terminate all the requested calls */ - request_killer_run(&reqkill, server); - /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { l->destroy(server, l->arg); @@ -1259,6 +1246,8 @@ static void done_request_event(void *req, grpc_cq_completion *c) { } else { gpr_free(req); } + + server_unref(server); } static void fail_call(grpc_server *server, requested_call *rc) { @@ -1271,6 +1260,7 @@ static void fail_call(grpc_server *server, requested_call *rc) { rc->data.registered.initial_metadata->count = 0; break; } + server_ref(server); grpc_cq_end_op(rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } @@ -1281,6 +1271,8 @@ static void publish_registered_or_batch(grpc_call *call, int success, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); requested_call *rc = prc; call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(calld->cq_new, rc->tag, success, done_request_event, rc, &rc->completion); GRPC_CALL_INTERNAL_UNREF(call, "server", 0); diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index 78c53466b3..4ab845bc00 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -80,7 +80,7 @@ static void destroy(grpc_server *server, void *tcpp) { grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server); } -int grpc_server_add_http2_port(grpc_server *server, const char *addr) { +int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; size_t i; |