diff options
author | 2015-04-30 01:52:19 +0200 | |
---|---|---|
committer | 2015-04-30 01:52:19 +0200 | |
commit | f173793e51e726a887b84b4c37588e2e308191cc (patch) | |
tree | 0c03feb8ebdb68a99c51d3f24c3aa39755f14793 /src/core/surface/channel.c | |
parent | 882d7a7eec148046a90e1d111b93e353e3df0c31 (diff) | |
parent | 1685d773ef81420bf747c6988a2ad9baefd126fa (diff) |
Merge branch 'master' of github.com:grpc/grpc into travis-speedup
Conflicts:
.travis.yml
Diffstat (limited to 'src/core/surface/channel.c')
-rw-r--r-- | src/core/surface/channel.c | 150 |
1 files changed, 100 insertions, 50 deletions
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index e764a3b9af..78f9144c19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -43,30 +43,46 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +typedef struct registered_call { + grpc_mdelem *path; + grpc_mdelem *authority; + struct registered_call *next; +} registered_call; + struct grpc_channel { int is_client; gpr_refcount refs; + gpr_uint32 max_message_length; grpc_mdctx *metadata_context; grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; + + gpr_mu registered_call_mu; + registered_call *registered_calls; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) -#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) (((grpc_channel *)(channel_stack)) - 1) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) +#define CHANNEL_FROM_CHANNEL_STACK(channel_stack) \ + (((grpc_channel *)(channel_stack)) - 1) #define CHANNEL_FROM_TOP_ELEM(top_elem) \ CHANNEL_FROM_CHANNEL_STACK(grpc_channel_stack_from_top_element(top_elem)) +/* the protobuf library will (by default) start warning at 100megs */ +#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) + grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { + size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if is_client */ + /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if + * is_client */ gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); @@ -75,59 +91,41 @@ grpc_channel *grpc_channel_create_from_filters( channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, CHANNEL_STACK_FROM_CHANNEL(channel)); + gpr_mu_init(&channel->registered_call_mu); + channel->registered_calls = NULL; + + channel->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; + if (args) { + for (i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_MESSAGE_LENGTH)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else if (args->args[i].value.integer < 0) { + gpr_log(GPR_ERROR, "%s ignored: it must be >= 0", + GRPC_ARG_MAX_MESSAGE_LENGTH); + } else { + channel->max_message_length = args->args[i].value.integer; + } + } + } + } + return channel; } -static void do_nothing(void *ignored, grpc_op_error error) {} +static grpc_call *grpc_channel_create_call_internal( + grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_mdelem *authority_mdelem, gpr_timespec deadline) { + grpc_mdelem *send_metadata[2]; -grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_completion_queue *cq, - const char *method, const char *host, - gpr_timespec absolute_deadline) { - grpc_call *call; - grpc_mdelem *path_mdelem; - grpc_mdelem *authority_mdelem; - grpc_call_op op; - - if (!channel->is_client) { - gpr_log(GPR_ERROR, "Cannot create a call on the server."); - return NULL; - } - - call = grpc_call_create(channel, cq, NULL); + GPR_ASSERT(channel->is_client); - /* Add :path and :authority headers. */ - /* TODO(klempner): Consider optimizing this by stashing mdelems for common - values of method and host. */ - path_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, grpc_mdstr_ref(channel->path_string), - grpc_mdstr_from_string(channel->metadata_context, method)); - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.metadata = path_mdelem; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - - grpc_mdstr_ref(channel->authority_string); - authority_mdelem = grpc_mdelem_from_metadata_strings( - channel->metadata_context, channel->authority_string, - grpc_mdstr_from_string(channel->metadata_context, host)); - op.data.metadata = authority_mdelem; - grpc_call_execute_op(call, &op); - - if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { - op.type = GRPC_SEND_DEADLINE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.deadline = absolute_deadline; - op.done_cb = do_nothing; - op.user_data = NULL; - grpc_call_execute_op(call, &op); - } + send_metadata[0] = path_mdelem; + send_metadata[1] = authority_mdelem; - return call; + return grpc_call_create(channel, cq, NULL, send_metadata, + GPR_ARRAY_SIZE(send_metadata), deadline); } grpc_call *grpc_channel_create_call_old(grpc_channel *channel, @@ -137,6 +135,46 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, absolute_deadline); } +grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_completion_queue *cq, + const char *method, const char *host, + gpr_timespec deadline) { + return grpc_channel_create_call_internal( + channel, cq, + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)), + grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)), + deadline); +} + +void *grpc_channel_register_call(grpc_channel *channel, const char *method, + const char *host) { + registered_call *rc = gpr_malloc(sizeof(registered_call)); + rc->path = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->path_string), + grpc_mdstr_from_string(channel->metadata_context, method)); + rc->authority = grpc_mdelem_from_metadata_strings( + channel->metadata_context, grpc_mdstr_ref(channel->authority_string), + grpc_mdstr_from_string(channel->metadata_context, host)); + gpr_mu_lock(&channel->registered_call_mu); + rc->next = channel->registered_calls; + channel->registered_calls = rc; + gpr_mu_unlock(&channel->registered_call_mu); + return rc; +} + +grpc_call *grpc_channel_create_registered_call( + grpc_channel *channel, grpc_completion_queue *completion_queue, + void *registered_call_handle, gpr_timespec deadline) { + registered_call *rc = registered_call_handle; + return grpc_channel_create_call_internal( + channel, completion_queue, grpc_mdelem_ref(rc->path), + grpc_mdelem_ref(rc->authority), deadline); +} + void grpc_channel_internal_ref(grpc_channel *channel) { gpr_ref(&channel->refs); } @@ -148,7 +186,15 @@ static void destroy_channel(void *p, int ok) { grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); + while (channel->registered_calls) { + registered_call *rc = channel->registered_calls; + channel->registered_calls = rc->next; + grpc_mdelem_unref(rc->path); + grpc_mdelem_unref(rc->authority); + gpr_free(rc); + } grpc_mdctx_unref(channel->metadata_context); + gpr_mu_destroy(&channel->registered_call_mu); gpr_free(channel); } @@ -196,3 +242,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel) { return channel->grpc_message_string; } + +gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel) { + return channel->max_message_length; +} |