diff options
43 files changed, 264 insertions, 61 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3c72c1db27..c194138e4e 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -423,6 +423,12 @@ grpc_call *grpc_channel_create_registered_call( grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag); +/* Returns a newly allocated string representing the endpoint to which this + call is communicating with. The string is in the uri format accepted by + grpc_channel_create. + The returned string should be disposed of with gpr_free(). */ +char *grpc_call_get_peer(grpc_call *call); + /* Create a client channel to 'target'. Additional channel level configuration MAY be provided by grpc_channel_args, though the expectation is that most clients will want to simply pass NULL. See grpc_channel_args definition for @@ -431,8 +437,12 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_channel *grpc_channel_create(const char *target, const grpc_channel_args *args); +/* Return a newly allocated string representing the target a channel was + created for. */ +char *grpc_channel_get_target(grpc_channel *channel); + /* Create a lame client: this client fails every operation attempted on it. */ -grpc_channel *grpc_lame_client_channel_create(void); +grpc_channel *grpc_lame_client_channel_create(const char *target); /* Close and destroy a grpc channel */ void grpc_channel_destroy(grpc_channel *channel); diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index e38dcb58b7..cd7c182ef2 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -191,6 +191,11 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) { next_elem->filter->start_transport_stream_op(next_elem, op); } +char *grpc_call_next_get_peer(grpc_call_element *elem) { + grpc_call_element *next_elem = elem + 1; + return next_elem->filter->get_peer(next_elem); +} + void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; next_elem->filter->start_transport_op(next_elem, op); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 785be8925b..4a608b956e 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -104,6 +104,9 @@ typedef struct { The filter does not need to do any chaining */ void (*destroy_channel_elem)(grpc_channel_element *elem); + /* Implement grpc_call_get_peer() */ + char *(*get_peer)(grpc_call_element *elem); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -173,6 +176,8 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op); +/* Pass through a request to get_peer to the next child element */ +char *grpc_call_next_get_peer(grpc_call_element *elem); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 10e01ebbb4..8eb95ca822 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -280,6 +280,26 @@ static grpc_iomgr_closure *merge_into_waiting_op( return consumed_op; } +static char *cc_get_peer(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_subchannel_call *subchannel_call; + char *result; + + gpr_mu_lock(&calld->mu_state); + if (calld->state == CALL_ACTIVE) { + subchannel_call = calld->subchannel_call; + GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); + gpr_mu_unlock(&calld->mu_state); + result = grpc_subchannel_call_get_peer(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer"); + return result; + } else { + gpr_mu_unlock(&calld->mu_state); + return grpc_channel_get_target(chand->master); + } +} + static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) { @@ -594,6 +614,7 @@ const grpc_channel_filter grpc_client_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + cc_get_peer, "client-channel", }; diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 14cb3da62d..d07d96f3f9 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -322,4 +322,5 @@ const grpc_channel_filter grpc_compress_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "compress"}; diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 34d07de519..b95ed06f2b 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -119,6 +119,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) { grpc_transport_destroy(cd->transport); } +static char *con_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(chand->transport); +} + const grpc_channel_filter grpc_connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -128,6 +133,7 @@ const grpc_channel_filter grpc_connected_channel_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + con_get_peer, "connected", }; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 63e4912397..6e8c287e3d 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -206,4 +206,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_http_client_filter = { hc_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-client"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-client"}; diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index a6cbb5a7f4..7c798d2fb4 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -280,4 +280,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_http_server_filter = { hs_start_transport_op, grpc_channel_next_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http-server"}; + init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 5117723617..d631885aaf 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -127,4 +127,5 @@ const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op, sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "no-op"}; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 35f172683a..2a12fbc86d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -640,6 +640,12 @@ void grpc_subchannel_call_unref( } } +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) { + grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + return top_elem->filter->get_peer(top_elem); +} + void grpc_subchannel_call_process_op(grpc_subchannel_call *call, grpc_transport_stream_op *op) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index a23a623277..d1cd33b2af 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -100,6 +100,9 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, grpc_transport_stream_op *op); +/** continue querying for peer */ +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call); + struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely want to mutate this */ diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 96487958a7..f2b44aad03 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -53,3 +53,7 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } + +char *grpc_endpoint_get_peer(grpc_endpoint *ep) { + return ep->vtable->get_peer(ep); +} diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 881e851800..ee0becf3d6 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -72,12 +72,15 @@ struct grpc_endpoint_vtable { void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); + char *(*get_peer)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data); +char *grpc_endpoint_get_peer(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index fa2d2555d6..deae9c6875 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = - grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = - grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, + "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index e91b94f8c8..71ac12e87b 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -36,12 +36,18 @@ #include <errno.h> #include <string.h> -#include "src/core/support/string.h" +#ifdef GPR_POSIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/support/string.h" + static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; @@ -161,6 +167,31 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, return ret; } +char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { + char *temp; + char *result; + + switch (addr->sa_family) { + case AF_INET: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv4:%s", temp); + gpr_free(temp); + return result; + case AF_INET6: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv6:%s", temp); + gpr_free(temp); + return result; +#ifdef GPR_POSIX_SOCKET + case AF_UNIX: + gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); + return result; +#endif + } + + return NULL; +} + int grpc_sockaddr_get_port(const struct sockaddr *addr) { switch (addr->sa_family) { case AF_INET: diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index bdfb83479b..99f1ed54da 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, int normalize); +char *grpc_sockaddr_to_uri(const struct sockaddr *addr); + #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index dc0489e64f..427cd86c4e 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,6 +64,7 @@ typedef struct { int refs; grpc_iomgr_closure write_closure; grpc_pollset_set *interested_parties; + char *addr_str; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -99,6 +100,7 @@ static void on_alarm(void *acp, int success) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } } @@ -156,7 +158,8 @@ static void on_writable(void *acp, int success) { } } else { grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, + ac->addr_str); goto finish; } } else { @@ -177,6 +180,7 @@ finish: gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } else { grpc_alarm_cancel(&ac->alarm); @@ -223,13 +227,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); - grpc_sockaddr_to_string(&addr_str, addr, 1); + addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); goto done; } @@ -247,6 +251,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb_arg = arg; ac->fd = fdobj; ac->interested_parties = interested_parties; + ac->addr_str = addr_str; + addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b6d6efc9fb..1e8432d463 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -44,15 +44,17 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/support/string.h" -#include "src/core/debug/trace.h" -#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/support/string.h" +#include "src/core/debug/trace.h" +#include "src/core/profiling/timers.h" + #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else @@ -282,6 +284,8 @@ typedef struct { grpc_iomgr_closure write_closure; grpc_iomgr_closure handle_read_closure; + + char *peer_string; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -567,13 +572,20 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static char *grpc_tcp_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy}; + grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, + grpc_tcp_shutdown, grpc_tcp_destroy, grpc_tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, + const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; + tcp->peer_string = gpr_strdup(peer_string); tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 44279d5a26..d752feaeea 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -53,6 +53,7 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, + const char *peer_string); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5854031c9b..8538600112 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -332,7 +332,7 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); fdobj = grpc_fd_create(fd, name); @@ -342,8 +342,9 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb(sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 9e49a807f1..9a69f53a5a 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -344,6 +344,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "client-auth"}; + auth_start_transport_op, grpc_channel_next_op, + sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "client-auth"}; diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c index 3548198046..e189380ec5 100644 --- a/src/core/security/secure_endpoint.c +++ b/src/core/security/secure_endpoint.c @@ -331,9 +331,14 @@ static void endpoint_add_to_pollset(grpc_endpoint *secure_ep, grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset); } +static char *endpoint_get_peer(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_peer(ep->wrapped_ep); +} + static const grpc_endpoint_vtable vtable = { endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset, - endpoint_shutdown, endpoint_unref}; + endpoint_shutdown, endpoint_unref, endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 10eef6d237..69789c2f0d 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -120,6 +120,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "server-auth"}; + auth_start_transport_op, grpc_channel_next_op, + sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), + init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server-auth"}; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 6e643b591c..2ba851da3d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1253,6 +1253,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) { elem->filter->start_transport_stream_op(elem, op); } +char *grpc_call_get_peer(grpc_call *call) { + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + return elem->filter->get_peer(elem); +} + grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a6438ff512..4052c65cc6 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -36,12 +36,14 @@ #include <stdlib.h> #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + #include "src/core/iomgr/iomgr.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/init.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> /** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS. * Avoids needing to take a metadata context lock for sending status @@ -73,6 +75,7 @@ struct grpc_channel { gpr_mu registered_call_mu; registered_call *registered_calls; grpc_iomgr_closure destroy_closure; + char *target; }; #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) @@ -85,13 +88,14 @@ struct grpc_channel { #define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024) grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t num_filters, + const char *target, const grpc_channel_filter **filters, size_t num_filters, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) { size_t i; size_t size = sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters); grpc_channel *channel = gpr_malloc(size); memset(channel, 0, sizeof(*channel)); + channel->target = gpr_strdup(target); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; /* decremented by grpc_channel_destroy */ @@ -137,6 +141,10 @@ grpc_channel *grpc_channel_create_from_filters( return channel; } +char *grpc_channel_get_target(grpc_channel *channel) { + return gpr_strdup(channel->target); +} + static grpc_call *grpc_channel_create_call_internal( grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { @@ -222,6 +230,7 @@ static void destroy_channel(void *p, int ok) { } grpc_mdctx_unref(channel->metadata_context); gpr_mu_destroy(&channel->registered_call_mu); + gpr_free(channel->target); gpr_free(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 4e03eb4411..9e0646efaa 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -38,7 +38,7 @@ #include "src/core/client_config/subchannel_factory.h" grpc_channel *grpc_channel_create_from_filters( - const grpc_channel_filter **filters, size_t count, + const char *target, const grpc_channel_filter **filters, size_t count, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client); /** Get a (borrowed) pointer to this channels underlying channel stack */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 91c7b35550..778f7108fd 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -179,7 +179,8 @@ grpc_channel *grpc_channel_create(const char *target, return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + channel = + grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 3f2bb5c8a9..c4215a2cfb 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -47,7 +47,10 @@ typedef struct { grpc_linked_mdelem details; } call_data; -typedef struct { grpc_mdctx *mdctx; } channel_data; +typedef struct { + grpc_mdctx *mdctx; + grpc_channel *master; +} channel_data; static void lame_start_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op) { @@ -82,6 +85,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem, } } +static char *lame_get_peer(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_channel_get_target(chand->master); +} + static void lame_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { if (op->on_connectivity_state_change) { @@ -112,6 +120,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, GPR_ASSERT(is_first); GPR_ASSERT(is_last); chand->mdctx = mdctx; + chand->master = master; } static void destroy_channel_elem(grpc_channel_element *elem) {} @@ -125,11 +134,12 @@ static const grpc_channel_filter lame_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + lame_get_peer, "lame-client", }; -grpc_channel *grpc_lame_client_channel_create(void) { +grpc_channel *grpc_lame_client_channel_create(const char *target) { static const grpc_channel_filter *filters[] = {&lame_filter}; - return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(), - 1); + return grpc_channel_create_from_filters(target, filters, 1, NULL, + grpc_mdctx_create(), 1); } diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index d87ec97b53..a280311ba0 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -196,13 +196,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_find_security_connector_in_args(args) != NULL) { gpr_log(GPR_ERROR, "Cannot set security context in channel args."); - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } if (grpc_credentials_create_security_connector( creds, target, args, NULL, &connector, &new_args_from_connector) != GRPC_SECURITY_OK) { - return grpc_lame_client_channel_create(); + return grpc_lame_client_channel_create(target); } mdctx = grpc_mdctx_create(); @@ -231,7 +231,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); + channel = + grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); GRPC_RESOLVER_UNREF(resolver, "create"); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index f2d6b11bc7..5ba6f513c0 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -722,6 +722,7 @@ static const grpc_channel_filter server_surface_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server", }; @@ -878,8 +879,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } - channel = - grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0); + channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args, + mdctx, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0) ->channel_data; diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index e5e6f445b7..e7901da510 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -286,6 +286,7 @@ struct grpc_chttp2_transport { grpc_endpoint *ep; grpc_mdctx *metadata_context; gpr_refcount refs; + char *peer_string; gpr_mu mu; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c923d5e42f..eb435a2ee8 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -168,6 +168,7 @@ static void destruct_transport(grpc_chttp2_transport *t) { grpc_mdctx_unref(t->metadata_context); + gpr_free(t->peer_string); gpr_free(t); } @@ -217,6 +218,7 @@ static void init_transport(grpc_chttp2_transport *t, gpr_ref_init(&t->refs, 2); gpr_mu_init(&t->mu); grpc_mdctx_ref(mdctx); + t->peer_string = grpc_endpoint_get_peer(ep); t->metadata_context = mdctx; t->endpoint_reading = 1; t->global.next_stream_id = is_client ? 1 : 2; @@ -1069,9 +1071,17 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason, * INTEGRATION GLUE */ -static const grpc_transport_vtable vtable = { - sizeof(grpc_chttp2_stream), init_stream, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport}; +static char *chttp2_get_peer(grpc_transport *t) { + return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string); +} + +static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), + init_stream, + perform_stream_op, + perform_transport_op, + destroy_stream, + destroy_transport, + chttp2_get_peer}; grpc_transport *grpc_create_chttp2_transport( const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx, diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 2689e3028a..69c00b6a4f 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -65,6 +65,10 @@ void grpc_transport_destroy_stream(grpc_transport *transport, transport->vtable->destroy_stream(transport, stream); } +char *grpc_transport_get_peer(grpc_transport *transport) { + return transport->vtable->get_peer(transport); +} + void grpc_transport_stream_op_finish_with_failure( grpc_transport_stream_op *op) { if (op->send_ops) { diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 64503604ee..553779602a 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -182,4 +182,7 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_transport *transport); +/* Get the transports peer */ +char *grpc_transport_get_peer(grpc_transport *transport); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */ diff --git a/src/core/transport/transport_impl.h b/src/core/transport/transport_impl.h index 515721dfb6..d3bbdf6c27 100644 --- a/src/core/transport/transport_impl.h +++ b/src/core/transport/transport_impl.h @@ -58,6 +58,9 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_transport *self); + + /* implementation of grpc_transport_get_peer */ + char *(*get_peer)(grpc_transport *self); } grpc_transport_vtable; /* an instance of a grpc transport */ diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index eca2a40c97..60129afc43 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -37,6 +37,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> + #include "test/core/util/test_config.h" static void channel_init_func(grpc_channel_element *elem, grpc_channel *master, @@ -73,11 +75,14 @@ static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) { ++*(int *)(elem->channel_data); } +static char *get_peer(grpc_call_element *elem) { return gpr_strdup("peer"); } + static void test_create_channel_stack(void) { - const grpc_channel_filter filter = { - call_func, channel_func, sizeof(int), - call_init_func, call_destroy_func, sizeof(int), - channel_init_func, channel_destroy_func, "some_test_filter"}; + const grpc_channel_filter filter = {call_func, channel_func, + sizeof(int), call_init_func, + call_destroy_func, sizeof(int), + channel_init_func, channel_destroy_func, + get_peer, "some_test_filter"}; const grpc_channel_filter *filters = &filter; grpc_channel_stack *channel_stack; grpc_call_stack *call_stack; diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 790758918f..e7c113b36b 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -79,6 +79,7 @@ void test_connect(const char *server_host, const char *client_host, int port, size_t details_capacity = 0; int was_cancelled = 2; grpc_call_details call_details; + char *peer; if (port == 0) { port = grpc_pick_unused_port_or_die(); @@ -185,6 +186,10 @@ void test_connect(const char *server_host, const char *client_host, int port, cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); + peer = grpc_call_get_peer(c); + gpr_log(GPR_DEBUG, "got peer: '%s'", peer); + gpr_free(peer); + GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == strcmp(details, "xyz")); GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 37b61cf37f..807fc8e7bc 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -80,7 +80,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 2ec2697288..21d4404237 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -80,7 +80,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index 3aa364c5e0..c59628b959 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -81,7 +81,7 @@ static void client_setup_transport(void *ts, grpc_transport *transport, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( - filters, nfilters, cs->client_args, mdctx, 1); + "socketpair-target", filters, nfilters, cs->client_args, mdctx, 1); cs->f->client = channel; diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 6194b841d8..ca783afccd 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -114,11 +114,17 @@ static void simple_request_body(grpc_end2end_test_fixture f) { char *details = NULL; size_t details_capacity = 0; int was_cancelled = 2; + char *peer; c = grpc_channel_create_call(f.client, f.cq, "/foo", "foo.test.google.fr:1234", deadline); GPR_ASSERT(c); + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer); + gpr_free(peer); + grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); grpc_metadata_array_init(&request_metadata_recv); @@ -151,6 +157,15 @@ static void simple_request_body(grpc_end2end_test_fixture f) { cq_expect_completion(cqv, tag(101), 1); cq_verify(cqv); + peer = grpc_call_get_peer(s); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "server_peer=%s", peer); + gpr_free(peer); + peer = grpc_call_get_peer(c); + GPR_ASSERT(peer != NULL); + gpr_log(GPR_DEBUG, "client_peer=%s", peer); + gpr_free(peer); + op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index a23c64928e..4e7fb446a7 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -172,7 +172,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -207,7 +207,8 @@ static void large_read_test(ssize_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, + "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); written_bytes = fill_socket(sv[0]); @@ -340,7 +341,7 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); state.ep = ep; @@ -394,7 +395,7 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); grpc_endpoint_add_to_pollset(ep, &g_pollset); close(sv[0]); @@ -459,10 +460,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( grpc_endpoint_test_fixture f; create_sockets(sv); - f.client_ep = - grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size); - f.server_ep = - grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size); + f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), + slice_size, "test"); + f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), + slice_size, "test"); grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index b2facd33b1..3a7a3a36ee 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -57,7 +57,7 @@ int main(int argc, char **argv) { grpc_metadata_array_init(&trailing_metadata_recv); - chan = grpc_lame_client_channel_create(); + chan = grpc_lame_client_channel_create("lampoon:national"); GPR_ASSERT(chan); cq = grpc_completion_queue_create(); call = grpc_channel_create_call(chan, cq, "/Foo", "anywhere", |