aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-11-17 15:05:45 -0800
committerGravatar Julien Boeuf <jboeuf@google.com>2015-11-17 15:05:45 -0800
commit675b5ce861c3de2c741ee1dd71bf8cdd2662eac6 (patch)
treea63cbd38d47eea554ae5fef9ef931678689afe63 /src/core/surface
parentb0d1e3d95f9d7764a186a25db7e16f87be027c66 (diff)
parentab88da26bad1566d0a0f9a797ec429bd96ae30e2 (diff)
Merge branch 'master' of github.com:grpc/grpc into core_creds_api_change
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/byte_buffer_reader.c19
-rw-r--r--src/core/surface/channel_connectivity.c62
-rw-r--r--src/core/surface/init.c3
3 files changed, 64 insertions, 20 deletions
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 283db83833..9f830df68c 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -31,6 +31,7 @@
*
*/
+#include <string.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/compression.h>
@@ -103,3 +104,21 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
}
return 0;
}
+
+gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
+ gpr_slice in_slice;
+ size_t bytes_read = 0;
+ const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
+ gpr_slice out_slice = gpr_slice_malloc(input_size);
+ gpr_uint8 *const outbuf = GPR_SLICE_START_PTR(out_slice); /* just an alias */
+
+ while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
+ const size_t slice_length = GPR_SLICE_LENGTH(in_slice);
+ memcpy(&(outbuf[bytes_read]), GPR_SLICE_START_PTR(in_slice), slice_length);
+ bytes_read += slice_length;
+ gpr_slice_unref(in_slice);
+ GPR_ASSERT(bytes_read <= input_size);
+ }
+ return out_slice;
+}
+
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 1a2aef64ef..df2774b527 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/client_uchannel.h"
#include "src/core/iomgr/timer.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/completion_queue.h"
@@ -51,18 +52,24 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
GRPC_API_TRACE(
"grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
(channel, try_to_connect));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_check_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ state = grpc_client_channel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
grpc_exec_ctx_finish(&exec_ctx);
- return GRPC_CHANNEL_FATAL_FAILURE;
+ return state;
}
- state = grpc_client_channel_check_connectivity_state(
- &exec_ctx, client_channel_elem, try_to_connect);
+ if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ state = grpc_client_uchannel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return state;
+ }
+ gpr_log(GPR_ERROR,
+ "grpc_channel_check_connectivity_state called on something that is "
+ "not a (u)client channel, but '%s'",
+ client_channel_elem->filter->name);
grpc_exec_ctx_finish(&exec_ctx);
- return state;
+ return GRPC_CHANNEL_FATAL_FAILURE;
}
typedef enum {
@@ -87,7 +94,17 @@ typedef struct {
} state_watcher;
static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_connectivity");
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_channel_connectivity");
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_uchannel_connectivity");
+ } else {
+ abort();
+ }
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
@@ -125,8 +142,13 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
w->removed = 1;
client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ } else {
+ grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ }
}
gpr_mu_unlock(&w->mu);
if (due_to_completion) {
@@ -199,18 +221,18 @@ void grpc_channel_watch_connectivity_state(
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_watch_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
- grpc_exec_ctx_enqueue(&exec_ctx, &w->on_complete, 1);
- } else {
- GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
&w->state, &w->on_complete);
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
+ grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
+ grpc_cq_pollset(cq));
+ grpc_client_uchannel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
}
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 715c90a5e1..b2e66a830e 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -47,6 +47,7 @@
#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include "src/core/debug/trace.h"
+#include "src/core/iomgr/executor.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
#include "src/core/surface/api_trace.h"
@@ -108,6 +109,7 @@ void grpc_init(void) {
grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
grpc_security_pre_init();
grpc_iomgr_init();
+ grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
/* Only initialize census if noone else has. */
if (census_enabled() == CENSUS_FEATURE_NONE) {
@@ -132,6 +134,7 @@ void grpc_shutdown(void) {
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_iomgr_shutdown();
+ grpc_executor_shutdown();
census_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();