diff options
author | Julien Boeuf <jboeuf@google.com> | 2015-11-17 15:05:45 -0800 |
---|---|---|
committer | Julien Boeuf <jboeuf@google.com> | 2015-11-17 15:05:45 -0800 |
commit | 675b5ce861c3de2c741ee1dd71bf8cdd2662eac6 (patch) | |
tree | a63cbd38d47eea554ae5fef9ef931678689afe63 /src/core/surface | |
parent | b0d1e3d95f9d7764a186a25db7e16f87be027c66 (diff) | |
parent | ab88da26bad1566d0a0f9a797ec429bd96ae30e2 (diff) |
Merge branch 'master' of github.com:grpc/grpc into core_creds_api_change
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/byte_buffer_reader.c | 19 | ||||
-rw-r--r-- | src/core/surface/channel_connectivity.c | 62 | ||||
-rw-r--r-- | src/core/surface/init.c | 3 |
3 files changed, 64 insertions, 20 deletions
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index 283db83833..9f830df68c 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -31,6 +31,7 @@ * */ +#include <string.h> #include <grpc/byte_buffer_reader.h> #include <grpc/compression.h> @@ -103,3 +104,21 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, } return 0; } + +gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) { + gpr_slice in_slice; + size_t bytes_read = 0; + const size_t input_size = grpc_byte_buffer_length(reader->buffer_out); + gpr_slice out_slice = gpr_slice_malloc(input_size); + gpr_uint8 *const outbuf = GPR_SLICE_START_PTR(out_slice); /* just an alias */ + + while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) { + const size_t slice_length = GPR_SLICE_LENGTH(in_slice); + memcpy(&(outbuf[bytes_read]), GPR_SLICE_START_PTR(in_slice), slice_length); + bytes_read += slice_length; + gpr_slice_unref(in_slice); + GPR_ASSERT(bytes_read <= input_size); + } + return out_slice; +} + diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 1a2aef64ef..df2774b527 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -37,6 +37,7 @@ #include <grpc/support/log.h> #include "src/core/channel/client_channel.h" +#include "src/core/channel/client_uchannel.h" #include "src/core/iomgr/timer.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" @@ -51,18 +52,24 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( GRPC_API_TRACE( "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2, (channel, try_to_connect)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_check_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + state = grpc_client_channel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); grpc_exec_ctx_finish(&exec_ctx); - return GRPC_CHANNEL_FATAL_FAILURE; + return state; } - state = grpc_client_channel_check_connectivity_state( - &exec_ctx, client_channel_elem, try_to_connect); + if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + state = grpc_client_uchannel_check_connectivity_state( + &exec_ctx, client_channel_elem, try_to_connect); + grpc_exec_ctx_finish(&exec_ctx); + return state; + } + gpr_log(GPR_ERROR, + "grpc_channel_check_connectivity_state called on something that is " + "not a (u)client channel, but '%s'", + client_channel_elem->filter->name); grpc_exec_ctx_finish(&exec_ctx); - return state; + return GRPC_CHANNEL_FATAL_FAILURE; } typedef enum { @@ -87,7 +94,17 @@ typedef struct { } state_watcher; static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_connectivity"); + grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(w->channel)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_channel_connectivity"); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, + "watch_uchannel_connectivity"); + } else { + abort(); + } gpr_mu_destroy(&w->mu); gpr_free(w); } @@ -125,8 +142,13 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, w->removed = 1; client_channel_elem = grpc_channel_stack_last_element( grpc_channel_get_channel_stack(w->channel)); - grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, - grpc_cq_pollset(w->cq)); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } else { + grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem, + grpc_cq_pollset(w->cq)); + } } gpr_mu_unlock(&w->mu); if (due_to_completion) { @@ -199,18 +221,18 @@ void grpc_channel_watch_connectivity_state( gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC)); - if (client_channel_elem->filter != &grpc_client_channel_filter) { - gpr_log(GPR_ERROR, - "grpc_channel_watch_connectivity_state called on something that is " - "not a client channel, but '%s'", - client_channel_elem->filter->name); - grpc_exec_ctx_enqueue(&exec_ctx, &w->on_complete, 1); - } else { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity"); + if (client_channel_elem->filter == &grpc_client_channel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity"); grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem, grpc_cq_pollset(cq)); grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, &w->state, &w->on_complete); + } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { + GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); + grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem, + grpc_cq_pollset(cq)); + grpc_client_uchannel_watch_connectivity_state( + &exec_ctx, client_channel_elem, &w->state, &w->on_complete); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 715c90a5e1..b2e66a830e 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" #include "src/core/profiling/timers.h" #include "src/core/surface/api_trace.h" @@ -108,6 +109,7 @@ void grpc_init(void) { grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace); grpc_security_pre_init(); grpc_iomgr_init(); + grpc_executor_init(); grpc_tracer_init("GRPC_TRACE"); /* Only initialize census if noone else has. */ if (census_enabled() == CENSUS_FEATURE_NONE) { @@ -132,6 +134,7 @@ void grpc_shutdown(void) { gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); + grpc_executor_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); |