aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/census_log.h2
-rw-r--r--src/core/ext/census/mlog.h2
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c11
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c11
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c9
-rw-r--r--src/core/lib/channel/channel_args.c6
-rw-r--r--src/core/lib/channel/channel_args.h8
-rw-r--r--src/core/lib/iomgr/endpoint.c2
-rw-r--r--src/core/lib/iomgr/endpoint.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c2
-rw-r--r--src/core/lib/iomgr/socket_mutator.c98
-rw-r--r--src/core/lib/iomgr/socket_mutator.h80
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.c9
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h5
-rw-r--r--src/core/lib/iomgr/tcp_client.h1
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c16
-rw-r--r--src/core/lib/iomgr/tcp_posix.c8
-rw-r--r--src/core/lib/iomgr/tcp_uv.c99
-rw-r--r--src/core/lib/iomgr/tcp_windows.c5
-rw-r--r--src/core/lib/security/credentials/plugin/plugin_credentials.c2
-rw-r--r--src/core/lib/security/transport/handshake.c2
-rw-r--r--src/core/lib/security/transport/secure_endpoint.c13
-rw-r--r--src/core/lib/transport/transport.c5
-rw-r--r--src/core/lib/transport/transport.h5
-rw-r--r--src/core/lib/transport/transport_impl.h3
25 files changed, 336 insertions, 73 deletions
diff --git a/src/core/ext/census/census_log.h b/src/core/ext/census/census_log.h
index 534ecc5705..1b185a53b9 100644
--- a/src/core/ext/census/census_log.h
+++ b/src/core/ext/census/census_log.h
@@ -84,7 +84,7 @@ const void *census_log_read_next(size_t *bytes_available);
*/
size_t census_log_remaining_space(void);
-/* Returns the number of times gprc_stats_log_start_write() failed due to
+/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int census_log_out_of_space_count(void);
diff --git a/src/core/ext/census/mlog.h b/src/core/ext/census/mlog.h
index a256426f91..18805ad994 100644
--- a/src/core/ext/census/mlog.h
+++ b/src/core/ext/census/mlog.h
@@ -88,7 +88,7 @@ const void* census_log_read_next(size_t* bytes_available);
*/
size_t census_log_remaining_space(void);
-/* Returns the number of times gprc_stats_log_start_write() failed due to
+/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int64_t census_log_out_of_space_count(void);
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 41e177c495..44ac9a017e 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -186,6 +186,7 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
* addresses failed to connect). There won't be any user_data/token
* available */
if (wc_arg->target != NULL) {
+ GPR_ASSERT(wc_arg->lb_token != NULL);
initial_metadata_add_lb_token(wc_arg->initial_metadata,
wc_arg->lb_token_mdelem_storage,
GRPC_MDELEM_REF(wc_arg->lb_token));
@@ -605,10 +606,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
* right grpclb status. */
rr_connectivity_data *rr_conn_data = arg;
glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
+ gpr_mu_lock(&glb_policy->mu);
if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
!glb_policy->shutting_down) {
- gpr_mu_lock(&glb_policy->mu);
/* RR not shutting down. Mimic the RR's policy state */
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
rr_conn_data->state, GRPC_ERROR_REF(error),
@@ -617,12 +618,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_conn_data->state,
&rr_conn_data->on_change);
- gpr_mu_unlock(&glb_policy->mu);
} else {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
gpr_free(rr_conn_data);
}
+ gpr_mu_unlock(&glb_policy->mu);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -1081,6 +1082,7 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
grpc_op *op = ops;
+ gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_response_payload != NULL) {
gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
/* Received data from the LB server. Look inside
@@ -1109,7 +1111,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
/* update serverlist */
if (serverlist->num_servers > 0) {
- gpr_mu_lock(&glb_policy->mu);
if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@@ -1125,7 +1126,6 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
rr_handover_locked(exec_ctx, glb_policy, error);
}
- gpr_mu_unlock(&glb_policy->mu);
} else {
if (grpc_lb_glb_trace) {
gpr_log(GPR_INFO,
@@ -1153,9 +1153,11 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
&glb_policy->lb_on_response_received); /* loop */
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+ gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
+ gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");
}
@@ -1175,7 +1177,6 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
query_for_backends_locked(exec_ctx, glb_policy);
}
gpr_mu_unlock(&glb_policy->mu);
-
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"grpclb_on_retry_timer");
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 1ffa9165b2..fb8fbae0ab 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -2294,6 +2294,14 @@ static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
+/*******************************************************************************
+ * MONITORING
+ */
+static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *t) {
+ return ((grpc_chttp2_transport *)t)->ep;
+}
+
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
@@ -2303,7 +2311,8 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
perform_transport_op,
destroy_stream,
destroy_transport,
- chttp2_get_peer};
+ chttp2_get_peer,
+ chttp2_get_endpoint};
grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 2ab9a8c19d..a4c110101e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
@@ -1095,6 +1096,11 @@ static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
return NULL;
}
+static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt) {
+ return NULL;
+}
+
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_transport_op *op) {}
@@ -1107,4 +1113,5 @@ const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
perform_op,
destroy_stream,
destroy_transport,
- get_peer};
+ get_peer,
+ get_endpoint};
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index cfc072c0b5..401a2ad4fe 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -298,6 +298,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator) {
+ grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
+ return grpc_channel_args_copy_and_add(a, &tmp, 1);
+}
+
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 1e05303471..88fc0e37a3 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -36,6 +36,7 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include "src/core/lib/iomgr/socket_mutator.h"
// Channel args are intentionally immutable, to avoid the need for locking.
@@ -100,6 +101,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns a channel arg instance with socket mutator added. The socket mutator
+ * will perform its mutate_fd method on all file descriptors used by the
+ * channel.
+ * If \a a is non-MULL, its args are copied. */
+grpc_channel_args *grpc_channel_args_set_socket_mutator(
+ grpc_channel_args *a, grpc_socket_mutator *mutator);
+
/** Returns the value of argument \a name from \a args, or NULL if not found. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index a4b6668276..2d300f4560 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -66,6 +66,8 @@ char* grpc_endpoint_get_peer(grpc_endpoint* ep) {
return ep->vtable->get_peer(ep);
}
+int grpc_endpoint_get_fd(grpc_endpoint* ep) { return ep->vtable->get_fd(ep); }
+
grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) {
return ep->vtable->get_workqueue(ep);
}
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index bf211ca16a..1609b64f2b 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -61,6 +61,7 @@ struct grpc_endpoint_vtable {
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
+ int (*get_fd)(grpc_endpoint *ep);
};
/* When data is available on the connection, calls the callback with slices.
@@ -73,6 +74,10 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
+/* Get the file descriptor used by \a ep. Return -1 if \a ep is not using an fd.
+ */
+int grpc_endpoint_get_fd(grpc_endpoint *ep);
+
/* Retrieve a reference to the workqueue associated with this endpoint */
grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index db51ec4939..91041a7c28 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -163,7 +163,7 @@ static void fd_global_shutdown(void);
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
-#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
+#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
new file mode 100644
index 0000000000..8b1efb6bab
--- /dev/null
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -0,0 +1,98 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/iomgr/socket_mutator.h"
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/useful.h>
+
+void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
+ const grpc_socket_mutator_vtable *vtable) {
+ mutator->vtable = vtable;
+ gpr_ref_init(&mutator->refcount, 1);
+}
+
+grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator) {
+ gpr_ref(&mutator->refcount);
+ return mutator;
+}
+
+bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd) {
+ return mutator->vtable->mutate_fd(fd, mutator);
+}
+
+int grpc_socket_mutator_compare(grpc_socket_mutator *a,
+ grpc_socket_mutator *b) {
+ int c = GPR_ICMP(a, b);
+ if (c != 0) {
+ grpc_socket_mutator *sma = a;
+ grpc_socket_mutator *smb = b;
+ c = GPR_ICMP(sma->vtable, smb->vtable);
+ if (c == 0) {
+ c = sma->vtable->compare(sma, smb);
+ }
+ }
+ return c;
+}
+
+void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) {
+ if (gpr_unref(&mutator->refcount)) {
+ mutator->vtable->destory(mutator);
+ }
+}
+
+static void *socket_mutator_arg_copy(void *p) {
+ return grpc_socket_mutator_ref(p);
+}
+
+static void socket_mutator_arg_destroy(void *p) {
+ grpc_socket_mutator_unref(p);
+}
+
+static int socket_mutator_cmp(void *a, void *b) {
+ return grpc_socket_mutator_compare((grpc_socket_mutator *)a,
+ (grpc_socket_mutator *)b);
+}
+
+static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = {
+ socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp};
+
+grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_SOCKET_MUTATOR;
+ arg.value.pointer.vtable = &socket_mutator_arg_vtable;
+ arg.value.pointer.p = mutator;
+ return arg;
+}
diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h
new file mode 100644
index 0000000000..2f5b6c248e
--- /dev/null
+++ b/src/core/lib/iomgr/socket_mutator.h
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
+#define GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/sync.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/** The virtual table of grpc_socket_mutator */
+typedef struct {
+ /** Mutates the socket opitons of \a fd */
+ bool (*mutate_fd)(int fd, grpc_socket_mutator *mutator);
+ /** Compare socket mutator \a a and \a b */
+ int (*compare)(grpc_socket_mutator *a, grpc_socket_mutator *b);
+ /** Destroys the socket mutator instance */
+ void (*destory)(grpc_socket_mutator *mutator);
+} grpc_socket_mutator_vtable;
+
+/** The Socket Mutator interface allows changes on socket options */
+struct grpc_socket_mutator {
+ const grpc_socket_mutator_vtable *vtable;
+ gpr_refcount refcount;
+};
+
+/** called by concrete implementations to initialize the base struct */
+void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
+ const grpc_socket_mutator_vtable *vtable);
+
+/** Wrap \a mutator as a grpc_arg */
+grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator);
+
+/** Perform the file descriptor mutation operation of \a mutator on \a fd */
+bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd);
+
+/** Compare if \a a and \a b are the same mutator or have same settings */
+int grpc_socket_mutator_compare(grpc_socket_mutator *a, grpc_socket_mutator *b);
+
+grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator);
+void grpc_socket_mutator_unref(grpc_socket_mutator *mutator);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c
index bc28bbe316..88e9ade253 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.c
+++ b/src/core/lib/iomgr/socket_utils_common_posix.c
@@ -209,6 +209,15 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
return GRPC_ERROR_NONE;
}
+/* set a socket using a grpc_socket_mutator */
+grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) {
+ GPR_ASSERT(mutator);
+ if (!grpc_socket_mutator_mutate_fd(mutator, fd)) {
+ return GRPC_ERROR_CREATE("grpc_socket_mutator failed.");
+ }
+ return GRPC_ERROR_NONE;
+}
+
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
static int g_ipv6_loopback_available;
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 175fb2b717..e84d3781a1 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -39,7 +39,9 @@
#include <sys/socket.h>
#include <unistd.h>
+#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
@@ -88,6 +90,9 @@ grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes);
/* Tries to set the socket's receive buffer to given size. */
grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes);
+/* Tries to set the socket using a grpc_socket_mutator */
+grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator);
+
/* An enum to keep track of IPv4/IPv6 socket modes.
Currently, this information is only used when a socket is first created, but
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index 18e6e60ebc..0485661316 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/pollset_set.h"
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index bc08c94ee0..13347735df 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -51,6 +51,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/timer.h"
@@ -73,7 +74,8 @@ typedef struct {
grpc_channel_args *channel_args;
} async_connect;
-static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
+static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd,
+ const grpc_channel_args *channel_args) {
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@@ -88,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
+ if (channel_args) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
+ GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
+ grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p;
+ err = grpc_set_socket_with_mutator(fd, mutator);
+ if (err != GRPC_ERROR_NONE) goto error;
+ }
+ }
+ }
goto done;
error:
@@ -287,7 +299,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
addr = &addr4_copy;
}
- if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
+ if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
return;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index c48f4e83cf..12a4797e6f 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -493,6 +493,11 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
+static int tcp_get_fd(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return tcp->fd;
+}
+
static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return grpc_fd_get_workqueue(tcp->em_fd);
@@ -511,7 +516,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_shutdown,
tcp_destroy,
tcp_get_resource_user,
- tcp_get_peer};
+ tcp_get_peer,
+ tcp_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 8b58c04ff5..6e2ad1dbe9 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -38,14 +38,17 @@
#include <limits.h>
#include <string.h>
+#include <grpc/slice_buffer.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice_buffer.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
int grpc_tcp_trace = 0;
@@ -62,15 +65,14 @@ typedef struct {
grpc_closure *read_cb;
grpc_closure *write_cb;
- GRPC_SLICE read_slice;
- GRPC_SLICE_buffer *read_slices;
- GRPC_SLICE_buffer *write_slices;
+ grpc_slice read_slice;
+ grpc_slice_buffer *read_slices;
+ grpc_slice_buffer *write_slices;
uv_buf_t *write_buffers;
- grpc_resource_user resource_user;
+ grpc_resource_user *resource_user;
bool shutting_down;
- bool resource_user_shutting_down;
char *peer_string;
grpc_pollset *pollset;
@@ -78,23 +80,23 @@ typedef struct {
static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); }
-static void tcp_free(grpc_tcp *tcp) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user);
+static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp);
- grpc_exec_ctx_finish(&exec_ctx);
}
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file,
- int line) {
+#define TCP_UNREF(exec_ctx, tcp, reason) \
+ tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) \
+ tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -105,11 +107,11 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp *tcp) {
+static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+ tcp_free(exec_ctx, tcp);
}
}
@@ -122,7 +124,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
grpc_tcp *tcp = handle->data;
(void)suggested_size;
tcp->read_slice = grpc_resource_user_slice_malloc(
- &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice);
buf->len = GRPC_SLICE_LENGTH(tcp->read_slice);
grpc_exec_ctx_finish(&exec_ctx);
@@ -130,7 +132,7 @@ static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size,
static void read_callback(uv_stream_t *stream, ssize_t nread,
const uv_buf_t *buf) {
- GRPC_SLICE sub;
+ grpc_slice sub;
grpc_error *error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp *tcp = stream->data;
@@ -139,7 +141,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// Nothing happened. Wait for the next callback
return;
}
- TCP_UNREF(tcp, "read");
+ TCP_UNREF(&exec_ctx, tcp, "read");
tcp->read_cb = NULL;
// TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream);
@@ -147,8 +149,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
error = GRPC_ERROR_CREATE("EOF");
} else if (nread > 0) {
// Successful read
- sub = GRPC_SLICE_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
- GRPC_SLICE_buffer_add(tcp->read_slices, sub);
+ sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread);
+ grpc_slice_buffer_add(tcp->read_slices, sub);
error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
size_t i;
@@ -156,8 +158,8 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
gpr_log(GPR_DEBUG, "read: error=%s", str);
grpc_error_free_string(str);
for (i = 0; i < tcp->read_slices->count; i++) {
- char *dump = gpr_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
dump);
gpr_free(dump);
@@ -172,14 +174,14 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
}
static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- GRPC_SLICE_buffer *read_slices, grpc_closure *cb) {
+ grpc_slice_buffer *read_slices, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
int status;
grpc_error *error = GRPC_ERROR_NONE;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- GRPC_SLICE_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref(read_slices);
TCP_REF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
status =
@@ -202,7 +204,7 @@ static void write_callback(uv_write_t *req, int status) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure *cb = tcp->write_cb;
tcp->write_cb = NULL;
- TCP_UNREF(tcp, "write");
+ TCP_UNREF(&exec_ctx, tcp, "write");
if (status == 0) {
error = GRPC_ERROR_NONE;
} else {
@@ -213,28 +215,28 @@ static void write_callback(uv_write_t *req, int status) {
gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
}
gpr_free(tcp->write_buffers);
- grpc_resource_user_free(&exec_ctx, &tcp->resource_user,
+ grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- GRPC_SLICE_buffer *write_slices,
+ grpc_slice_buffer *write_slices,
grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
uv_buf_t *buffers;
unsigned int buffer_count;
unsigned int i;
- GRPC_SLICE *slice;
+ grpc_slice *slice;
uv_write_t *write_req;
if (grpc_tcp_trace) {
size_t j;
for (j = 0; j < write_slices->count; j++) {
- char *data = gpr_dump_slice(write_slices->slices[j],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *data = grpc_dump_slice(write_slices->slices[j],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
@@ -259,7 +261,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb;
buffer_count = (unsigned int)tcp->write_slices->count;
buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count);
- grpc_resource_user_alloc(exec_ctx, &tcp->resource_user,
+ grpc_resource_user_alloc(exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * buffer_count, NULL);
for (i = 0; i < buffer_count; i++) {
slice = &tcp->write_slices->slices[i];
@@ -295,22 +297,6 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void shutdown_callback(uv_shutdown_t *req, int status) {}
-static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- TCP_UNREF(arg, "resource_user");
-}
-
-static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_tcp *tcp) {
- if (!tcp->resource_user_shutting_down) {
- tcp->resource_user_shutting_down = true;
- TCP_REF(tcp, "resource_user");
- grpc_resource_user_shutdown(
- exec_ctx, &tcp->resource_user,
- grpc_closure_create(resource_user_shutdown_done, tcp));
- }
-}
-
static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) {
@@ -324,8 +310,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
uv_close((uv_handle_t *)tcp->handle, uv_close_callback);
- uv_resource_user_maybe_shutdown(exec_ctx, tcp);
- TCP_UNREF(tcp, "destroy");
+ TCP_UNREF(exec_ctx, tcp, "destroy");
}
static char *uv_get_peer(grpc_endpoint *ep) {
@@ -335,15 +320,18 @@ static char *uv_get_peer(grpc_endpoint *ep) {
static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- return &tcp->resource_user;
+ return tcp->resource_user;
}
static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
+static int uv_get_fd(grpc_endpoint *ep) { return -1; }
+
static grpc_endpoint_vtable vtable = {
uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
- uv_destroy, uv_get_resource_user, uv_get_peer};
+ uv_destroy, uv_get_resource_user, uv_get_peer,
+ uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,
@@ -364,8 +352,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
tcp->shutting_down = false;
- tcp->resource_user_shutting_down = false;
- grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 1fb7edc2b1..62afbcef51 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -402,6 +402,8 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
+static int win_get_fd(grpc_endpoint *ep) { return -1; }
+
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
@@ -410,7 +412,8 @@ static grpc_endpoint_vtable vtable = {win_read,
win_shutdown,
win_destroy,
win_get_resource_user,
- win_get_peer};
+ win_get_peer,
+ win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c
index 61c10862da..5d950098a0 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.c
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c
@@ -104,6 +104,8 @@ static void plugin_md_request_metadata_ready(void *request,
grpc_slice_unref(md_array[i].value);
}
gpr_free(md_array);
+ } else if (num_md == 0) {
+ r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_OK, NULL);
}
}
gpr_free(r);
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
index 2eb5544f43..9623797610 100644
--- a/src/core/lib/security/transport/handshake.c
+++ b/src/core/lib/security/transport/handshake.c
@@ -125,7 +125,7 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx,
h->auth_context);
} else {
const char *msg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Security handshake failed: %s", msg);
+ gpr_log(GPR_DEBUG, "Security handshake failed: %s", msg);
grpc_error_free_string(msg);
if (h->secure_endpoint != NULL) {
diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c
index fba3314812..1b278410e8 100644
--- a/src/core/lib/security/transport/secure_endpoint.c
+++ b/src/core/lib/security/transport/secure_endpoint.c
@@ -31,7 +31,12 @@
*
*/
-#include "src/core/lib/security/transport/secure_endpoint.h"
+/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
+ using that endpoint. Because of various transitive includes in uv.h,
+ including windows.h on Windows, uv.h must be included before other system
+ headers. Therefore, sockaddr.h must always be included first */
+#include "src/core/lib/iomgr/sockaddr.h"
+
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
@@ -39,6 +44,7 @@
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -366,6 +372,8 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
+static int endpoint_get_fd(grpc_endpoint *secure_ep) { return -1; }
+
static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_workqueue(ep->wrapped_ep);
@@ -385,7 +393,8 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_resource_user,
- endpoint_get_peer};
+ endpoint_get_peer,
+ endpoint_get_fd};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 866cd9ea87..b448126da8 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -160,6 +160,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_peer(exec_ctx, transport);
}
+grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport) {
+ return transport->vtable->get_endpoint(exec_ctx, transport);
+}
+
void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op,
grpc_error *error) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 8916b28b72..96c26749c8 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -37,6 +37,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
@@ -295,6 +296,10 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
+/* Get the endpoint used by \a transport */
+grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
+ grpc_transport *transport);
+
/* Allocate a grpc_transport_op, and preconfigure the on_consumed closure to
\a on_consumed and then delete the returned transport op */
grpc_transport_op *grpc_make_transport_op(grpc_closure *on_consumed);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index fc7140671b..8553148c35 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -74,6 +74,9 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_get_peer */
char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
+
+ /* implementation of grpc_transport_get_endpoint */
+ grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
/* an instance of a grpc transport */