aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c34
-rw-r--r--src/core/surface/call.h1
-rw-r--r--src/core/surface/channel.c45
-rw-r--r--src/core/surface/channel.h2
-rw-r--r--src/core/surface/channel_connectivity.c33
-rw-r--r--src/core/surface/channel_create.c1
-rw-r--r--src/core/surface/channel_ping.c79
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/secure_channel_create.c17
-rw-r--r--src/core/surface/server_create.c13
10 files changed, 143 insertions, 88 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 84b9daaa28..5d064ef00d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -336,26 +336,19 @@ void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_cq_pollset(cq));
}
-grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
- return call->cq;
-}
-
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
-void grpc_call_internal_ref(grpc_call *c, const char *reason) {
- grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason);
-}
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
- const char *reason) {
- grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason);
-}
+#define REF_REASON reason
+#define REF_ARG , const char *reason
#else
-void grpc_call_internal_ref(grpc_call *c) {
- grpc_call_stack_ref(CALL_STACK_FROM_CALL(c));
+#define REF_REASON ""
+#define REF_ARG
+#endif
+void grpc_call_internal_ref(grpc_call *c REF_ARG) {
+ GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
-void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
- grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c));
+void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
+ GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-#endif
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
size_t i;
@@ -742,8 +735,15 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
char *grpc_call_get_peer(grpc_call *call) {
grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result = elem->filter->get_peer(&exec_ctx, elem);
+ char *result;
GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
+ result = elem->filter->get_peer(&exec_ctx, elem);
+ if (result == NULL) {
+ result = grpc_channel_get_target(call->channel);
+ }
+ if (result == NULL) {
+ result = gpr_strdup("unknown");
+ }
grpc_exec_ctx_finish(&exec_ctx);
return result;
}
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 20907ac6d6..b53340df8e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -58,7 +58,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq);
-grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_call_internal_ref(grpc_call *call, const char *reason);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 14fe97c30d..573a0e742f 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -63,7 +63,6 @@ typedef struct registered_call {
struct grpc_channel {
int is_client;
- gpr_refcount refs;
gpr_uint32 max_message_length;
grpc_mdelem *default_authority;
@@ -81,6 +80,8 @@ struct grpc_channel {
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success);
+
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
const grpc_channel_filter **filters, size_t num_filters,
@@ -93,8 +94,6 @@ grpc_channel *grpc_channel_create_from_filters(
channel->target = gpr_strdup(target);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
- /* decremented by grpc_channel_destroy */
- gpr_ref_init(&channel->refs, 1);
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -153,7 +152,9 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_free(default_authority);
}
- grpc_channel_stack_init(exec_ctx, filters, num_filters, channel, args,
+ grpc_channel_stack_init(exec_ctx, 1, destroy_channel, channel, filters,
+ num_filters, args,
+ is_client ? "CLIENT_CHANNEL" : "SERVER_CHANNEL",
CHANNEL_STACK_FROM_CHANNEL(channel));
return channel;
@@ -250,17 +251,25 @@ grpc_call *grpc_channel_create_registered_call(
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_ref(grpc_channel *c, const char *reason) {
- gpr_log(GPR_DEBUG, "CHANNEL: ref %p %d -> %d [%s]", c, c->refs.count,
- c->refs.count + 1, reason);
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+#define REF_REASON reason
+#define REF_ARG , const char *reason
#else
-void grpc_channel_internal_ref(grpc_channel *c) {
+#define REF_REASON ""
+#define REF_ARG
#endif
- gpr_ref(&c->refs);
+void grpc_channel_internal_ref(grpc_channel *c REF_ARG) {
+ GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
}
-static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
+void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
+ grpc_channel *c REF_ARG) {
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(c), REF_REASON);
+}
+
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ grpc_channel *channel = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
registered_call *rc = channel->registered_calls;
@@ -279,20 +288,6 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, grpc_channel *channel) {
gpr_free(channel);
}
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
-void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
- const char *reason) {
- gpr_log(GPR_DEBUG, "CHANNEL: unref %p %d -> %d [%s]", channel,
- channel->refs.count, channel->refs.count - 1, reason);
-#else
-void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
- grpc_channel *channel) {
-#endif
- if (gpr_unref(&channel->refs)) {
- destroy_channel(exec_ctx, channel);
- }
-}
-
void grpc_channel_destroy(grpc_channel *channel) {
grpc_transport_op op;
grpc_channel_element *elem;
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index 7dea609ebc..3d2ff23542 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,7 +53,7 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
-#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_channel_internal_ref(grpc_channel *channel, const char *reason);
void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
const char *reason);
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index df2774b527..ad1c9b334e 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -83,7 +83,6 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
- int removed;
grpc_closure on_complete;
grpc_timer alarm;
grpc_connectivity_state state;
@@ -135,30 +134,15 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
int due_to_completion) {
int delete = 0;
- grpc_channel_element *client_channel_elem = NULL;
- gpr_mu_lock(&w->mu);
- if (w->removed == 0) {
- w->removed = 1;
- client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(w->channel));
- if (client_channel_elem->filter == &grpc_client_channel_filter) {
- grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
- } else {
- grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
- grpc_cq_pollset(w->cq));
- }
- }
- gpr_mu_unlock(&w->mu);
if (due_to_completion) {
- gpr_mu_lock(&w->mu);
- w->success = 1;
- gpr_mu_unlock(&w->mu);
grpc_timer_cancel(exec_ctx, &w->alarm);
}
gpr_mu_lock(&w->mu);
+ if (due_to_completion) {
+ w->success = 1;
+ }
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
@@ -212,7 +196,6 @@ void grpc_channel_watch_connectivity_state(
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
- w->removed = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
@@ -223,16 +206,14 @@ void grpc_channel_watch_connectivity_state(
if (client_channel_elem->filter == &grpc_client_channel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
- grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
- &w->state, &w->on_complete);
+ grpc_cq_pollset(cq), &w->state,
+ &w->on_complete);
} else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
- grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
- grpc_cq_pollset(cq));
grpc_client_uchannel_watch_connectivity_state(
- &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
+ &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state,
+ &w->on_complete);
}
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index fe7e1072ac..97ec23408f 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -171,7 +171,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
new file mode 100644
index 0000000000..1b6f06ded1
--- /dev/null
+++ b/src/core/surface/channel_ping.c
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/surface/channel.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/surface/api_trace.h"
+#include "src/core/surface/completion_queue.h"
+
+typedef struct {
+ grpc_closure closure;
+ void *tag;
+ grpc_completion_queue *cq;
+ grpc_cq_completion completion_storage;
+} ping_result;
+
+static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *storage) {
+ gpr_free(arg);
+}
+
+static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ ping_result *pr = arg;
+ grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr,
+ &pr->completion_storage);
+}
+
+void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
+ void *tag, void *reserved) {
+ grpc_transport_op op;
+ ping_result *pr = gpr_malloc(sizeof(*pr));
+ grpc_channel_element *top_elem =
+ grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GPR_ASSERT(reserved == NULL);
+ memset(&op, 0, sizeof(op));
+ pr->tag = tag;
+ pr->cq = cq;
+ grpc_closure_init(&pr->closure, ping_done, pr);
+ op.send_ping = &pr->closure;
+ op.bind_pollset = grpc_cq_pollset(cq);
+ grpc_cq_begin_op(cq);
+ top_elem->filter->start_transport_op(&exec_ctx, top_elem, &op);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 4a55544ac1..a60e9d20da 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -49,7 +49,6 @@ typedef struct {
} call_data;
typedef struct {
- grpc_channel *master;
grpc_status_code error_code;
const char *error_message;
} channel_data;
@@ -84,8 +83,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_channel_get_target(chand->master);
+ return NULL;
}
static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -111,10 +109,8 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args) {
- channel_data *chand = elem->channel_data;
GPR_ASSERT(args->is_first);
GPR_ASSERT(args->is_last);
- chand->master = args->master;
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index c9a54d9237..92bd53411d 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -228,7 +228,6 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->args = final_args;
- args->master = f->master;
s = grpc_subchannel_create(&c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
grpc_channel_args_destroy(final_args);
@@ -305,22 +304,22 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
resolver = grpc_resolver_create(target, &f->base);
- if (!resolver) {
- grpc_exec_ctx_finish(&exec_ctx);
- return NULL;
+ if (resolver) {
+ grpc_client_channel_set_resolver(
+ &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
}
-
- grpc_client_channel_set_resolver(
- &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
- GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
grpc_subchannel_factory_unref(&exec_ctx, &f->base);
GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
-
grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
+ if (!resolver) {
+ GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, channel, "subchannel_factory");
+ channel = NULL;
+ }
grpc_exec_ctx_finish(&exec_ctx);
return channel;
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index c7811a6d88..e362bb4376 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -32,14 +32,21 @@
*/
#include <grpc/grpc.h>
+#include "src/core/census/grpc_filter.h"
+#include "src/core/channel/channel_args.h"
+#include "src/core/channel/compress_filter.h"
#include "src/core/surface/api_trace.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/surface/server.h"
-#include "src/core/channel/compress_filter.h"
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
- const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ const grpc_channel_filter *filters[3];
+ size_t num_filters = 0;
+ filters[num_filters++] = &grpc_compress_filter;
+ if (grpc_channel_args_is_census_enabled(args)) {
+ filters[num_filters++] = &grpc_server_census_filter;
+ }
GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
- return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
+ return grpc_server_create_from_filters(filters, num_filters,
args);
}