diff options
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 34 | ||||
-rw-r--r-- | src/core/surface/call.h | 1 | ||||
-rw-r--r-- | src/core/surface/channel.c | 45 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 33 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 1 | ||||
-rw-r--r-- | src/core/surface/channel_ping.c | 79 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 6 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 17 | ||||
-rw-r--r-- | src/core/surface/server_create.c | 13 |
10 files changed, 143 insertions, 88 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 84b9daaa28..5d064ef00d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -336,26 +336,19 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_cq_pollset(cq)); } -grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { - return call->cq; -} - #ifdef GRPC_STREAM_REFCOUNT_DEBUG -void grpc_call_internal_ref(grpc_call *c, const char *reason) { - grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason); -} -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c, - const char *reason) { - grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason); -} +#define REF_REASON reason +#define REF_ARG , const char *reason #else -void grpc_call_internal_ref(grpc_call *c) { - grpc_call_stack_ref(CALL_STACK_FROM_CALL(c)); +#define REF_REASON "" +#define REF_ARG +#endif +void grpc_call_internal_ref(grpc_call *c REF_ARG) { + GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON); } -void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) { - grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c)); +void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { + GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -#endif static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) { size_t i; @@ -742,8 +735,15 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, char *grpc_call_get_peer(grpc_call *call) { grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result = elem->filter->get_peer(&exec_ctx, elem); + char *result; GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } grpc_exec_ctx_finish(&exec_ctx); return result; } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 20907ac6d6..b53340df8e 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -58,7 +58,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_completion_queue *cq); -grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call); #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_call_internal_ref(grpc_call *call, const char *reason); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 14fe97c30d..573a0e742f 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -63,7 +63,6 @@ typedef struct registered_call { struct grpc_channel { int is_client; - gpr_refcount refs; gpr_uint32 max_message_length; grpc_mdelem *default_authority; @@ -81,6 +80,8 @@ struct grpc_channel { /* the protobuf library will (by default) start warning at 100megs */ #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success); + grpc_channel *grpc_channel_create_from_filters( grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_filter **filters, size_t num_filters, @@ -93,8 +94,6 @@ grpc_channel *grpc_channel_create_from_filters( channel->target = gpr_strdup(target); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy */ - gpr_ref_init(&channel->refs, 1); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -153,7 +152,9 @@ grpc_channel *grpc_channel_create_from_filters( gpr_free(default_authority); } - grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args, + grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters, + num_filters, args, + is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL", CHANNEL_STACK_FROM_CHANNEL(channel)); return channel; @@ -250,17 +251,25 @@ grpc_call *grpc_channel_create_registered_call( rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline); } -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_ref(grpc_channel *c, const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: ref %p %d -> %d [%s]", c, c->refs.count, - c->refs.count + 1, reason); +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define REF_REASON reason +#define REF_ARG , const char *reason #else -void grpc_channel_internal_ref(grpc_channel *c) { +#define REF_REASON "" +#define REF_ARG #endif - gpr_ref(&c->refs); +void grpc_channel_internal_ref(grpc_channel *c REF_ARG) { + GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON); } -static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { +void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, + grpc_channel *c REF_ARG) { + GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON); +} + +static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + grpc_channel *channel = arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel)); while (channel->registered_calls) { registered_call *rc = channel->registered_calls; @@ -279,20 +288,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) { gpr_free(channel); } -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG -void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - const char *reason) { - gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel, - channel->refs.count, channel->refs.count - 1, reason); -#else -void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_channel *channel) { -#endif - if (gpr_unref(&channel->refs)) { - destroy_channel(exec_ctx, channel); - } -} - void grpc_channel_destroy(grpc_channel *channel) { grpc_transport_op op; grpc_channel_element *elem; diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 7dea609ebc..3d2ff23542 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG +#ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, const char *reason); diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index df2774b527..ad1c9b334e 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -83,7 +83,6 @@ typedef struct { gpr_mu mu; callback_phase phase; int success; - int removed; grpc_closure on_complete; grpc_timer alarm; grpc_connectivity_state state; @@ -135,30 +134,15 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, int due_to_completion) { int delete = 0; - grpc_channel_element *client_channel_elem = NULL; - gpr_mu_lock(&w->mu); - if (w->removed == 0) { - w->removed = 1; - client_channel_elem = grpc_channel_stack_last_element( - grpc_channel_get_channel_stack(w->channel)); - if (client_channel_elem->filter == &grpc_client_channel_filter) { - grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); - } else { - grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); - } - } - gpr_mu_unlock(&w->mu); if (due_to_completion) { - gpr_mu_lock(&w->mu); - w->success = 1; - gpr_mu_unlock(&w->mu); grpc_timer_cancel(exec_ctx, &w->alarm); } gpr_mu_lock(&w->mu); + if (due_to_completion) { + w->success = 1; + } switch (w->phase) { case WAITING: w->phase = CALLING_BACK; @@ -212,7 +196,6 @@ void grpc_channel_watch_connectivity_state( w->phase = WAITING; w->state = last_observed_state; w->success = 0; - w->removed = 0; w->cq = cq; w->tag = tag; w->channel = channel; @@ -223,16 +206,14 @@ void grpc_channel_watch_connectivity_state( if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); - grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, - &w->state, &w->on_complete); + grpc_cq_pollset(cq), &w->state, + &w->on_complete); } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); - grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem, - grpc_cq_pollset(cq)); grpc_client_uchannel_watch_connectivity_state( - &exec_ctx, client_channel_elem, &w->state, &w->on_complete); + &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, + &w->on_complete); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index fe7e1072ac..97ec23408f 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -171,7 +171,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c new file mode 100644 index 0000000000..1b6f06ded1 --- /dev/null +++ b/src/core/surface/channel_ping.c @@ -0,0 +1,79 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/channel.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/surface/api_trace.h" +#include "src/core/surface/completion_queue.h" + +typedef struct { + grpc_closure closure; + void *tag; + grpc_completion_queue *cq; + grpc_cq_completion completion_storage; +} ping_result; + +static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg, + grpc_cq_completion *storage) { + gpr_free(arg); +} + +static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + ping_result *pr = arg; + grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr, + &pr->completion_storage); +} + +void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, + void *tag, void *reserved) { + grpc_transport_op op; + ping_result *pr = gpr_malloc(sizeof(*pr)); + grpc_channel_element *top_elem = + grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_ASSERT(reserved == NULL); + memset(&op, 0, sizeof(op)); + pr->tag = tag; + pr->cq = cq; + grpc_closure_init(&pr->closure, ping_done, pr); + op.send_ping = &pr->closure; + op.bind_pollset = grpc_cq_pollset(cq); + grpc_cq_begin_op(cq); + top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op); + grpc_exec_ctx_finish(&exec_ctx); +} diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 4a55544ac1..a60e9d20da 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -49,7 +49,6 @@ typedef struct { } call_data; typedef struct { - grpc_channel *master; grpc_status_code error_code; const char *error_message; } channel_data; @@ -84,8 +83,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_channel_get_target(chand->master); + return NULL; } static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -111,10 +109,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; GPR_ASSERT(args->is_first); GPR_ASSERT(args->is_last); - chand->master = args->master; } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index c9a54d9237..92bd53411d 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -228,7 +228,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - args->master = f->master; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); @@ -305,22 +304,22 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, f->master = channel; GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); resolver = grpc_resolver_create(target, &f->base); - if (!resolver) { - grpc_exec_ctx_finish(&exec_ctx); - return NULL; + if (resolver) { + grpc_client_channel_set_resolver( + &exec_ctx, grpc_channel_get_channel_stack(channel), resolver); + GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create"); } - - grpc_client_channel_set_resolver( - &exec_ctx, grpc_channel_get_channel_stack(channel), resolver); - GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create"); grpc_subchannel_factory_unref(&exec_ctx, &f->base); GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create"); - grpc_channel_args_destroy(args_copy); if (new_args_from_connector != NULL) { grpc_channel_args_destroy(new_args_from_connector); } + if (!resolver) { + GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory"); + channel = NULL; + } grpc_exec_ctx_finish(&exec_ctx); return channel; diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index c7811a6d88..e362bb4376 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -32,14 +32,21 @@ */ #include <grpc/grpc.h> +#include "src/core/census/grpc_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/compress_filter.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" #include "src/core/surface/server.h" -#include "src/core/channel/compress_filter.h" grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { - const grpc_channel_filter *filters[] = {&grpc_compress_filter}; + const grpc_channel_filter *filters[3]; + size_t num_filters = 0; + filters[num_filters++] = &grpc_compress_filter; + if (grpc_channel_args_is_census_enabled(args)) { + filters[num_filters++] = &grpc_server_census_filter; + } GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); - return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), + return grpc_server_create_from_filters(filters, num_filters, args); } |