aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_config/channel_connectivity.c2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c35
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c43
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c49
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c44
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c13
-rw-r--r--src/core/lib/channel/channel_args.h2
-rw-r--r--src/core/lib/channel/handshaker.c191
-rw-r--r--src/core/lib/channel/handshaker.h145
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c30
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c9
-rw-r--r--src/core/lib/iomgr/tcp_server.h4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c13
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c3
-rw-r--r--src/core/lib/iomgr/udp_server.c40
-rw-r--r--src/core/lib/support/log.c19
-rw-r--r--src/core/lib/support/slice.c11
-rw-r--r--src/core/lib/transport/transport.h2
18 files changed, 575 insertions, 80 deletions
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index c1220e3a8c..ce3c13a4ee 100644
--- a/src/core/ext/client_config/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -59,7 +59,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
- "not a (u)client channel, but '%s'",
+ "not a client channel, but '%s'",
client_channel_elem->filter->name);
grpc_exec_ctx_finish(&exec_ctx);
return GRPC_CHANNEL_SHUTDOWN;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 85f9efb3b6..645a011748 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -45,6 +45,7 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/compress_filter.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
@@ -63,6 +64,8 @@ typedef struct {
grpc_endpoint *tcp;
grpc_closure connected;
+
+ grpc_handshake_manager *handshake_mgr;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -74,6 +77,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
}
@@ -83,9 +87,21 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
connector_unref(exec_ctx, arg);
}
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
+ grpc_channel_args *args, void *user_data) {
+ connector *c = user_data;
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
+ GPR_ASSERT(c->result->transport);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0);
+ c->result->channel_args = args;
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
+}
+
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
- grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
if (tcp != NULL) {
if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
@@ -97,19 +113,17 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector_ref(arg);
grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
&c->initial_string_sent);
+ } else {
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
}
- c->result->transport =
- grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
- 0);
- GPR_ASSERT(c->result->transport);
- c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
} else {
memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
- notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
@@ -171,6 +185,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
+ c->handshake_mgr = grpc_handshake_manager_create();
args->args = final_args;
s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 9acacbd92d..01d949add3 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -44,6 +44,7 @@
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
@@ -69,6 +70,11 @@ typedef struct {
grpc_endpoint *newly_connecting_endpoint;
grpc_closure connected_closure;
+
+ grpc_handshake_manager *handshake_mgr;
+
+ // TODO(roth): Remove once we eliminate on_secure_handshake_done().
+ grpc_channel_args *tmp_args;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -80,6 +86,8 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
+ grpc_channel_args_destroy(c->tmp_args);
+ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
}
@@ -89,7 +97,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *secure_endpoint,
grpc_auth_context *auth_context) {
connector *c = arg;
- grpc_closure *notify;
gpr_mu_lock(&c->mu);
grpc_error *error = GRPC_ERROR_NONE;
if (c->connecting_endpoint == NULL) {
@@ -110,25 +117,36 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
- c->result->channel_args = grpc_channel_args_copy_and_add(
- c->args.channel_args, &auth_context_arg, 1);
+ c->result->channel_args =
+ grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1);
}
- notify = c->notify;
+ grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
}
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
+ grpc_channel_args *args, void *user_data) {
+ connector *c = user_data;
+ // TODO(roth, jboeuf): Convert security connector handshaking to use new
+ // handshake API, and then move the code from on_secure_handshake_done()
+ // into this function.
+ c->tmp_args = args;
+ grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
+ endpoint, c->args.deadline,
+ on_secure_handshake_done, c);
+}
+
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
connector *c = arg;
- grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline,
- on_secure_handshake_done, c);
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
- grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
if (tcp != NULL) {
gpr_mu_lock(&c->mu);
@@ -144,13 +162,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
- grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, tcp, c->args.deadline,
- on_secure_handshake_done, c);
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
- notify = c->notify;
+ grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
@@ -229,6 +247,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
+ c->handshake_mgr = grpc_handshake_manager_create();
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->args = final_args;
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e5c987925c..a33fc93382 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -37,16 +37,26 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
+
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
- grpc_tcp_server_acceptor *acceptor) {
+typedef struct server_connect_state {
+ grpc_server *server;
+ grpc_pollset *accepting_pollset;
+ grpc_tcp_server_acceptor *acceptor;
+ grpc_handshake_manager *handshake_mgr;
+} server_connect_state;
+
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
+ grpc_channel_args *args, void *user_data) {
+ server_connect_state *state = user_data;
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -54,18 +64,40 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
* (as in server_secure_chttp2.c) needs to add synchronization to avoid this
* case.
*/
- grpc_transport *transport = grpc_create_chttp2_transport(
- exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
- grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset,
- grpc_server_get_channel_args(server));
+ grpc_transport *transport =
+ grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0);
+ grpc_server_setup_transport(exec_ctx, state->server, transport,
+ state->accepting_pollset,
+ grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
+ // Clean up.
+ grpc_channel_args_destroy(args);
+ grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
+ gpr_free(state);
+}
+
+static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp,
+ grpc_pollset *accepting_pollset,
+ grpc_tcp_server_acceptor *acceptor) {
+ server_connect_state *state = gpr_malloc(sizeof(server_connect_state));
+ state->server = server;
+ state->accepting_pollset = accepting_pollset;
+ state->acceptor = acceptor;
+ state->handshake_mgr = grpc_handshake_manager_create();
+ // TODO(roth): We should really get this timeout value from channel
+ // args instead of hard-coding it.
+ const gpr_timespec deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server),
+ deadline, acceptor, on_handshake_done, state);
}
/* Server callback: start listening on our ports */
static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_pollset **pollsets, size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, new_transport,
+ grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept,
server);
}
@@ -74,6 +106,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
+ grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
grpc_tcp_server_unref(exec_ctx, tcp);
grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index c42810e913..67e5ee897c 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -68,6 +69,12 @@ typedef struct server_secure_state {
typedef struct server_secure_connect {
server_secure_state *state;
grpc_pollset *accepting_pollset;
+ grpc_tcp_server_acceptor *acceptor;
+ grpc_handshake_manager *handshake_mgr;
+ // TODO(roth): Remove the following two fields when we eliminate
+ // grpc_server_security_connector_do_handshake().
+ gpr_timespec deadline;
+ grpc_channel_args *args;
} server_secure_connect;
static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); }
@@ -97,13 +104,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->state->server),
secure_endpoint, 0);
- grpc_channel_args *args_copy;
grpc_arg args_to_add[2];
args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds);
args_to_add[1] = grpc_auth_context_to_arg(auth_context);
- args_copy = grpc_channel_args_copy_and_add(
- grpc_server_get_channel_args(state->state->server), args_to_add,
- GPR_ARRAY_SIZE(args_to_add));
+ grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
+ state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(exec_ctx, state->state->server, transport,
state->accepting_pollset, args_copy);
grpc_channel_args_destroy(args_copy);
@@ -118,10 +123,25 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
} else {
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
+ grpc_channel_args_destroy(state->args);
state_unref(state->state);
gpr_free(state);
}
+static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
+ grpc_channel_args *args, void *user_data) {
+ server_secure_connect *state = user_data;
+ // TODO(roth, jboeuf): Convert security connector handshaking to use new
+ // handshake API, and then move the code from on_secure_handshake_done()
+ // into this function.
+ grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
+ state->handshake_mgr = NULL;
+ state->args = args;
+ grpc_server_security_connector_do_handshake(
+ exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
+ on_secure_handshake_done, state);
+}
+
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
@@ -129,11 +149,16 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
state->state = statep;
state_ref(state->state);
state->accepting_pollset = accepting_pollset;
- grpc_server_security_connector_do_handshake(
- exec_ctx, state->state->sc, acceptor, tcp,
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_seconds(120, GPR_TIMESPAN)),
- on_secure_handshake_done, state);
+ state->acceptor = acceptor;
+ state->handshake_mgr = grpc_handshake_manager_create();
+ // TODO(roth): We should really get this timeout value from channel
+ // args instead of hard-coding it.
+ state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(120, GPR_TIMESPAN));
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, state->handshake_mgr, tcp,
+ grpc_server_get_channel_args(state->state->server), state->deadline,
+ acceptor, on_handshake_done, state);
}
/* Server callback: start listening on our ports */
@@ -166,6 +191,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
state->destroy_callback = callback;
tcp = state->tcp;
gpr_mu_unlock(&state->mu);
+ grpc_tcp_server_shutdown_listeners(exec_ctx, tcp);
grpc_tcp_server_unref(exec_ctx, tcp);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index a32d45c1c2..6bd65cea02 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -2005,6 +2005,7 @@ static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
+ grpc_error *err = GRPC_ERROR_NONE;
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
@@ -2013,15 +2014,13 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
};
- if (i != t->read_buffer.count) {
+ if (errors[1] == GRPC_ERROR_NONE) {
+ err = GRPC_ERROR_REF(error);
+ } else {
errors[2] = try_http_parsing(exec_ctx, t);
+ err = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
+ GPR_ARRAY_SIZE(errors));
}
- grpc_error *err =
- errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
- errors[2] == GRPC_ERROR_NONE
- ? GRPC_ERROR_NONE
- : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
- GPR_ARRAY_SIZE(errors));
for (i = 0; i < GPR_ARRAY_SIZE(errors); i++) {
GRPC_ERROR_UNREF(errors[i]);
}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 653d04f427..aec61ee7c6 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -37,6 +37,8 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+// Channel args are intentionally immutable, to avoid the need for locking.
+
/** Copy the arguments in \a src into a new instance */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
new file mode 100644
index 0000000000..b3ee0ed6f9
--- /dev/null
+++ b/src/core/lib/channel/handshaker.c
@@ -0,0 +1,191 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/alloc.h>
+#include <grpc/impl/codegen/log.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/channel/handshaker.h"
+
+//
+// grpc_handshaker
+//
+
+void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable = vtable;
+}
+
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable->destroy(exec_ctx, handshaker);
+}
+
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker) {
+ handshaker->vtable->shutdown(exec_ctx, handshaker);
+}
+
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data) {
+ handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
+ deadline, acceptor, cb, user_data);
+}
+
+//
+// grpc_handshake_manager
+//
+
+// State used while chaining handshakers.
+struct grpc_handshaker_state {
+ // The index of the handshaker to invoke next.
+ size_t index;
+ // The deadline for all handshakers.
+ gpr_timespec deadline;
+ // The acceptor to call the handshakers with.
+ grpc_tcp_server_acceptor* acceptor;
+ // The final callback and user_data to invoke after the last handshaker.
+ grpc_handshaker_done_cb final_cb;
+ void* final_user_data;
+};
+
+struct grpc_handshake_manager {
+ // An array of handshakers added via grpc_handshake_manager_add().
+ size_t count;
+ grpc_handshaker** handshakers;
+ // State used while chaining handshakers.
+ struct grpc_handshaker_state* state;
+};
+
+grpc_handshake_manager* grpc_handshake_manager_create() {
+ grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager));
+ memset(mgr, 0, sizeof(*mgr));
+ return mgr;
+}
+
+static bool is_power_of_2(size_t n) { return (n & (n - 1)) == 0; }
+
+void grpc_handshake_manager_add(grpc_handshaker* handshaker,
+ grpc_handshake_manager* mgr) {
+ // To avoid allocating memory for each handshaker we add, we double
+ // the number of elements every time we need more.
+ size_t realloc_count = 0;
+ if (mgr->count == 0) {
+ realloc_count = 2;
+ } else if (mgr->count >= 2 && is_power_of_2(mgr->count)) {
+ realloc_count = mgr->count * 2;
+ }
+ if (realloc_count > 0) {
+ mgr->handshakers =
+ gpr_realloc(mgr->handshakers, realloc_count * sizeof(grpc_handshaker*));
+ }
+ mgr->handshakers[mgr->count++] = handshaker;
+}
+
+void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_destroy(exec_ctx, mgr->handshakers[i]);
+ }
+ gpr_free(mgr->handshakers);
+ gpr_free(mgr);
+}
+
+void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr) {
+ // FIXME: maybe check which handshaker is currently in progress, and
+ // only shut down that one?
+ for (size_t i = 0; i < mgr->count; ++i) {
+ grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
+ }
+ if (mgr->state != NULL) {
+ gpr_free(mgr->state);
+ mgr->state = NULL;
+ }
+}
+
+// A function used as the handshaker-done callback when chaining
+// handshakers together.
+static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args, void* user_data) {
+ grpc_handshake_manager* mgr = user_data;
+ GPR_ASSERT(mgr->state != NULL);
+ GPR_ASSERT(mgr->state->index < mgr->count);
+ grpc_handshaker_done_cb cb = call_next_handshaker;
+ // If this is the last handshaker, use the caller-supplied callback
+ // and user_data instead of chaining back to this function again.
+ if (mgr->state->index == mgr->count - 1) {
+ cb = mgr->state->final_cb;
+ user_data = mgr->state->final_user_data;
+ }
+ // Invoke handshaker.
+ grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index],
+ endpoint, args, mgr->state->deadline,
+ mgr->state->acceptor, cb, user_data);
+ ++mgr->state->index;
+ // If this is the last handshaker, clean up state.
+ if (mgr->state->index == mgr->count) {
+ gpr_free(mgr->state);
+ mgr->state = NULL;
+ }
+}
+
+void grpc_handshake_manager_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
+ grpc_endpoint* endpoint, const grpc_channel_args* args,
+ gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data) {
+ grpc_channel_args* args_copy = grpc_channel_args_copy(args);
+ if (mgr->count == 0) {
+ // No handshakers registered, so we just immediately call the done
+ // callback with the passed-in endpoint.
+ cb(exec_ctx, endpoint, args_copy, user_data);
+ } else {
+ GPR_ASSERT(mgr->state == NULL);
+ mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
+ memset(mgr->state, 0, sizeof(*mgr->state));
+ mgr->state->deadline = deadline;
+ mgr->state->acceptor = acceptor;
+ mgr->state->final_cb = cb;
+ mgr->state->final_user_data = user_data;
+ call_next_handshaker(exec_ctx, endpoint, args_copy, mgr);
+ }
+}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
new file mode 100644
index 0000000000..b1e91dba4f
--- /dev/null
+++ b/src/core/lib/channel/handshaker.h
@@ -0,0 +1,145 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
+#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/impl/codegen/time.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+/// Handshakers are used to perform initial handshakes on a connection
+/// before the client sends the initial request. Some examples of what
+/// a handshaker can be used for includes support for HTTP CONNECT on
+/// the client side and various types of security initialization.
+///
+/// In general, handshakers should be used via a handshake manager.
+
+///
+/// grpc_handshaker
+///
+
+typedef struct grpc_handshaker grpc_handshaker;
+
+/// Callback type invoked when a handshaker is done.
+/// Takes ownership of \a args.
+typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ void* user_data);
+
+struct grpc_handshaker_vtable {
+ /// Destroys the handshaker.
+ void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
+
+ /// Shuts down the handshaker (e.g., to clean up when the operation is
+ /// aborted in the middle).
+ void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
+
+ /// Performs handshaking. When finished, calls \a cb with \a user_data.
+ /// Takes ownership of \a args.
+ /// \a acceptor will be NULL for client-side handshakers.
+ void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint, grpc_channel_args* args,
+ gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+};
+
+/// Base struct. To subclass, make this the first member of the
+/// implementation struct.
+struct grpc_handshaker {
+ const struct grpc_handshaker_vtable* vtable;
+};
+
+/// Called by concrete implementations to initialize the base struct.
+void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
+ grpc_handshaker* handshaker);
+
+/// Convenient wrappers for invoking methods via the vtable.
+/// These probably do not need to be called from anywhere but
+/// grpc_handshake_manager.
+void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker);
+void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
+ grpc_handshaker* handshaker,
+ grpc_endpoint* endpoint,
+ grpc_channel_args* args,
+ gpr_timespec deadline,
+ grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+
+///
+/// grpc_handshake_manager
+///
+
+typedef struct grpc_handshake_manager grpc_handshake_manager;
+
+/// Creates a new handshake manager. Caller takes ownership.
+grpc_handshake_manager* grpc_handshake_manager_create();
+
+/// Adds a handshaker to the handshake manager.
+/// Takes ownership of \a mgr.
+void grpc_handshake_manager_add(grpc_handshaker* handshaker,
+ grpc_handshake_manager* mgr);
+
+/// Destroys the handshake manager.
+void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr);
+
+/// Shuts down the handshake manager (e.g., to clean up when the operation is
+/// aborted in the middle).
+/// The caller must still call grpc_handshake_manager_destroy() after
+/// calling this function.
+void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
+ grpc_handshake_manager* mgr);
+
+/// Invokes handshakers in the order they were added.
+/// Does NOT take ownership of \a args. Instead, makes a copy before
+/// invoking the first handshaker.
+/// \a acceptor will be NULL for client-side handshakers.
+/// If successful, invokes \a cb with \a user_data after all handshakers
+/// have completed.
+void grpc_handshake_manager_do_handshake(
+ grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
+ grpc_endpoint* endpoint, const grpc_channel_args* args,
+ gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
+ grpc_handshaker_done_cb cb, void* user_data);
+
+#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 4b593f4b2c..16a5e3083e 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -844,6 +844,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
*worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
+ /* Avoid malloc for small number of elements. */
+ enum { inline_elements = 96 };
+ struct pollfd pollfd_space[inline_elements];
+ struct grpc_fd_watcher watcher_space[inline_elements];
+
/* pollset->mu already held */
int added_worker = 0;
int locked = 1;
@@ -899,15 +904,23 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int r;
size_t i, fd_count;
nfds_t pfd_count;
- /* TODO(ctiller): inline some elements to avoid an allocation */
grpc_fd_watcher *watchers;
struct pollfd *pfds;
timeout = poll_deadline_to_millis_timeout(deadline, now);
- /* TODO(ctiller): perform just one malloc here if we exceed the inline
- * case */
- pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
- watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+
+ if (pollset->fd_count + 2 <= inline_elements) {
+ pfds = pollfd_space;
+ watchers = watcher_space;
+ } else {
+ /* Allocate one buffer to hold both pfds and watchers arrays */
+ const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
+ const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
+ void *buf = gpr_malloc(pfd_size + watch_size);
+ pfds = buf;
+ watchers = (void *)((char *)buf + pfd_size);
+ }
+
fd_count = 0;
pfd_count = 2;
pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
@@ -974,8 +987,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
}
- gpr_free(pfds);
- gpr_free(watchers);
+ if (pfds != pollfd_space) {
+ /* pfds and watchers are in the same memory block pointed to by pfds */
+ gpr_free(pfds);
+ }
+
GPR_TIMER_END("maybe_work_and_unlock", 0);
locked = 0;
} else {
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index 90c074b007..b4bb7e3cf7 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -56,6 +56,15 @@ void grpc_network_status_init(void) {
// TODO(makarandd): Install callback with OS to monitor network status.
}
+void grpc_destroy_network_status_monitor() {
+ for (endpoint_ll_node *curr = head; curr != NULL;) {
+ endpoint_ll_node *next = curr->next;
+ gpr_free(curr);
+ curr = next;
+ }
+ gpr_mu_destroy(&g_endpoint_mutex);
+}
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
gpr_mu_lock(&g_endpoint_mutex);
if (head == NULL) {
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 875a6ca14a..5a25d39a0c 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -105,4 +105,8 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
a call (exec_ctx!=NULL) to shutdown_complete. */
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s);
+/* Shutdown the fds of listeners. */
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s);
+
#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index cb2ff782d6..38ebd2dbcb 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -741,4 +741,17 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {
+ gpr_mu_lock(&s->mu);
+ /* shutdown all fd's */
+ if (s->active_ports) {
+ grpc_tcp_listener *sp;
+ for (sp = s->head; sp; sp = sp->next) {
+ grpc_fd_shutdown(exec_ctx, sp->emfd);
+ }
+ }
+ gpr_mu_unlock(&s->mu);
+}
+
#endif
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index 7b0966704c..1b125e7005 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -540,4 +540,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
gpr_mu_unlock(&s->mu);
}
+void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
+ grpc_tcp_server *s) {}
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 1ebccf2ee2..48032412a2 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -60,6 +60,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -128,7 +129,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
}
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1, NULL);
+ grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
gpr_mu_destroy(&s->mu);
gpr_cv_destroy(&s->cv);
@@ -138,7 +139,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
}
static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
- bool success) {
+ grpc_error *error) {
grpc_udp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
@@ -217,14 +218,23 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
- if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) {
- gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
- strerror(errno));
+ if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno));
+ goto error;
+ }
+ if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno));
+ goto error;
}
- if (grpc_set_socket_ip_pktinfo_if_possible(fd) &&
- addr->sa_family == AF_INET6) {
- grpc_set_socket_ipv6_recvpktinfo_if_possible(fd);
+ if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Unable to set ip_pktinfo.");
+ goto error;
+ } else if (addr->sa_family == AF_INET6) {
+ if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo.");
+ goto error;
+ }
}
GPR_ASSERT(addr_len < ~(socklen_t)0);
@@ -241,13 +251,13 @@ static int prepare_socket(int fd, const struct sockaddr *addr,
goto error;
}
- if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) {
+ if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
buffer_size_bytes);
goto error;
}
- if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) {
+ if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes",
buffer_size_bytes);
goto error;
@@ -263,10 +273,10 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
server_port *sp = arg;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
@@ -369,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
/* Try listening on IPv6 first. */
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
- fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ // TODO(rjshade): Test and propagate the returned grpc_error*:
+ grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
allocated_port1 =
add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
@@ -384,7 +395,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr_len = sizeof(wild4);
}
- fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
+ // TODO(rjshade): Test and propagate the returned grpc_error*:
+ grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd);
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}
diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c
index bae0957df7..899f1218b6 100644
--- a/src/core/lib/support/log.c
+++ b/src/core/lib/support/log.c
@@ -79,17 +79,18 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
void gpr_log_verbosity_init() {
char *verbosity = gpr_getenv("GRPC_VERBOSITY");
- if (verbosity == NULL) return;
- gpr_atm min_severity_to_print = GPR_LOG_VERBOSITY_UNSET;
- if (strcmp(verbosity, "DEBUG") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG;
- } else if (strcmp(verbosity, "INFO") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO;
- } else if (strcmp(verbosity, "ERROR") == 0) {
- min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR;
+ gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR;
+ if (verbosity != NULL) {
+ if (strcmp(verbosity, "DEBUG") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_DEBUG;
+ } else if (strcmp(verbosity, "INFO") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_INFO;
+ } else if (strcmp(verbosity, "ERROR") == 0) {
+ min_severity_to_print = (gpr_atm)GPR_LOG_SEVERITY_ERROR;
+ }
+ gpr_free(verbosity);
}
- gpr_free(verbosity);
if ((gpr_atm_no_barrier_load(&g_min_severity_to_print)) ==
GPR_LOG_VERBOSITY_UNSET) {
gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print);
diff --git a/src/core/lib/support/slice.c b/src/core/lib/support/slice.c
index b9a7c77bda..8a2c0a9086 100644
--- a/src/core/lib/support/slice.c
+++ b/src/core/lib/support/slice.c
@@ -94,14 +94,16 @@ static void new_slice_unref(void *p) {
}
}
-gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) {
+gpr_slice gpr_slice_new_with_user_data(void *p, size_t len,
+ void (*destroy)(void *),
+ void *user_data) {
gpr_slice slice;
new_slice_refcount *rc = gpr_malloc(sizeof(new_slice_refcount));
gpr_ref_init(&rc->refs, 1);
rc->rc.ref = new_slice_ref;
rc->rc.unref = new_slice_unref;
rc->user_destroy = destroy;
- rc->user_data = p;
+ rc->user_data = user_data;
slice.refcount = &rc->rc;
slice.data.refcounted.bytes = p;
@@ -109,6 +111,11 @@ gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) {
return slice;
}
+gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) {
+ /* Pass "p" to *destroy when the slice is no longer needed. */
+ return gpr_slice_new_with_user_data(p, len, destroy, p);
+}
+
/* gpr_slice_new_with_len support structures - we create a refcount object
extended with the user provided data pointer & destroy function */
typedef struct new_with_len_slice_refcount {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 08c0a237c9..e33fc5c761 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -138,7 +138,7 @@ typedef struct grpc_transport_stream_op {
/** If != GRPC_ERROR_NONE, cancel this stream */
grpc_error *cancel_error;
- /** If != GRPC_ERROR, send grpc-status, grpc-message, and close this
+ /** If != GRPC_ERROR_NONE, send grpc-status, grpc-message, and close this
stream for both reading and writing */
grpc_error *close_error;