author     Julien Boeuf <jboeuf@google.com>    2015-11-17 15:05:45 -0800
committer  Julien Boeuf <jboeuf@google.com>    2015-11-17 15:05:45 -0800
commit     675b5ce861c3de2c741ee1dd71bf8cdd2662eac6 (patch)
tree       a63cbd38d47eea554ae5fef9ef931678689afe63 /src/core
parent     b0d1e3d95f9d7764a186a25db7e16f87be027c66 (diff)
parent     ab88da26bad1566d0a0f9a797ec429bd96ae30e2 (diff)
Merge branch 'master' of github.com:grpc/grpc into core_creds_api_change
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/channel/client_uchannel.c        | 572
-rw-r--r--  src/core/channel/client_uchannel.h        |  70
-rw-r--r--  src/core/client_config/subchannel.c       |  47
-rw-r--r--  src/core/client_config/subchannel.h       |  11
-rw-r--r--  src/core/iomgr/closure.c                  |  10
-rw-r--r--  src/core/iomgr/closure.h                  |   9
-rw-r--r--  src/core/iomgr/executor.c                 | 148
-rw-r--r--  src/core/iomgr/executor.h                 |  53
-rw-r--r--  src/core/iomgr/pollset_posix.c            |   5
-rw-r--r--  src/core/iomgr/pollset_posix.h            |   1
-rw-r--r--  src/core/iomgr/resolve_address_posix.c    |  22
-rw-r--r--  src/core/iomgr/resolve_address_windows.c  |  26
-rw-r--r--  src/core/support/histogram.c              |   2
-rw-r--r--  src/core/support/time_win32.c             |   4
-rw-r--r--  src/core/surface/byte_buffer_reader.c     |  19
-rw-r--r--  src/core/surface/channel_connectivity.c   |  62
-rw-r--r--  src/core/surface/init.c                   |   3
17 files changed, 997 insertions(+), 67 deletions(-)
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
new file mode 100644
index 0000000000..510677a844
--- /dev/null
+++ b/src/core/channel/client_uchannel.c
@@ -0,0 +1,572 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/channel/client_uchannel.h"
+
+#include <string.h>
+
+#include "src/core/census/grpc_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/client_channel.h"
+#include "src/core/channel/compress_filter.h"
+#include "src/core/iomgr/iomgr.h"
+#include "src/core/support/string.h"
+#include "src/core/surface/channel.h"
+#include "src/core/transport/connectivity_state.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+/** Microchannel (uchannel) implementation: a lightweight channel without any
+ * load-balancing mechanisms meant for communication from within the core. */
+
+typedef struct call_data call_data;
+
+typedef struct client_uchannel_channel_data {
+ /** metadata context for this channel */
+ grpc_mdctx *mdctx;
+
+ /** master channel - the grpc_channel instance that ultimately owns
+ this channel_data via its channel stack.
+ We occasionally use this to bump the refcount on the master channel
+ to keep ourselves alive through an asynchronous operation. */
+ grpc_channel *master;
+
+ /** connectivity state being tracked */
+ grpc_connectivity_state_tracker state_tracker;
+
+ /** the subchannel wrapped by the microchannel */
+ grpc_subchannel *subchannel;
+
+ /** the callback used to stay subscribed to subchannel connectivity
+ * notifications */
+ grpc_closure connectivity_cb;
+
+ /** the current connectivity state of the wrapped subchannel */
+ grpc_connectivity_state subchannel_connectivity;
+
+ gpr_mu mu_state;
+} channel_data;
+
+typedef enum {
+ CALL_CREATED,
+ CALL_WAITING_FOR_SEND,
+ CALL_WAITING_FOR_CALL,
+ CALL_ACTIVE,
+ CALL_CANCELLED
+} call_state;
+
+struct call_data {
+ /* owning element */
+ grpc_call_element *elem;
+
+ gpr_mu mu_state;
+
+ call_state state;
+ gpr_timespec deadline;
+ grpc_closure async_setup_task;
+ grpc_transport_stream_op waiting_op;
+ /* our child call stack */
+ grpc_subchannel_call *subchannel_call;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
+};
+
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op)
+ GRPC_MUST_USE_RESULT;
+
+static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ if (op->send_ops) {
+ grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
+ op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
+ }
+ if (op->recv_ops) {
+ char status[GPR_LTOA_MIN_BUFSIZE];
+ grpc_metadata_batch mdb;
+ gpr_ltoa(GRPC_STATUS_CANCELLED, status);
+ calld->status.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
+ calld->details.md =
+ grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
+ mdb.garbage.head = mdb.garbage.tail = NULL;
+ mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ grpc_sopb_add_metadata(op->recv_ops, mdb);
+ *op->recv_state = GRPC_STREAM_CLOSED;
+ op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
+ }
+ if (op->on_consumed) {
+ op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
+ }
+}
+
+typedef struct {
+ grpc_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation);
+
+static int is_empty(void *p, int len) {
+ char *ptr = p;
+ int i;
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
+ return 1;
+}
+
+static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ channel_data *chand = arg;
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ chand->subchannel_connectivity,
+ "uchannel_monitor_subchannel");
+ grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
+ &chand->subchannel_connectivity,
+ &chand->connectivity_cb);
+}
+
+static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ call_data *calld = arg;
+ grpc_transport_stream_op op;
+ int have_waiting;
+
+ if (calld->state == CALL_CANCELLED && iomgr_success == 0) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
+ }
+ } else if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
+ &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ gpr_mu_unlock(&calld->mu_state);
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (have_waiting) {
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
+ }
+ }
+}
+
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ call_data *calld = arg;
+ gpr_mu_lock(&calld->mu_state);
+ started_call_locked(exec_ctx, arg, iomgr_success);
+}
+
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) {
+ call_data *calld = elem->call_data;
+ grpc_closure *consumed_op = NULL;
+ grpc_transport_stream_op *waiting_op = &calld->waiting_op;
+ GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
+ GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
+ if (new_op->send_ops != NULL) {
+ waiting_op->send_ops = new_op->send_ops;
+ waiting_op->is_last_send = new_op->is_last_send;
+ waiting_op->on_done_send = new_op->on_done_send;
+ }
+ if (new_op->recv_ops != NULL) {
+ waiting_op->recv_ops = new_op->recv_ops;
+ waiting_op->recv_state = new_op->recv_state;
+ waiting_op->on_done_recv = new_op->on_done_recv;
+ }
+ if (new_op->on_consumed != NULL) {
+ if (waiting_op->on_consumed != NULL) {
+ consumed_op = waiting_op->on_consumed;
+ }
+ waiting_op->on_consumed = new_op->on_consumed;
+ }
+ if (new_op->cancel_with_status != GRPC_STATUS_OK) {
+ waiting_op->cancel_with_status = new_op->cancel_with_status;
+ }
+ return consumed_op;
+}
+
+static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
+ char *result;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_ACTIVE) {
+ subchannel_call = calld->subchannel_call;
+ GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
+ gpr_mu_unlock(&calld->mu_state);
+ result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
+ return result;
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ return grpc_channel_get_target(chand->master);
+ }
+}
+
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op,
+ int continuation) {
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
+ grpc_subchannel_call *subchannel_call;
+ grpc_transport_stream_op op2;
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+
+ gpr_mu_lock(&calld->mu_state);
+ /* make sure the wrapped subchannel has been set (see
+ * grpc_client_uchannel_set_subchannel) */
+ GPR_ASSERT(chand->subchannel != NULL);
+
+ switch (calld->state) {
+ case CALL_ACTIVE:
+ GPR_ASSERT(!continuation);
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
+ break;
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ break;
+ case CALL_WAITING_FOR_SEND:
+ GPR_ASSERT(!continuation);
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
+ if (!calld->waiting_op.send_ops &&
+ calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ }
+ *op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ continuation = 1;
+ /* fall through */
+ case CALL_WAITING_FOR_CALL:
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ op2 = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ if (op->on_consumed) {
+ calld->waiting_op.on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+ } else if (op2.on_consumed) {
+ calld->waiting_op.on_consumed = op2.on_consumed;
+ op2.on_consumed = NULL;
+ }
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, &op2);
+ grpc_subchannel_cancel_waiting_call(exec_ctx, chand->subchannel, 1);
+ } else {
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
+ gpr_mu_unlock(&calld->mu_state);
+ }
+ break;
+ }
+ /* fall through */
+ case CALL_CREATED:
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ } else {
+ calld->waiting_op = *op;
+ if (op->send_ops == NULL) {
+ calld->state = CALL_WAITING_FOR_SEND;
+ gpr_mu_unlock(&calld->mu_state);
+ } else {
+ grpc_subchannel_call_create_status call_creation_status;
+ grpc_pollset *pollset = calld->waiting_op.bind_pollset;
+ calld->state = CALL_WAITING_FOR_CALL;
+ grpc_closure_init(&calld->async_setup_task, started_call, calld);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, chand->subchannel, pollset, &calld->subchannel_call,
+ &calld->async_setup_task);
+ if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+ started_call_locked(exec_ctx, calld, 1);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ }
+ }
+ }
+ break;
+ }
+}
+
+static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(exec_ctx, elem, op, 0);
+}
+
+static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_transport_op *op) {
+ channel_data *chand = elem->channel_data;
+
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (op->disconnect) {
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ }
+}
+
+/* Constructor for call_data */
+static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const void *server_transport_data,
+ grpc_transport_stream_op *initial_op) {
+ call_data *calld = elem->call_data;
+ memset(calld, 0, sizeof(call_data));
+
+ /* TODO(ctiller): is there something useful we can do here? */
+ GPR_ASSERT(initial_op == NULL);
+
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
+ GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
+ calld->elem = elem;
+ calld->state = CALL_CREATED;
+ calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+}
+
+/* Destructor for call_data */
+static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
+ call_data *calld = elem->call_data;
+ grpc_subchannel_call *subchannel_call;
+
+ /* if the call got activated, we need to destroy the child stack also, and
+ remove it from the in-flight requests tracked by the child_entry we
+ picked */
+ gpr_mu_lock(&calld->mu_state);
+ switch (calld->state) {
+ case CALL_ACTIVE:
+ subchannel_call = calld->subchannel_call;
+ gpr_mu_unlock(&calld->mu_state);
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_uchannel");
+ break;
+ case CALL_CREATED:
+ case CALL_CANCELLED:
+ gpr_mu_unlock(&calld->mu_state);
+ break;
+ case CALL_WAITING_FOR_CALL:
+ case CALL_WAITING_FOR_SEND:
+ GPR_UNREACHABLE_CODE(return );
+ }
+}
+
+/* Constructor for channel_data */
+static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel *master,
+ const grpc_channel_args *args,
+ grpc_mdctx *metadata_context, int is_first,
+ int is_last) {
+ channel_data *chand = elem->channel_data;
+ memset(chand, 0, sizeof(*chand));
+ grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
+ GPR_ASSERT(is_last);
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
+ chand->mdctx = metadata_context;
+ chand->master = master;
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
+ "client_uchannel");
+ gpr_mu_init(&chand->mu_state);
+}
+
+/* Destructor for channel_data */
+static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
+ &chand->connectivity_cb);
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
+ gpr_mu_destroy(&chand->mu_state);
+}
+
+const grpc_channel_filter grpc_client_uchannel_filter = {
+ cuc_start_transport_stream_op,
+ cuc_start_transport_op,
+ sizeof(call_data),
+ cuc_init_call_elem,
+ cuc_destroy_call_elem,
+ sizeof(channel_data),
+ cuc_init_channel_elem,
+ cuc_destroy_channel_elem,
+ cuc_get_peer,
+ "client-uchannel",
+};
+
+grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
+ channel_data *chand = elem->channel_data;
+ grpc_connectivity_state out;
+ out = grpc_connectivity_state_check(&chand->state_tracker);
+ gpr_mu_lock(&chand->mu_state);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
+ GRPC_CHANNEL_CONNECTING,
+ "uchannel_connecting_changed");
+ chand->subchannel_connectivity = out;
+ grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
+ &chand->subchannel_connectivity,
+ &chand->connectivity_cb);
+ }
+ gpr_mu_unlock(&chand->mu_state);
+ return out;
+}
+
+void grpc_client_uchannel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_connectivity_state *state, grpc_closure *on_complete) {
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu_state);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, state, on_complete);
+ gpr_mu_unlock(&chand->mu_state);
+}
+
+grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ grpc_channel_element *parent_elem;
+ gpr_mu_lock(&chand->mu_state);
+ parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
+ grpc_subchannel_get_master(chand->subchannel)));
+ gpr_mu_unlock(&chand->mu_state);
+ return grpc_client_channel_get_connecting_pollset_set(parent_elem);
+}
+
+void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ grpc_pollset_set *master_pollset_set =
+ grpc_client_uchannel_get_connecting_pollset_set(elem);
+ grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
+}
+
+void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ grpc_pollset_set *master_pollset_set =
+ grpc_client_uchannel_get_connecting_pollset_set(elem);
+ grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
+}
+
+grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
+ grpc_channel_args *args) {
+ grpc_channel *channel = NULL;
+#define MAX_FILTERS 3
+ const grpc_channel_filter *filters[MAX_FILTERS];
+ grpc_mdctx *mdctx = grpc_subchannel_get_mdctx(subchannel);
+ grpc_channel *master = grpc_subchannel_get_master(subchannel);
+ char *target = grpc_channel_get_target(master);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ size_t n = 0;
+
+ grpc_mdctx_ref(mdctx);
+ if (grpc_channel_args_is_census_enabled(args)) {
+ filters[n++] = &grpc_client_census_filter;
+ }
+ filters[n++] = &grpc_compress_filter;
+ filters[n++] = &grpc_client_uchannel_filter;
+ GPR_ASSERT(n <= MAX_FILTERS);
+
+ channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, n,
+ args, mdctx, 1);
+
+ gpr_free(target);
+ return channel;
+}
+
+void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
+ grpc_subchannel *subchannel) {
+ grpc_channel_element *elem =
+ grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
+ channel_data *chand = elem->channel_data;
+ GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
+ gpr_mu_lock(&chand->mu_state);
+ chand->subchannel = subchannel;
+ gpr_mu_unlock(&chand->mu_state);
+}
diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h
new file mode 100644
index 0000000000..dfe6695ae3
--- /dev/null
+++ b/src/core/channel/client_uchannel.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H
+#define GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H
+
+#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/resolver.h"
+
+#define GRPC_MICROCHANNEL_SUBCHANNEL_ARG "grpc.microchannel_subchannel_key"
+
+/* A client microchannel (aka uchannel) is a channel wrapping a subchannel, for
+ * the purposes of lightweight RPC communications from within the core.*/
+
+extern const grpc_channel_filter grpc_client_uchannel_filter;
+
+grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
+
+void grpc_client_uchannel_watch_connectivity_state(
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_connectivity_state *state, grpc_closure *on_complete);
+
+grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
+ grpc_channel_element *elem);
+
+void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *channel,
+ grpc_pollset *pollset);
+void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *channel,
+ grpc_pollset *pollset);
+
+grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
+ grpc_channel_args *args);
+
+void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
+ grpc_subchannel *subchannel);
+
+#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */
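
A minimal usage sketch of the microchannel API declared above (an editor's illustration, not part of this commit): only grpc_client_uchannel_create and grpc_client_uchannel_set_subchannel from this header are assumed, and wrap_subchannel is a hypothetical helper.

#include "src/core/channel/client_uchannel.h"

/* Wrap an existing subchannel in a lightweight uchannel. The subchannel must
   be attached before the channel is used: perform_transport_stream_op in
   client_uchannel.c asserts that chand->subchannel != NULL. */
static grpc_channel *wrap_subchannel(grpc_subchannel *subchannel,
                                     grpc_channel_args *args) {
  grpc_channel *uchannel = grpc_client_uchannel_create(subchannel, args);
  grpc_client_uchannel_set_subchannel(uchannel, subchannel);
  return uchannel;
}
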
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 095000ba4f..0401dd3868 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -312,6 +312,29 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
+void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *subchannel,
+ int iomgr_success) {
+ waiting_for_connect *w4c;
+ gpr_mu_lock(&subchannel->mu);
+ w4c = subchannel->waiting;
+ subchannel->waiting = NULL;
+ gpr_mu_unlock(&subchannel->mu);
+ while (w4c != NULL) {
+ waiting_for_connect *next = w4c->next;
+ grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
+ w4c->pollset);
+ if (w4c->notify) {
+ w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
+ }
+
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
+ gpr_free(w4c);
+
+ w4c = next;
+ }
+}
+
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
@@ -659,24 +682,12 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
iomgr_success = 0;
}
connectivity_state_changed_locked(exec_ctx, c, "alarm");
+ gpr_mu_unlock(&c->mu);
if (iomgr_success) {
- gpr_mu_unlock(&c->mu);
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- waiting_for_connect *w4c;
- w4c = c->waiting;
- c->waiting = NULL;
- gpr_mu_unlock(&c->mu);
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
- w4c->pollset);
- w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, 0);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
- gpr_free(w4c);
- w4c = next;
- }
+ grpc_subchannel_cancel_waiting_call(exec_ctx, c, iomgr_success);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
}
@@ -784,3 +795,11 @@ static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk);
return call;
}
+
+grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel) {
+ return subchannel->mdctx;
+}
+
+grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
+ return subchannel->master;
+}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index a26d08f02e..ec1cc7cc69 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -92,6 +92,11 @@ grpc_subchannel_call_create_status grpc_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
grpc_subchannel_call **target, grpc_closure *notify);
+/** cancel the calls waiting on \a subchannel to connect. */
+void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *subchannel,
+ int iomgr_success);
+
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
@@ -154,4 +159,10 @@ struct grpc_subchannel_args {
grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_subchannel_args *args);
+/** Return the metadata context associated with the subchannel */
+grpc_mdctx *grpc_subchannel_get_mdctx(grpc_subchannel *subchannel);
+
+/** Return the master channel associated with the subchannel */
+grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel);
+
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index d91681990f..b4f1817de4 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -72,6 +72,16 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) {
src->head = src->tail = NULL;
}
+grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) {
+ grpc_closure *head;
+ if (list->head == NULL) {
+ return NULL;
+ }
+ head = list->head;
+ list->head = list->head->next;
+ return head;
+}
+
typedef struct {
grpc_iomgr_cb_func cb;
void *cb_arg;
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index d812659af0..7a9f7ccad0 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -83,9 +83,18 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
+/** add \a closure to the end of \a list and set \a closure's success to \a
+ * success */
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
int success);
+
+/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
+
+/** pop (return and remove) the head closure from \a list. */
+grpc_closure *grpc_closure_list_pop(grpc_closure_list *list);
+
+/** return whether \a list is empty. */
int grpc_closure_list_empty(grpc_closure_list list);
#endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */
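
A short sketch of the closure-list helpers documented above (not part of this commit): example_cb and drain_example are hypothetical, and the cb/cb_arg/success field accesses mirror how executor.c below consumes its own list.

#include <grpc/support/log.h>
#include "src/core/iomgr/closure.h"
#include "src/core/iomgr/exec_ctx.h"

static void example_cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
  /* application work would run here */
}

static void drain_example(grpc_exec_ctx *exec_ctx) {
  grpc_closure c;
  grpc_closure *head;
  grpc_closure_list list = GRPC_CLOSURE_LIST_INIT;
  grpc_closure_init(&c, example_cb, NULL);
  grpc_closure_list_add(&list, &c, 1 /* success */);
  /* pop and run closures until the list is drained */
  while ((head = grpc_closure_list_pop(&list)) != NULL) {
    head->cb(exec_ctx, head->cb_arg, head->success);
  }
  GPR_ASSERT(grpc_closure_list_empty(list));
}
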
diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c
new file mode 100644
index 0000000000..457e5cdbac
--- /dev/null
+++ b/src/core/iomgr/executor.c
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/executor.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include "src/core/iomgr/exec_ctx.h"
+
+typedef struct grpc_executor_data {
+ int busy; /**< is the thread currently running? */
+ int shutting_down; /**< has \a grpc_shutdown() been invoked? */
+ int pending_join; /**< has the thread finished but not been joined? */
+ grpc_closure_list closures; /**< collection of pending work */
+ gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
+ pending_join are true */
+ gpr_thd_options options;
+ gpr_mu mu;
+} grpc_executor;
+
+static grpc_executor g_executor;
+
+void grpc_executor_init() {
+ memset(&g_executor, 0, sizeof(grpc_executor));
+ gpr_mu_init(&g_executor.mu);
+ g_executor.options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&g_executor.options);
+}
+
+/* thread body */
+static void closure_exec_thread_func(void *ignored) {
+ grpc_closure *closure;
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ while (1) {
+ gpr_mu_lock(&g_executor.mu);
+ if (g_executor.shutting_down != 0) {
+ gpr_mu_unlock(&g_executor.mu);
+ break;
+ }
+ closure = grpc_closure_list_pop(&g_executor.closures);
+ if (closure == NULL) {
+ /* no more work, time to die */
+ GPR_ASSERT(g_executor.busy == 1);
+ g_executor.busy = 0;
+ gpr_mu_unlock(&g_executor.mu);
+ break;
+ }
+ gpr_mu_unlock(&g_executor.mu);
+ closure->cb(&exec_ctx, closure->cb_arg, closure->success);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+/* Spawn the thread if new work has arrived and no thread is up */
+static void maybe_spawn_locked() {
+ if (grpc_closure_list_empty(g_executor.closures) == 1) {
+ return;
+ }
+ if (g_executor.shutting_down == 1) {
+ return;
+ }
+
+ if (g_executor.busy != 0) {
+ /* Thread still working. New work will be picked up by already running
+ * thread. Not spawning anything. */
+ return;
+ } else if (g_executor.pending_join != 0) {
+ /* Pick up the remains of the previous incarnations of the thread. */
+ gpr_thd_join(g_executor.tid);
+ g_executor.pending_join = 0;
+ }
+
+ /* All previous instances of the thread should have been joined at this point.
+ * Spawn time! */
+ g_executor.busy = 1;
+ gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
+ &g_executor.options);
+ g_executor.pending_join = 1;
+}
+
+void grpc_executor_enqueue(grpc_closure *closure, int success) {
+ gpr_mu_lock(&g_executor.mu);
+ if (g_executor.shutting_down == 0) {
+ grpc_closure_list_add(&g_executor.closures, closure, success);
+ maybe_spawn_locked();
+ }
+ gpr_mu_unlock(&g_executor.mu);
+}
+
+void grpc_executor_shutdown() {
+ int pending_join;
+ grpc_closure *closure;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ gpr_mu_lock(&g_executor.mu);
+ pending_join = g_executor.pending_join;
+ g_executor.shutting_down = 1;
+ gpr_mu_unlock(&g_executor.mu);
+ /* we can release the lock at this point despite the access to the closure
+ * list below because we aren't accepting new work */
+
+ /* Execute pending callbacks, some may be performing cleanups */
+ while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) {
+ closure->cb(&exec_ctx, closure->cb_arg, closure->success);
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
+ if (pending_join) {
+ gpr_thd_join(g_executor.tid);
+ }
+ gpr_mu_destroy(&g_executor.mu);
+}
diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h
new file mode 100644
index 0000000000..6da446ae9c
--- /dev/null
+++ b/src/core/iomgr/executor.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H
+#define GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H
+
+#include "src/core/iomgr/closure.h"
+
+/** Initialize the global executor.
+ *
+ * This mechanism is meant to outsource work (grpc_closure instances) to a
+ * thread, for those cases where blocking isn't an option but there isn't a
+ * non-blocking solution available. */
+void grpc_executor_init();
+
+/** Enqueue \a closure for its eventual execution on a separate
+ * thread */
+void grpc_executor_enqueue(grpc_closure *closure, int success);
+
+/** Shutdown the executor, running all pending work as part of the call */
+void grpc_executor_shutdown();
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_EXECUTOR_H */
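
A hedged sketch of offloading blocking work to the new global executor (not part of this commit), in the same style as the resolve_address changes further down; blocking_task, run_blocking_task and schedule_blocking_task are made-up names.

#include <grpc/support/alloc.h>
#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/executor.h"

typedef struct {
  grpc_closure closure;
  /* ...state needed by the blocking operation... */
} blocking_task;

static void run_blocking_task(grpc_exec_ctx *exec_ctx, void *arg, int success) {
  blocking_task *task = arg;
  /* perform the blocking work here; results can be handed off via exec_ctx */
  gpr_free(task);
}

static void schedule_blocking_task(void) {
  blocking_task *task = gpr_malloc(sizeof(*task));
  grpc_closure_init(&task->closure, run_blocking_task, task);
  /* the closure will run on the executor thread, which is spawned on demand */
  grpc_executor_enqueue(&task->closure, 1 /* success */);
}
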
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index bce1ce9714..6f478ccacb 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -121,12 +121,14 @@ void grpc_pollset_kick_ext(grpc_pollset *p,
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
specific_worker->reevaluate_polling_on_wakeup = 1;
}
+ specific_worker->kicked_specifically = 1;
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
} else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
GPR_TIMER_MARK("kick_yoself", 0);
if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
specific_worker->reevaluate_polling_on_wakeup = 1;
}
+ specific_worker->kicked_specifically = 1;
grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
}
} else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
@@ -242,6 +244,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
worker->reevaluate_polling_on_wakeup = 0;
+ worker->kicked_specifically = 0;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
/* If there's work waiting for the pollset to be idle, and the
@@ -308,7 +311,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker->reevaluate_polling_on_wakeup) {
worker->reevaluate_polling_on_wakeup = 0;
pollset->kicked_without_pollers = 0;
- if (queued_work) {
+ if (queued_work || worker->kicked_specifically) {
/* If there's queued work on the list, then set the deadline to be
immediate so we get back out of the polling loop quickly */
deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 34f76db2af..95ebeab1c2 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -51,6 +51,7 @@ struct grpc_fd;
typedef struct grpc_pollset_worker {
grpc_wakeup_fd wakeup_fd;
int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
} grpc_pollset_worker;
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index ed0a93fcc9..555c74ce7e 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -41,6 +41,7 @@
#include <sys/un.h>
#include <string.h>
+#include "src/core/iomgr/executor.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/support/block_annotate.h"
@@ -57,8 +58,8 @@ typedef struct {
char *name;
char *default_port;
grpc_resolve_cb cb;
+ grpc_closure request_closure;
void *arg;
- grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@@ -149,20 +150,18 @@ done:
return addrs;
}
-/* Thread function to asynch-ify grpc_blocking_resolve_address */
-static void do_request_thread(void *rp) {
+/* Callback to be passed to grpc_executor to asynch-ify
+ * grpc_blocking_resolve_address */
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
request *r = rp;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
void *arg = r->arg;
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
- cb(&exec_ctx, arg, resolved);
- grpc_iomgr_unregister_object(&r->iomgr_object);
+ cb(exec_ctx, arg, resolved);
gpr_free(r);
- grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -173,17 +172,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
- gpr_thd_id id;
- char *tmp;
- gpr_asprintf(&tmp, "resolve_address:name='%s':default_port='%s'", name,
- default_port);
- grpc_iomgr_register_object(&r->iomgr_object, tmp);
- gpr_free(tmp);
+ grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
- gpr_thd_new(&id, do_request_thread, r, NULL);
+ grpc_executor_enqueue(&r->request_closure, 1);
}
#endif
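
A caller-side sketch of the resolver path that now runs on the executor (not part of this commit). The callback shape is inferred from the cb(exec_ctx, arg, resolved) call sites above; the resolve_address.h include path and the on_resolved/start_resolution names are assumptions.

#include "src/core/iomgr/resolve_address.h"

static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
                        grpc_resolved_addresses *addresses) {
  if (addresses != NULL) {
    /* use the resolved addresses, e.g. start a connection attempt */
    grpc_resolved_addresses_destroy(addresses);
  }
}

static void start_resolution(void) {
  /* the callback fires on the executor thread once the blocking
     getaddrinfo-based lookup has completed */
  grpc_resolve_address("localhost", "50051", on_resolved, NULL);
}
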
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 82a5602996..007c855d10 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -40,6 +40,7 @@
#include <sys/types.h>
#include <string.h>
+#include "src/core/iomgr/executor.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/support/block_annotate.h"
@@ -47,6 +48,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
+#include <grpc/support/log_win32.h>
#include <grpc/support/string_util.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
@@ -55,8 +57,8 @@ typedef struct {
char *name;
char *default_port;
grpc_resolve_cb cb;
+ grpc_closure request_closure;
void *arg;
- grpc_iomgr_object iomgr_object;
} request;
grpc_resolved_addresses *grpc_blocking_resolve_address(
@@ -93,7 +95,9 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
s = getaddrinfo(host, port, &hints, &result);
GRPC_SCHEDULING_END_BLOCKING_REGION;
if (s != 0) {
- gpr_log(GPR_ERROR, "getaddrinfo: %s", gai_strerror(s));
+ char *error_message = gpr_format_message(s);
+ gpr_log(GPR_ERROR, "getaddrinfo: %s", error_message);
+ gpr_free(error_message);
goto done;
}
@@ -129,9 +133,9 @@ done:
return addrs;
}
-/* Thread function to asynch-ify grpc_blocking_resolve_address */
-static void do_request(void *rp) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+/* Callback to be passed to grpc_executor to asynch-ify
+ * grpc_blocking_resolve_address */
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
@@ -139,10 +143,8 @@ static void do_request(void *rp) {
grpc_resolve_cb cb = r->cb;
gpr_free(r->name);
gpr_free(r->default_port);
- grpc_iomgr_unregister_object(&r->iomgr_object);
+ cb(exec_ctx, arg, resolved);
gpr_free(r);
- cb(&exec_ctx, arg, resolved);
- grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -153,16 +155,12 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
- gpr_thd_id id;
- char *label;
- gpr_asprintf(&label, "resolve:%s", name);
- grpc_iomgr_register_object(&r->iomgr_object, label);
- gpr_free(label);
+ grpc_closure_init(&r->request_closure, do_request_thread, r);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
r->arg = arg;
- gpr_thd_new(&id, do_request, r, NULL);
+ grpc_executor_enqueue(&r->request_closure, 1);
}
#endif
diff --git a/src/core/support/histogram.c b/src/core/support/histogram.c
index 8a1a9d9233..77b48af996 100644
--- a/src/core/support/histogram.c
+++ b/src/core/support/histogram.c
@@ -125,7 +125,7 @@ void gpr_histogram_add(gpr_histogram *h, double x) {
h->buckets[bucket_for(h, x)]++;
}
-int gpr_histogram_merge(gpr_histogram *dst, gpr_histogram *src) {
+int gpr_histogram_merge(gpr_histogram *dst, const gpr_histogram *src) {
if ((dst->num_buckets != src->num_buckets) ||
(dst->multiplier != src->multiplier)) {
/* Fail because these histograms don't match */
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c
index bc0586d069..623a8d9233 100644
--- a/src/core/support/time_win32.c
+++ b/src/core/support/time_win32.c
@@ -66,14 +66,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) {
now_tv.tv_nsec = now_tb.millitm * 1000000;
break;
case GPR_CLOCK_MONOTONIC:
+ case GPR_CLOCK_PRECISE:
QueryPerformanceCounter(&timestamp);
now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale;
now_tv.tv_sec = (time_t)now_dbl;
now_tv.tv_nsec = (int)((now_dbl - (double)now_tv.tv_sec) * 1e9);
break;
- case GPR_CLOCK_PRECISE:
- gpr_precise_clock_now(&now_tv);
- break;
}
return now_tv;
}
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 283db83833..9f830df68c 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -31,6 +31,7 @@
*
*/
+#include <string.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/compression.h>
@@ -103,3 +104,21 @@ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
}
return 0;
}
+
+gpr_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
+ gpr_slice in_slice;
+ size_t bytes_read = 0;
+ const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
+ gpr_slice out_slice = gpr_slice_malloc(input_size);
+ gpr_uint8 *const outbuf = GPR_SLICE_START_PTR(out_slice); /* just an alias */
+
+ while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
+ const size_t slice_length = GPR_SLICE_LENGTH(in_slice);
+ memcpy(&(outbuf[bytes_read]), GPR_SLICE_START_PTR(in_slice), slice_length);
+ bytes_read += slice_length;
+ gpr_slice_unref(in_slice);
+ GPR_ASSERT(bytes_read <= input_size);
+ }
+ return out_slice;
+}
+
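
A usage sketch for the new grpc_byte_buffer_reader_readall helper (not part of this commit); it assumes the pre-existing grpc_byte_buffer_reader_init/destroy pair, and flatten is a hypothetical name.

#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>

/* Collapse a multi-slice byte buffer into a single contiguous slice. */
static gpr_slice flatten(grpc_byte_buffer *buffer) {
  grpc_byte_buffer_reader reader;
  gpr_slice all;
  grpc_byte_buffer_reader_init(&reader, buffer);
  all = grpc_byte_buffer_reader_readall(&reader);
  grpc_byte_buffer_reader_destroy(&reader);
  return all; /* caller releases the slice with gpr_slice_unref() */
}
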
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 1a2aef64ef..df2774b527 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include "src/core/channel/client_channel.h"
+#include "src/core/channel/client_uchannel.h"
#include "src/core/iomgr/timer.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/completion_queue.h"
@@ -51,18 +52,24 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
GRPC_API_TRACE(
"grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
(channel, try_to_connect));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_check_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ state = grpc_client_channel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
grpc_exec_ctx_finish(&exec_ctx);
- return GRPC_CHANNEL_FATAL_FAILURE;
+ return state;
}
- state = grpc_client_channel_check_connectivity_state(
- &exec_ctx, client_channel_elem, try_to_connect);
+ if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ state = grpc_client_uchannel_check_connectivity_state(
+ &exec_ctx, client_channel_elem, try_to_connect);
+ grpc_exec_ctx_finish(&exec_ctx);
+ return state;
+ }
+ gpr_log(GPR_ERROR,
+ "grpc_channel_check_connectivity_state called on something that is "
+ "not a (u)client channel, but '%s'",
+ client_channel_elem->filter->name);
grpc_exec_ctx_finish(&exec_ctx);
- return state;
+ return GRPC_CHANNEL_FATAL_FAILURE;
}
typedef enum {
@@ -87,7 +94,17 @@ typedef struct {
} state_watcher;
static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_connectivity");
+ grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_channel_connectivity");
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
+ "watch_uchannel_connectivity");
+ } else {
+ abort();
+ }
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
@@ -125,8 +142,13 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
w->removed = 1;
client_channel_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ } else {
+ grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ }
}
gpr_mu_unlock(&w->mu);
if (due_to_completion) {
@@ -199,18 +221,18 @@ void grpc_channel_watch_connectivity_state(
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
- if (client_channel_elem->filter != &grpc_client_channel_filter) {
- gpr_log(GPR_ERROR,
- "grpc_channel_watch_connectivity_state called on something that is "
- "not a client channel, but '%s'",
- client_channel_elem->filter->name);
- grpc_exec_ctx_enqueue(&exec_ctx, &w->on_complete, 1);
- } else {
- GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
+ if (client_channel_elem->filter == &grpc_client_channel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
&w->state, &w->on_complete);
+ } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
+ GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
+ grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
+ grpc_cq_pollset(cq));
+ grpc_client_uchannel_watch_connectivity_state(
+ &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
}
grpc_exec_ctx_finish(&exec_ctx);
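
A hedged sketch of the public connectivity API these changes feed into (not part of this commit); the grpc.h signatures for grpc_channel_check_connectivity_state and grpc_channel_watch_connectivity_state are assumed from this era of the public API, and watch_once is a hypothetical helper.

#include <grpc/grpc.h>

/* Poll the channel's current state (asking it to connect if idle), then ask
   for a completion-queue notification when the state changes or the deadline
   expires; with this patch the same entry points also dispatch to uchannels. */
static void watch_once(grpc_channel *channel, grpc_completion_queue *cq,
                       void *tag) {
  grpc_connectivity_state state =
      grpc_channel_check_connectivity_state(channel, 1 /* try_to_connect */);
  grpc_channel_watch_connectivity_state(
      channel, state, gpr_inf_future(GPR_CLOCK_REALTIME), cq, tag);
}
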
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 715c90a5e1..b2e66a830e 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -47,6 +47,7 @@
#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include "src/core/debug/trace.h"
+#include "src/core/iomgr/executor.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
#include "src/core/surface/api_trace.h"
@@ -108,6 +109,7 @@ void grpc_init(void) {
grpc_register_tracer("connectivity_state", &grpc_connectivity_state_trace);
grpc_security_pre_init();
grpc_iomgr_init();
+ grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
/* Only initialize census if noone else has. */
if (census_enabled() == CENSUS_FEATURE_NONE) {
@@ -132,6 +134,7 @@ void grpc_shutdown(void) {
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_iomgr_shutdown();
+ grpc_executor_shutdown();
census_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();