diff options
author | Craig Tiller <ctiller@google.com> | 2017-04-07 06:46:08 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-04-07 06:46:08 -0700 |
commit | 5f399d173bd73b4d48442e2241859c4a5fb854f1 (patch) | |
tree | 272adf760f8bf0e37347e131075a51b610746785 /src | |
parent | 7b6c7a3927f75704cb3f7f02024f284b6817405a (diff) | |
parent | b567bb43670f7f2523164fdabdea8786a7595854 (diff) |
Merge github.com:grpc/grpc into minimal_test
Diffstat (limited to 'src')
50 files changed, 1111 insertions, 440 deletions
diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 0ae2357276..8dc17acfde 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -41,7 +41,6 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/http2_errors.h" -#include "src/core/lib/transport/service_config.h" #define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX #define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c index f46e849932..6ab176e8ad 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -57,12 +57,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server, char *name; gpr_asprintf(&name, "fd:%d", fd); - grpc_resource_quota *resource_quota = grpc_resource_quota_from_channel_args( - grpc_server_get_channel_args(server)); grpc_endpoint *server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name), resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); - grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_tcp_create(&exec_ctx, grpc_fd_create(fd, name), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index f9de0c715e..6407a6ad3f 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -41,8 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index b9ff969e81..5542a372d8 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -62,22 +62,25 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota, - read_slice_size, "socketpair-server"); + p.client = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], final_name), args, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota, - read_slice_size, "socketpair-client"); + p.server = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[0], final_name), args, + "socketpair-client"); gpr_free(final_name); + + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c index ff24894c6d..9718eb0523 100644 --- a/src/core/lib/iomgr/endpoint_pair_uv.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.c @@ -41,9 +41,8 @@ #include "src/core/lib/iomgr/endpoint_pair.h" -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { grpc_endpoint_pair endpoint_pair; // TODO(mlumish): implement this properly under libuv GPR_ASSERT(false && diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c index 93f71b745c..25d6264dfb 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.c @@ -83,15 +83,18 @@ static void create_sockets(SOCKET sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { + const char *name, grpc_channel_args *channel_args) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), - resource_quota, "endpoint:server"); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), - resource_quota, "endpoint:client"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + p.client = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[1], "endpoint:client"), + channel_args, "endpoint:server"); + p.server = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[0], "endpoint:server"), + channel_args, "endpoint:client"); + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 7014b98349..e5cf54f10a 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1717,7 +1717,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker.pt_id = pthread_self(); gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0); - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); @@ -1795,7 +1795,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->po.mu); } - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; gpr_tls_set(&g_current_thread_pollset, (intptr_t)0); gpr_tls_set(&g_current_thread_worker, (intptr_t)0); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index d90f223362..9834cdd197 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -871,7 +871,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; /* Avoid malloc for small number of elements. */ @@ -1092,7 +1092,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error)); return error; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index b5be5504b9..13409a4de8 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -111,6 +111,12 @@ static void try_engine(const char *engine) { } } +/* This should be used for testing purposes ONLY */ +void grpc_set_event_engine_test_only( + const grpc_event_engine_vtable *ev_engine) { + g_event_engine = ev_engine; +} + /* Call this only after calling grpc_event_engine_init() */ const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1a9e5c115a..becc4d359e 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -183,4 +183,7 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; +/* This should be used for testing purposes ONLY */ +void grpc_set_event_engine_test_only(const grpc_event_engine_vtable *); + #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index e19ce697b8..9bf3cdac89 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -75,6 +75,10 @@ void grpc_pollset_destroy(grpc_pollset *pollset); and it is guaranteed that it will not be released by grpc_pollset_work AFTER worker has been destroyed. + It's legal for worker to be NULL: in that case, this specific thread can not + be directly woken with a kick, but maybe be indirectly (with a kick against + the pollset as a whole). + Tries not to block past deadline. May call grpc_closure_list_run on grpc_closure_list, without holding the pollset diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 17043c1ea1..04c6b71747 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -120,7 +120,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; int added_worker = 0; worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = @@ -185,7 +185,7 @@ done: remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } gpr_cv_destroy(&worker.cv); - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 8dcd80d001..c3ee878651 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -142,6 +142,8 @@ struct grpc_resource_quota { /* Amount of free memory in the resource quota */ int64_t free_pool; + gpr_atm last_size; + /* Has rq_step been scheduled to occur? */ bool step_scheduled; /* Are we currently reclaiming memory */ @@ -581,6 +583,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->combiner = grpc_combiner_create(NULL); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); resource_quota->step_scheduled = false; resource_quota->reclaiming = false; gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); @@ -643,11 +646,17 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; + gpr_atm_no_barrier_store(&resource_quota->last_size, + (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota) { + return (size_t)gpr_atm_no_barrier_load(&resource_quota->last_size); +} + /******************************************************************************* * grpc_resource_user channel args api */ diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index b9f62cbf83..6f99be0d51 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -90,6 +90,8 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args( double grpc_resource_quota_get_memory_pressure( grpc_resource_quota *resource_quota); +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota); + typedef struct grpc_resource_user grpc_resource_user; grpc_resource_user *grpc_resource_user_create( diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 0485661316..bc367bdfa5 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -40,10 +40,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" -/* Channel arg (integer) setting how large a slice to try and read from the wire - each time recvmsg (or equivalent) is called */ -#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" - /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index a108b10da6..a2692707d9 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -137,29 +137,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { - size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { - grpc_integer_options options = {(int)tcp_read_chunk_size, 1, - 8 * 1024 * 1024}; - tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( - &channel_args->args[i], options); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - - grpc_endpoint *ep = - grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - return ep; + return grpc_tcp_create(exec_ctx, fd, channel_args, addr_str); } static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index a356564766..d6baca50ba 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -63,7 +63,7 @@ typedef struct { int refs; grpc_closure on_connect; grpc_endpoint **endpoint; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; } async_connect; static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, @@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { - grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota); + grpc_channel_args_destroy(exec_ctx, ac->channel_args); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); @@ -119,7 +119,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (!wsa_success) { error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); } else { - *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); + *ep = + grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name); socket = NULL; } } else { @@ -152,17 +153,6 @@ static void tcp_client_connect_impl( grpc_winsocket_callback_info *info; grpc_error *error = GRPC_ERROR_NONE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - *endpoint = NULL; /* Use dualstack sockets where available. */ @@ -225,7 +215,7 @@ static void tcp_client_connect_impl( ac->refs = 2; ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; - ac->resource_quota = resource_quota; + ac->channel_args = grpc_channel_args_copy(channel_args); grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); @@ -247,7 +237,6 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); grpc_closure_sched(exec_ctx, on_done, final_error); } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 4d7cf3ff51..5f4b38de2b 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -52,7 +52,9 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/profiling/timers.h" @@ -80,10 +82,14 @@ typedef struct { int fd; bool finished_edge; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ - size_t slice_size; + double target_length; + double bytes_read_this_round; gpr_refcount refcount; gpr_atm shutdown_count; + int min_read_chunk_size; + int max_read_chunk_size; + /* garbage after the last read */ grpc_slice_buffer last_read_buffer; @@ -108,6 +114,42 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { + tcp->bytes_read_this_round += (double)bytes; +} + +static void finish_estimate(grpc_tcp *tcp) { + /* If we read >80% of the target buffer in one read loop, increase the size + of the target buffer to either the amount read, or twice its previous + value */ + if (tcp->bytes_read_this_round > tcp->target_length * 0.8) { + tcp->target_length = + GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round); + } else { + tcp->target_length = + 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round; + } + tcp->bytes_read_this_round = 0; +} + +static size_t get_target_read_size(grpc_tcp *tcp) { + grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user); + double pressure = grpc_resource_quota_get_memory_pressure(rq); + double target = + tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); + size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size, + tcp->max_read_chunk_size)) + + 255) & + ~(size_t)255; + /* don't use more than 1/16th of the overall resource quota for a single read + * alloc */ + size_t rqmax = grpc_resource_quota_peek_size(rq); + if (sz > rqmax / 16 && rqmax > 1024) { + sz = rqmax / 16; + } + return sz; +} + static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), @@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } + finish_estimate(tcp); /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { @@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { + add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); - } else if (tcp->iov_size < MAX_READ_IOVEC) { - ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); @@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, } static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - grpc_resource_user_alloc_slices( - exec_ctx, &tcp->slice_allocator, tcp->slice_size, - (size_t)tcp->iov_size - tcp->incoming_buffer->count, - tcp->incoming_buffer); + size_t target_read_size = get_target_read_size(tcp); + if (tcp->incoming_buffer->length < target_read_size && + tcp->incoming_buffer->count < MAX_READ_IOVEC) { + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + target_read_size, 1, tcp->incoming_buffer); } else { tcp_do_read(exec_ctx, tcp); } @@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_get_peer, tcp_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, - grpc_resource_quota *resource_quota, - size_t slice_size, const char *peer_string) { +#define MAX_CHUNK_SIZE 32 * 1024 * 1024 + +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, + const grpc_channel_args *channel_args, + const char *peer_string) { + int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + int tcp_max_read_chunk_size = 4 * 1024 * 1024; + int tcp_min_read_chunk_size = 256; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_min_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_max_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } + + if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) { + tcp_min_read_chunk_size = tcp_max_read_chunk_size; + } + tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size, + tcp_max_read_chunk_size); + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); @@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->release_fd_cb = NULL; tcp->release_fd = NULL; tcp->incoming_buffer = NULL; - tcp->slice_size = slice_size; + tcp->target_length = (double)tcp_read_chunk_size; + tcp->min_read_chunk_size = tcp_min_read_chunk_size; + tcp->max_read_chunk_size = tcp_max_read_chunk_size; + tcp->bytes_read_this_round = 0; tcp->iov_size = 1; tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy */ @@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 1c0d13f96e..1ad5788331 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -47,14 +47,13 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" -#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 - extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota, - size_t read_slice_size, const char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + const grpc_channel_args *args, + const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not release the fd. diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d6a017cf7f..e66ffc9b1c 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -59,6 +59,7 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -90,7 +91,6 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->resource_quota = grpc_resource_quota_create(NULL); s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { @@ -98,27 +98,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } - } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->expand_wildcard_addrs = (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); @@ -138,6 +125,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->head = NULL; s->tail = NULL; s->nports = 0; + s->channel_args = grpc_channel_args_copy(args); gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); *server = s; return GRPC_ERROR_NONE; @@ -158,8 +146,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { s->head = sp->next; gpr_free(sp); } - - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -286,8 +273,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str), read_notifier_pollset, acceptor); gpr_free(name); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index f5dc8532f9..c15e2e1493 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -103,7 +103,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; - grpc_resource_quota *resource_quota; + /* channel args for this server */ + grpc_channel_args *channel_args; }; /* If successful, add a listener to \a s for \a addr, set \a dsmode for the diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 12ce7d3fdd..4c17f08918 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -46,6 +46,7 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/pollset_windows.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -102,7 +103,7 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; }; /* Public function. Allocates the proper data structures to hold a @@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); - s->resource_quota = grpc_resource_quota_create(NULL); - for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { - if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } - } - } + s->channel_args = grpc_channel_args_copy(args); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; @@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_winsocket_destroy(sp->socket); gpr_free(sp); } - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_free(utf8_message); } gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); - ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), - sp->server->resource_quota, peer_name_string); + ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name), + sp->server->channel_args, peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); } else { diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 9134883226..f74aa68793 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -430,9 +430,19 @@ static grpc_endpoint_vtable vtable = {win_read, win_get_peer, win_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string) { + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 4402de1c38..abafdb22d2 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -50,8 +50,8 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string); grpc_error *grpc_tcp_prepare_socket(SOCKET sock); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c37ead2318..97d50a91be 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -145,16 +145,28 @@ typedef struct batch_control { grpc_transport_stream_op_batch op; } batch_control; +typedef struct { + gpr_mu child_list_mu; + grpc_call *first_child; +} parent_call; + +typedef struct { + grpc_call *parent; + /** siblings: children of the same parent form a list, and this list is + protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; +} child_call; + struct grpc_call { gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; - grpc_call *parent; - grpc_call *first_child; gpr_timespec start_time; - /* protects first_child, and child next/prev links */ - gpr_mu child_list_mu; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child_call; /* client or server call */ bool is_client; @@ -206,12 +218,6 @@ struct grpc_call { int send_extra_metadata_count; gpr_timespec send_deadline; - /** siblings: children of the same parent form a list, and this list is - protected under - parent->mu */ - grpc_call *sibling_next; - grpc_call *sibling_prev; - grpc_slice_buffer_stream sending_stream; grpc_byte_stream *receiving_stream; @@ -276,6 +282,23 @@ static void add_init_error(grpc_error **composite, grpc_error *new) { *composite = grpc_error_add_child(*composite, new); } +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + if (p == NULL) { + p = gpr_arena_alloc(call->arena, sizeof(*p)); + gpr_mu_init(&p->child_list_mu); + if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { + gpr_mu_destroy(&p->child_list_mu); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + } + } + return p; +} + +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); +} + grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, grpc_call **out_call) { @@ -291,10 +314,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, sizeof(grpc_call) + channel_stack->call_stack_size); call->arena = arena; *out_call = call; - gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; - call->parent = args->parent_call; call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); @@ -326,11 +347,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); if (args->parent_call != NULL) { + child_call *cc = call->child_call = + gpr_arena_alloc(arena, sizeof(child_call)); + call->child_call->parent = args->parent_call; + GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->child_list_mu); + parent_call *pc = get_or_create_parent_call(args->parent_call); + + gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -364,17 +391,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } } - if (args->parent_call->first_child == NULL) { - args->parent_call->first_child = call; - call->sibling_next = call->sibling_prev = call; + if (pc->first_child == NULL) { + pc->first_child = call; + cc->sibling_next = cc->sibling_prev = call; } else { - call->sibling_next = args->parent_call->first_child; - call->sibling_prev = args->parent_call->first_child->sibling_prev; - call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = - call; + cc->sibling_next = pc->first_child; + cc->sibling_prev = pc->first_child->child_call->sibling_prev; + cc->sibling_next->child_call->sibling_prev = + cc->sibling_prev->child_call->sibling_next = call; } - gpr_mu_unlock(&args->parent_call->child_list_mu); + gpr_mu_unlock(&pc->child_list_mu); } call->send_deadline = send_deadline; @@ -469,7 +496,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->child_list_mu); + parent_call *pc = get_parent_call(c); + if (pc != NULL) { + gpr_mu_destroy(&pc->child_list_mu); + } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -499,31 +529,31 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } void grpc_call_destroy(grpc_call *c) { - int cancel; - grpc_call *parent = c->parent; + child_call *cc = c->child_call; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_destroy", 0); GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); - if (parent) { - gpr_mu_lock(&parent->child_list_mu); - if (c == parent->first_child) { - parent->first_child = c->sibling_next; - if (c == parent->first_child) { - parent->first_child = NULL; + if (cc) { + parent_call *pc = get_parent_call(cc->parent); + gpr_mu_lock(&pc->child_list_mu); + if (c == pc->first_child) { + pc->first_child = cc->sibling_next; + if (c == pc->first_child) { + pc->first_child = NULL; } } - c->sibling_prev->sibling_next = c->sibling_next; - c->sibling_next->sibling_prev = c->sibling_prev; - gpr_mu_unlock(&parent->child_list_mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); + cc->sibling_prev->child_call->sibling_next = cc->sibling_next; + cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + gpr_mu_unlock(&pc->child_list_mu); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && - !gpr_atm_acq_load(&c->received_final_op_atm); + bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && + gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1079,7 +1109,6 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - grpc_call *child_call; grpc_call *next_child_call; grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); @@ -1104,21 +1133,25 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - gpr_mu_lock(&call->child_list_mu); - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + parent_call *pc = get_parent_call(call); + if (pc != NULL) { + grpc_call *child; + gpr_mu_lock(&pc->child_list_mu); + child = pc->first_child; + if (child != NULL) { + do { + next_child_call = child->child_call->sibling_next; + if (child->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); + cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel"); + } + child = next_child_call; + } while (child != pc->first_child); + } + gpr_mu_unlock(&pc->child_list_mu); } - gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b4594817e4..3273addf1d 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -345,7 +345,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {} grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker *worker = NULL; gpr_timespec now; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -426,8 +425,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, + now, iteration_deadline); if (err != GRPC_ERROR_NONE) { gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); diff --git a/src/csharp/global.json b/src/csharp/global.json index 32ff399ef9..f3c33cef6a 100644 --- a/src/csharp/global.json +++ b/src/csharp/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "1.0.0-preview2-003121" + "version": "1.0.0-preview2-003131" } }
\ No newline at end of file diff --git a/src/node/index.js b/src/node/index.js index a294aad8ee..071bfd7927 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -52,42 +52,89 @@ var Metadata = require('./src/metadata.js'); var grpc = require('./src/grpc_extension'); +var protobuf_js_5_common = require('./src/protobuf_js_5_common'); +var protobuf_js_6_common = require('./src/protobuf_js_6_common'); + grpc.setDefaultRootsPem(fs.readFileSync(SSL_ROOTS_PATH, 'ascii')); /** - * Load a gRPC object from an existing ProtoBuf.Reflect object. - * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load. - * @param {Object=} options Options to apply to the loaded object + * Load a ProtoBuf.js object as a gRPC object. The options object can provide + * the following options: + * - binaryAsBase64: deserialize bytes values as base64 strings instead of + * Buffers. Defaults to false + * - longsAsStrings: deserialize long values as strings instead of objects. + * Defaults to true + * - enumsAsStrings: deserialize enum values as strings instead of numbers. + * Defaults to true + * - deprecatedArgumentOrder: Use the beta method argument order for client + * methods, with optional arguments after the callback. Defaults to false. + * This option is only a temporary stopgap measure to smooth an API breakage. + * It is deprecated, and new code should not use it. + * - protobufjsVersion: Available values are 5, 6, and 'detect'. 5 and 6 + * respectively indicate that an object from the corresponding version of + * ProtoBuf.js is provided in the value argument. If the option is 'detect', + * gRPC will guess what the version is based on the structure of the value. + * Defaults to 'detect'. + * @param {Object} value The ProtoBuf.js reflection object to load + * @param {Object=} options Options to apply to the loaded file * @return {Object<string, *>} The resulting gRPC object */ exports.loadObject = function loadObject(value, options) { - var result = {}; - if (value.className === 'Namespace') { - _.each(value.children, function(child) { - result[child.name] = loadObject(child, options); - }); - return result; - } else if (value.className === 'Service') { - return client.makeProtobufClientConstructor(value, options); - } else if (value.className === 'Message' || value.className === 'Enum') { - return value.build(); + options = _.defaults(options, common.defaultGrpcOptions); + options = _.defaults(options, {'protobufjsVersion': 'detect'}); + var protobufjsVersion; + if (options.protobufjsVersion === 'detect') { + if (protobuf_js_6_common.isProbablyProtobufJs6(value)) { + protobufjsVersion = 6; + } else if (protobuf_js_5_common.isProbablyProtobufJs5(value)) { + protobufjsVersion = 5; + } else { + var error_message = 'Could not detect ProtoBuf.js version. Please ' + + 'specify the version number with the "protobufjs_version" option'; + throw new Error(error_message); + } } else { - return value; + protobufjsVersion = options.protobufjsVersion; + } + switch (protobufjsVersion) { + case 6: return protobuf_js_6_common.loadObject(value, options); + case 5: + var deprecation_message = 'Calling grpc.loadObject with an object ' + + 'generated by ProtoBuf.js 5 is deprecated. Please upgrade to ' + + 'ProtoBuf.js 6.'; + common.log(grpc.logVerbosity.INFO, deprecation_message); + return protobuf_js_5_common.loadObject(value, options); + default: + throw new Error('Unrecognized protobufjsVersion', protobufjsVersion); } }; var loadObject = exports.loadObject; +function applyProtoRoot(filename, root) { + if (_.isString(filename)) { + return filename; + } + filename.root = path.resolve(filename.root) + '/'; + root.resolvePath = function(originPath, importPath, alreadyNormalized) { + return ProtoBuf.util.path.resolve(filename.root, + importPath, + alreadyNormalized); + }; + return filename.file; +} + /** * Load a gRPC object from a .proto file. The options object can provide the * following options: - * - convertFieldsToCamelCase: Loads this file with that option on protobuf.js - * set as specified. See - * https://github.com/dcodeIO/protobuf.js/wiki/Advanced-options for details + * - convertFieldsToCamelCase: Load this file with field names in camel case + * instead of their original case * - binaryAsBase64: deserialize bytes values as base64 strings instead of * Buffers. Defaults to false * - longsAsStrings: deserialize long values as strings instead of objects. * Defaults to true + * - enumsAsStrings: deserialize enum values as strings instead of numbers. + * Defaults to true * - deprecatedArgumentOrder: Use the beta method argument order for client * methods, with optional arguments after the callback. Defaults to false. * This option is only a temporary stopgap measure to smooth an API breakage. @@ -99,29 +146,17 @@ var loadObject = exports.loadObject; * @return {Object<string, *>} The resulting gRPC object */ exports.load = function load(filename, format, options) { - if (!format) { - format = 'proto'; - } - var convertFieldsToCamelCaseOriginal = ProtoBuf.convertFieldsToCamelCase; - if(options && options.hasOwnProperty('convertFieldsToCamelCase')) { - ProtoBuf.convertFieldsToCamelCase = options.convertFieldsToCamelCase; - } - var builder; - try { - switch(format) { - case 'proto': - builder = ProtoBuf.loadProtoFile(filename); - break; - case 'json': - builder = ProtoBuf.loadJsonFile(filename); - break; - default: - throw new Error('Unrecognized format "' + format + '"'); - } - } finally { - ProtoBuf.convertFieldsToCamelCase = convertFieldsToCamelCaseOriginal; - } - return loadObject(builder.ns, options); + /* Note: format is currently unused, because the API for loading a proto + file or a JSON file is identical in Protobuf.js 6. In the future, there is + still the possibility of adding other formats that would be loaded + differently */ + options = _.defaults(options, common.defaultGrpcOptions); + options.protobufjs_version = 6; + var root = new ProtoBuf.Root(); + var parse_options = {keepCase: !options.convertFieldsToCamelCase}; + return loadObject(root.loadSync(applyProtoRoot(filename, root), + parse_options), + options); }; var log_template = _.template( diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 05f52a1083..83b8a7c1ec 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -228,7 +228,7 @@ function getServer(port, tls) { server_creds = grpc.ServerCredentials.createInsecure(); } var server = new grpc.Server(options); - server.addProtoService(testProto.TestService.service, { + server.addService(testProto.TestService.service, { emptyCall: handleEmpty, unaryCall: handleUnary, streamingOutputCall: handleStreamingOutput, diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js index 7158af775a..ea85029d98 100644 --- a/src/node/performance/benchmark_server.js +++ b/src/node/performance/benchmark_server.js @@ -140,7 +140,7 @@ function BenchmarkServer(host, port, tls, generic, response_size) { streamingCall: makeStreamingGenericCall(response_size) }); } else { - server.addProtoService(serviceProto.BenchmarkService.service, { + server.addService(serviceProto.BenchmarkService.service, { unaryCall: unaryCall, streamingCall: streamingCall }); diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js index 030bf7d7ba..90a9b7d59c 100644 --- a/src/node/performance/worker.js +++ b/src/node/performance/worker.js @@ -44,8 +44,8 @@ var serviceProto = grpc.load({ function runServer(port, benchmark_impl) { var server_creds = grpc.ServerCredentials.createInsecure(); var server = new grpc.Server(); - server.addProtoService(serviceProto.WorkerService.service, - new WorkerServiceImpl(benchmark_impl, server)); + server.addService(serviceProto.WorkerService.service, + new WorkerServiceImpl(benchmark_impl, server)); var address = '0.0.0.0:' + port; server.bind(address, server_creds); server.start(); diff --git a/src/node/src/client.js b/src/node/src/client.js index 44081a3a6c..1aaf35c16c 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -780,6 +780,8 @@ exports.makeClientConstructor = function(methods, serviceName, _.assign(Client.prototype[name], attrs); }); + Client.service = methods; + return Client; }; @@ -823,26 +825,6 @@ exports.waitForClientReady = function(client, deadline, callback) { }; /** - * Creates a constructor for clients for the given service - * @param {ProtoBuf.Reflect.Service} service The service to generate a client - * for - * @param {Object=} options Options to apply to the client - * @return {function(string, Object)} New client constructor - */ -exports.makeProtobufClientConstructor = function(service, options) { - var method_attrs = common.getProtobufServiceAttrs(service, options); - if (!options) { - options = {deprecatedArgumentOrder: false}; - } - var Client = exports.makeClientConstructor( - method_attrs, common.fullyQualifiedName(service), - options); - Client.service = service; - Client.service.grpc_options = options; - return Client; -}; - -/** * Map of status code names to status codes */ exports.status = grpc.status; diff --git a/src/node/src/common.js b/src/node/src/common.js index a0fe4480ea..757969dbdd 100644 --- a/src/node/src/common.js +++ b/src/node/src/common.js @@ -42,74 +42,6 @@ var _ = require('lodash'); /** - * Get a function that deserializes a specific type of protobuf. - * @param {function()} cls The constructor of the message type to deserialize - * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings - * instead of Buffers. Defaults to false - * @param {bool=} longsAsStrings Deserialize long values as strings instead of - * objects. Defaults to true - * @return {function(Buffer):cls} The deserialization function - */ -exports.deserializeCls = function deserializeCls(cls, binaryAsBase64, - longsAsStrings) { - if (binaryAsBase64 === undefined || binaryAsBase64 === null) { - binaryAsBase64 = false; - } - if (longsAsStrings === undefined || longsAsStrings === null) { - longsAsStrings = true; - } - /** - * Deserialize a buffer to a message object - * @param {Buffer} arg_buf The buffer to deserialize - * @return {cls} The resulting object - */ - return function deserialize(arg_buf) { - // Convert to a native object with binary fields as Buffers (first argument) - // and longs as strings (second argument) - return cls.decode(arg_buf).toRaw(binaryAsBase64, longsAsStrings); - }; -}; - -var deserializeCls = exports.deserializeCls; - -/** - * Get a function that serializes objects to a buffer by protobuf class. - * @param {function()} Cls The constructor of the message type to serialize - * @return {function(Cls):Buffer} The serialization function - */ -exports.serializeCls = function serializeCls(Cls) { - /** - * Serialize an object to a Buffer - * @param {Object} arg The object to serialize - * @return {Buffer} The serialized object - */ - return function serialize(arg) { - return new Buffer(new Cls(arg).encode().toBuffer()); - }; -}; - -var serializeCls = exports.serializeCls; - -/** - * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. - * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of - * @return {string} The fully qualified name of the value - */ -exports.fullyQualifiedName = function fullyQualifiedName(value) { - if (value === null || value === undefined) { - return ''; - } - var name = value.name; - var parent_name = fullyQualifiedName(value.parent); - if (parent_name !== '') { - name = parent_name + '.' + name; - } - return name; -}; - -var fullyQualifiedName = exports.fullyQualifiedName; - -/** * Wrap a function to pass null-like values through without calling it. If no * function is given, just uses the identity; * @param {?function} func The function to wrap @@ -128,44 +60,6 @@ exports.wrapIgnoreNull = function wrapIgnoreNull(func) { }; /** - * Return a map from method names to method attributes for the service. - * @param {ProtoBuf.Reflect.Service} service The service to get attributes for - * @param {Object=} options Options to apply to these attributes - * @return {Object} The attributes map - */ -exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, - options) { - var prefix = '/' + fullyQualifiedName(service) + '/'; - var binaryAsBase64, longsAsStrings; - if (options) { - binaryAsBase64 = options.binaryAsBase64; - longsAsStrings = options.longsAsStrings; - } - /* This slightly awkward construction is used to make sure we only use - lodash@3.10.1-compatible functions. A previous version used - _.fromPairs, which would be cleaner, but was introduced in lodash - version 4 */ - return _.zipObject(_.map(service.children, function(method) { - return _.camelCase(method.name); - }), _.map(service.children, function(method) { - return { - originalName: method.name, - path: prefix + method.name, - requestStream: method.requestStream, - responseStream: method.responseStream, - requestType: method.resolvedRequestType, - responseType: method.resolvedResponseType, - requestSerialize: serializeCls(method.resolvedRequestType.build()), - requestDeserialize: deserializeCls(method.resolvedRequestType.build(), - binaryAsBase64, longsAsStrings), - responseSerialize: serializeCls(method.resolvedResponseType.build()), - responseDeserialize: deserializeCls(method.resolvedResponseType.build(), - binaryAsBase64, longsAsStrings) - }; - })); -}; - -/** * The logger object for the gRPC module. Defaults to console. */ exports.logger = console; @@ -185,3 +79,14 @@ exports.log = function log(severity, message) { exports.logger.error(message); } }; + +/** + * Default options for loading proto files into gRPC + */ +exports.defaultGrpcOptions = { + convertFieldsToCamelCase: false, + binaryAsBase64: false, + longsAsStrings: true, + enumsAsStrings: true, + deprecatedArgumentOrder: false +}; diff --git a/src/node/src/protobuf_js_5_common.js b/src/node/src/protobuf_js_5_common.js new file mode 100644 index 0000000000..62cf2f4aca --- /dev/null +++ b/src/node/src/protobuf_js_5_common.js @@ -0,0 +1,181 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var _ = require('lodash'); +var client = require('./client'); + +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings + * instead of Buffers. Defaults to false + * @param {bool=} longsAsStrings Deserialize long values as strings instead of + * objects. Defaults to true + * @return {function(Buffer):cls} The deserialization function + */ +exports.deserializeCls = function deserializeCls(cls, binaryAsBase64, + longsAsStrings) { + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + // Convert to a native object with binary fields as Buffers (first argument) + // and longs as strings (second argument) + return cls.decode(arg_buf).toRaw(binaryAsBase64, longsAsStrings); + }; +}; + +var deserializeCls = exports.deserializeCls; + +/** + * Get a function that serializes objects to a buffer by protobuf class. + * @param {function()} Cls The constructor of the message type to serialize + * @return {function(Cls):Buffer} The serialization function + */ +exports.serializeCls = function serializeCls(Cls) { + /** + * Serialize an object to a Buffer + * @param {Object} arg The object to serialize + * @return {Buffer} The serialized object + */ + return function serialize(arg) { + return new Buffer(new Cls(arg).encode().toBuffer()); + }; +}; + +var serializeCls = exports.serializeCls; + +/** + * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. + * @param {ProtoBuf.Reflect.Namespace} value The value to get the name of + * @return {string} The fully qualified name of the value + */ +exports.fullyQualifiedName = function fullyQualifiedName(value) { + if (value === null || value === undefined) { + return ''; + } + var name = value.name; + var parent_name = fullyQualifiedName(value.parent); + if (parent_name !== '') { + name = parent_name + '.' + name; + } + return name; +}; + +var fullyQualifiedName = exports.fullyQualifiedName; + +/** + * Return a map from method names to method attributes for the service. + * @param {ProtoBuf.Reflect.Service} service The service to get attributes for + * @param {Object=} options Options to apply to these attributes + * @return {Object} The attributes map + */ +exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, + options) { + var prefix = '/' + fullyQualifiedName(service) + '/'; + var binaryAsBase64, longsAsStrings; + if (options) { + binaryAsBase64 = options.binaryAsBase64; + longsAsStrings = options.longsAsStrings; + } + /* This slightly awkward construction is used to make sure we only use + lodash@3.10.1-compatible functions. A previous version used + _.fromPairs, which would be cleaner, but was introduced in lodash + version 4 */ + return _.zipObject(_.map(service.children, function(method) { + return _.camelCase(method.name); + }), _.map(service.children, function(method) { + return { + originalName: method.name, + path: prefix + method.name, + requestStream: method.requestStream, + responseStream: method.responseStream, + requestType: method.resolvedRequestType, + responseType: method.resolvedResponseType, + requestSerialize: serializeCls(method.resolvedRequestType.build()), + requestDeserialize: deserializeCls(method.resolvedRequestType.build(), + binaryAsBase64, longsAsStrings), + responseSerialize: serializeCls(method.resolvedResponseType.build()), + responseDeserialize: deserializeCls(method.resolvedResponseType.build(), + binaryAsBase64, longsAsStrings) + }; + })); +}; + +var getProtobufServiceAttrs = exports.getProtobufServiceAttrs; + +/** + * Load a gRPC object from an existing ProtoBuf.Reflect object. + * @param {ProtoBuf.Reflect.Namespace} value The ProtoBuf object to load. + * @param {Object=} options Options to apply to the loaded object + * @return {Object<string, *>} The resulting gRPC object + */ +exports.loadObject = function loadObject(value, options) { + var result = {}; + if (!value) { + return value; + } + if (value.hasOwnProperty('ns')) { + return loadObject(value.ns, options); + } + if (value.className === 'Namespace') { + _.each(value.children, function(child) { + result[child.name] = loadObject(child, options); + }); + return result; + } else if (value.className === 'Service') { + return client.makeClientConstructor(getProtobufServiceAttrs(value, options), + options); + } else if (value.className === 'Message' || value.className === 'Enum') { + return value.build(); + } else { + return value; + } +}; + +/** + * The primary purpose of this method is to distinguish between reflection + * objects from different versions of ProtoBuf.js. This is just a heuristic, + * checking for properties that are (currently) specific to this version of + * ProtoBuf.js + * @param {Object} obj The object to check + * @return {boolean} Whether the object appears to be a Protobuf.js 5 + * ReflectionObject + */ +exports.isProbablyProtobufJs5 = function isProbablyProtobufJs5(obj) { + return _.isArray(obj.children) && (typeof obj.build === 'function'); +}; diff --git a/src/node/src/protobuf_js_6_common.js b/src/node/src/protobuf_js_6_common.js new file mode 100644 index 0000000000..00f11f2736 --- /dev/null +++ b/src/node/src/protobuf_js_6_common.js @@ -0,0 +1,170 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var _ = require('lodash'); +var client = require('./client'); + +/** + * Get a function that deserializes a specific type of protobuf. + * @param {function()} cls The constructor of the message type to deserialize + * @param {bool=} binaryAsBase64 Deserialize bytes fields as base64 strings + * instead of Buffers. Defaults to false + * @param {bool=} longsAsStrings Deserialize long values as strings instead of + * objects. Defaults to true + * @return {function(Buffer):cls} The deserialization function + */ +exports.deserializeCls = function deserializeCls(cls, options) { + var conversion_options = { + defaults: true, + bytes: options.binaryAsBase64 ? String : Buffer, + longs: options.longsAsStrings ? String : null, + enums: options.enumsAsStrings ? String : null, + oneofs: true + }; + /** + * Deserialize a buffer to a message object + * @param {Buffer} arg_buf The buffer to deserialize + * @return {cls} The resulting object + */ + return function deserialize(arg_buf) { + return cls.decode(arg_buf).toObject(conversion_options); + }; +}; + +var deserializeCls = exports.deserializeCls; + +/** + * Get a function that serializes objects to a buffer by protobuf class. + * @param {function()} Cls The constructor of the message type to serialize + * @return {function(Cls):Buffer} The serialization function + */ +exports.serializeCls = function serializeCls(cls) { + /** + * Serialize an object to a Buffer + * @param {Object} arg The object to serialize + * @return {Buffer} The serialized object + */ + return function serialize(arg) { + var message = cls.fromObject(arg); + return cls.encode(message).finish(); + }; +}; + +var serializeCls = exports.serializeCls; + +/** + * Get the fully qualified (dotted) name of a ProtoBuf.Reflect value. + * @param {ProtoBuf.ReflectionObject} value The value to get the name of + * @return {string} The fully qualified name of the value + */ +exports.fullyQualifiedName = function fullyQualifiedName(value) { + if (value === null || value === undefined) { + return ''; + } + var name = value.name; + var parent_fqn = fullyQualifiedName(value.parent); + if (parent_fqn !== '') { + name = parent_fqn + '.' + name; + } + return name; +}; + +var fullyQualifiedName = exports.fullyQualifiedName; + +/** + * Return a map from method names to method attributes for the service. + * @param {ProtoBuf.Service} service The service to get attributes for + * @param {Object=} options Options to apply to these attributes + * @return {Object} The attributes map + */ +exports.getProtobufServiceAttrs = function getProtobufServiceAttrs(service, + options) { + var prefix = '/' + fullyQualifiedName(service) + '/'; + service.resolveAll(); + return _.zipObject(_.map(service.methods, function(method) { + return _.camelCase(method.name); + }), _.map(service.methods, function(method) { + return { + originalName: method.name, + path: prefix + method.name, + requestStream: !!method.requestStream, + responseStream: !!method.responseStream, + requestType: method.resolvedRequestType, + responseType: method.resolvedResponseType, + requestSerialize: serializeCls(method.resolvedRequestType), + requestDeserialize: deserializeCls(method.resolvedRequestType, options), + responseSerialize: serializeCls(method.resolvedResponseType), + responseDeserialize: deserializeCls(method.resolvedResponseType, options) + }; + })); +}; + +var getProtobufServiceAttrs = exports.getProtobufServiceAttrs; + +exports.loadObject = function loadObject(value, options) { + var result = {}; + if (!value) { + return value; + } + if (value.hasOwnProperty('methods')) { + // It's a service object + var service_attrs = getProtobufServiceAttrs(value, options); + return client.makeClientConstructor(service_attrs); + } + + if (value.hasOwnProperty('nested')) { + // It's a namespace or root object + _.each(value.nested, function(nested, name) { + result[name] = loadObject(nested, options); + }); + return result; + } + + // Otherwise, it's not something we need to change + return value; +}; + +/** + * The primary purpose of this method is to distinguish between reflection + * objects from different versions of ProtoBuf.js. This is just a heuristic, + * checking for properties that are (currently) specific to this version of + * ProtoBuf.js + * @param {Object} obj The object to check + * @return {boolean} Whether the object appears to be a Protobuf.js 6 + * ReflectionObject + */ +exports.isProbablyProtobufJs6 = function isProbablyProtobufJs6(obj) { + return (typeof obj.root === 'object') && (typeof obj.resolve === 'function'); +}; diff --git a/src/node/src/server.js b/src/node/src/server.js index bdb4a56203..3450abed08 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -781,17 +781,31 @@ Server.prototype.addService = function(service, implementation) { /** * Add a proto service to the server, with a corresponding implementation + * @deprecated Use grpc.load and Server#addService instead * @param {Protobuf.Reflect.Service} service The proto service descriptor * @param {Object<String, function>} implementation Map of method names to * method implementation for the provided service. */ Server.prototype.addProtoService = function(service, implementation) { var options; - if (service.grpc_options) { - options = service.grpc_options; + var protobuf_js_5_common = require('./protobuf_js_5_common'); + var protobuf_js_6_common = require('./protobuf_js_6_common'); + common.log(grpc.logVerbosity.INFO, + 'Server#addProtoService is deprecated. Use addService instead'); + if (protobuf_js_5_common.isProbablyProtobufJs5(service)) { + options = _.defaults(service.grpc_options, common.defaultGrpcOptions); + this.addService( + protobuf_js_5_common.getProtobufServiceAttrs(service, options), + implementation); + } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) { + options = _.defaults(service.grpc_options, common.defaultGrpcOptions); + this.addService( + protobuf_js_6_common.getProtobufServiceAttrs(service, options), + implementation); + } else { + // We assume that this is a service attributes object + this.addService(service, implementation); } - this.addService(common.getProtobufServiceAttrs(service, options), - implementation); }; /** diff --git a/src/node/stress/metrics_server.js b/src/node/stress/metrics_server.js index 3ab4b4c82d..b3f939e8f3 100644 --- a/src/node/stress/metrics_server.js +++ b/src/node/stress/metrics_server.js @@ -63,7 +63,7 @@ function getAllGauges(call) { function MetricsServer(port) { var server = new grpc.Server(); - server.addProtoService(metrics.MetricsService.service, { + server.addService(metrics.MetricsService.service, { getGauge: _.bind(getGauge, this), getAllGauges: _.bind(getAllGauges, this) }); diff --git a/src/node/test/common_test.js b/src/node/test/common_test.js index c57b7388f6..e1ce864f97 100644 --- a/src/node/test/common_test.js +++ b/src/node/test/common_test.js @@ -34,17 +34,26 @@ 'use strict'; var assert = require('assert'); +var _ = require('lodash'); -var common = require('../src/common.js'); +var common = require('../src/common'); +var protobuf_js_6_common = require('../src/protobuf_js_6_common'); + +var serializeCls = protobuf_js_6_common.serializeCls; +var deserializeCls = protobuf_js_6_common.deserializeCls; var ProtoBuf = require('protobufjs'); -var messages_proto = ProtoBuf.loadProtoFile( - __dirname + '/test_messages.proto').build(); +var messages_proto = new ProtoBuf.Root(); +messages_proto = messages_proto.loadSync( + __dirname + '/test_messages.proto', {keepCase: true}).resolveAll(); + +var default_options = common.defaultGrpcOptions; describe('Proto message long int serialize and deserialize', function() { - var longSerialize = common.serializeCls(messages_proto.LongValues); - var longDeserialize = common.deserializeCls(messages_proto.LongValues); + var longSerialize = serializeCls(messages_proto.LongValues); + var longDeserialize = deserializeCls(messages_proto.LongValues, + default_options); var pos_value = '314159265358979'; var neg_value = '-27182818284590'; it('should preserve positive int64 values', function() { @@ -88,8 +97,9 @@ describe('Proto message long int serialize and deserialize', function() { neg_value); }); it('should deserialize as a number with the right option set', function() { - var longNumDeserialize = common.deserializeCls(messages_proto.LongValues, - false, false); + var num_options = _.defaults({longsAsStrings: false}, default_options); + var longNumDeserialize = deserializeCls(messages_proto.LongValues, + num_options); var serialized = longSerialize({int_64: pos_value}); assert.strictEqual(typeof longDeserialize(serialized).int_64, 'string'); /* With the longsAsStrings option disabled, long values are represented as @@ -98,11 +108,12 @@ describe('Proto message long int serialize and deserialize', function() { }); }); describe('Proto message bytes serialize and deserialize', function() { - var sequenceSerialize = common.serializeCls(messages_proto.SequenceValues); - var sequenceDeserialize = common.deserializeCls( - messages_proto.SequenceValues); - var sequenceBase64Deserialize = common.deserializeCls( - messages_proto.SequenceValues, true); + var sequenceSerialize = serializeCls(messages_proto.SequenceValues); + var sequenceDeserialize = deserializeCls( + messages_proto.SequenceValues, default_options); + var b64_options = _.defaults({binaryAsBase64: true}, default_options); + var sequenceBase64Deserialize = deserializeCls( + messages_proto.SequenceValues, b64_options); var buffer_val = new Buffer([0x69, 0xb7]); var base64_val = 'abc='; it('should preserve a buffer', function() { @@ -120,19 +131,73 @@ describe('Proto message bytes serialize and deserialize', function() { var deserialized = sequenceBase64Deserialize(serialized); assert.strictEqual(deserialized.bytes_field, base64_val); }); - /* The next two tests are specific tests to verify that issue - * https://github.com/grpc/grpc/issues/5174 has been fixed. They are skipped - * because they will not pass until a protobuf.js release has been published - * with a fix for https://github.com/dcodeIO/protobuf.js/issues/390 */ - it.skip('should serialize a repeated field as packed by default', function() { - var expected_serialize = new Buffer([0x12, 0x01, 0x01, 0x0a]); + it('should serialize a repeated field as packed by default', function() { + var expected_serialize = new Buffer([0x12, 0x01, 0x0a]); var serialized = sequenceSerialize({repeated_field: [10]}); assert.strictEqual(expected_serialize.compare(serialized), 0); }); - it.skip('should deserialize packed or unpacked repeated', function() { - var serialized = new Buffer([0x12, 0x01, 0x01, 0x0a]); + it('should deserialize packed or unpacked repeated', function() { + var expectedDeserialize = { + bytes_field: new Buffer(''), + repeated_field: [10] + }; + var packedSerialized = new Buffer([0x12, 0x01, 0x0a]); + var unpackedSerialized = new Buffer([0x10, 0x0a]); + var packedDeserialized; + var unpackedDeserialized; assert.doesNotThrow(function() { - sequenceDeserialize(serialized); + packedDeserialized = sequenceDeserialize(packedSerialized); }); + assert.doesNotThrow(function() { + unpackedDeserialized = sequenceDeserialize(unpackedSerialized); + }); + assert.deepEqual(packedDeserialized, expectedDeserialize); + assert.deepEqual(unpackedDeserialized, expectedDeserialize); + }); +}); +describe('Proto message oneof serialize and deserialize', function() { + var oneofSerialize = serializeCls(messages_proto.OneOfValues); + var oneofDeserialize = deserializeCls( + messages_proto.OneOfValues, default_options); + it('Should have idempotent round trips', function() { + var test_message = {oneof_choice: 'int_choice', int_choice: 5}; + var serialized1 = oneofSerialize(test_message); + var deserialized1 = oneofDeserialize(serialized1); + assert.equal(deserialized1.int_choice, 5); + var serialized2 = oneofSerialize(deserialized1); + var deserialized2 = oneofDeserialize(serialized2); + assert.deepEqual(deserialized1, deserialized2); + }); + it('Should emit a property indicating which field was chosen', function() { + var test_message1 = {oneof_choice: 'int_choice', int_choice: 5}; + var serialized1 = oneofSerialize(test_message1); + var deserialized1 = oneofDeserialize(serialized1); + assert.equal(deserialized1.oneof_choice, 'int_choice'); + var test_message2 = {oneof_choice: 'string_choice', string_choice: 'abc'}; + var serialized2 = oneofSerialize(test_message2); + var deserialized2 = oneofDeserialize(serialized2); + assert.equal(deserialized2.oneof_choice, 'string_choice'); + }); +}); +describe('Proto message enum serialize and deserialize', function() { + var enumSerialize = serializeCls(messages_proto.EnumValues); + var enumDeserialize = deserializeCls( + messages_proto.EnumValues, default_options); + var enumIntOptions = _.defaults({enumsAsStrings: false}, default_options); + var enumIntDeserialize = deserializeCls( + messages_proto.EnumValues, enumIntOptions); + it('Should accept both names and numbers', function() { + var nameSerialized = enumSerialize({enum_value: 'ONE'}); + var numberSerialized = enumSerialize({enum_value: 1}); + assert.strictEqual(messages_proto.TestEnum.ONE, 1); + assert.deepEqual(enumDeserialize(nameSerialized), + enumDeserialize(numberSerialized)); + }); + it('Should deserialize as a string the enumsAsStrings option', function() { + var serialized = enumSerialize({enum_value: 'TWO'}); + var nameDeserialized = enumDeserialize(serialized); + var numberDeserialized = enumIntDeserialize(serialized); + assert.deepEqual(nameDeserialized, {enum_value: 'TWO'}); + assert.deepEqual(numberDeserialized, {enum_value: 2}); }); }); diff --git a/src/node/test/credentials_test.js b/src/node/test/credentials_test.js index 305843f665..b66b4bf5ea 100644 --- a/src/node/test/credentials_test.js +++ b/src/node/test/credentials_test.js @@ -228,7 +228,7 @@ describe('client credentials', function() { before(function() { var proto = grpc.load(__dirname + '/test_service.proto'); server = new grpc.Server(); - server.addProtoService(proto.TestService.service, { + server.addService(proto.TestService.service, { unary: function(call, cb) { call.sendMetadata(call.metadata); cb(null, {}); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 1d739562a6..783028fa99 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -34,19 +34,22 @@ 'use strict'; var assert = require('assert'); +var _ = require('lodash'); var surface_client = require('../src/client.js'); +var common = require('../src/common'); var ProtoBuf = require('protobufjs'); var grpc = require('..'); -var math_proto = ProtoBuf.loadProtoFile(__dirname + - '/../../proto/math/math.proto'); +var math_proto = new ProtoBuf.Root(); +math_proto = math_proto.loadSync(__dirname + + '/../../proto/math/math.proto', {keepCase: true}); var mathService = math_proto.lookup('math.Math'); - -var _ = require('lodash'); +var mathServiceAttrs = grpc.loadObject( + mathService, common.defaultGrpcOptions).service; /** * This is used for testing functions with multiple asynchronous calls that @@ -87,11 +90,6 @@ describe('File loader', function() { grpc.load(__dirname + '/test_service.json', 'json'); }); }); - it('Should fail to load a file with an unknown format', function() { - assert.throws(function() { - grpc.load(__dirname + '/test_service.proto', 'fake_format'); - }); - }); }); describe('surface Server', function() { var server; @@ -132,15 +130,40 @@ describe('Server.prototype.addProtoService', function() { afterEach(function() { server.forceShutdown(); }); - it('Should succeed with a single service', function() { + it('Should succeed with a single proto service', function() { assert.doesNotThrow(function() { server.addProtoService(mathService, dummyImpls); }); }); + it('Should succeed with a single service attributes object', function() { + assert.doesNotThrow(function() { + server.addProtoService(mathServiceAttrs, dummyImpls); + }); + }); +}); +describe('Server.prototype.addService', function() { + var server; + var dummyImpls = { + 'div': function() {}, + 'divMany': function() {}, + 'fib': function() {}, + 'sum': function() {} + }; + beforeEach(function() { + server = new grpc.Server(); + }); + afterEach(function() { + server.forceShutdown(); + }); + it('Should succeed with a single service', function() { + assert.doesNotThrow(function() { + server.addService(mathServiceAttrs, dummyImpls); + }); + }); it('Should fail with conflicting method names', function() { - server.addProtoService(mathService, dummyImpls); + server.addService(mathServiceAttrs, dummyImpls); assert.throws(function() { - server.addProtoService(mathService, dummyImpls); + server.addService(mathServiceAttrs, dummyImpls); }); }); it('Should allow method names as originally written', function() { @@ -172,15 +195,15 @@ describe('Server.prototype.addProtoService', function() { it('Should fail if the server has been started', function() { server.start(); assert.throws(function() { - server.addProtoService(mathService, dummyImpls); + server.addService(mathServiceAttrs, dummyImpls); }); }); describe('Default handlers', function() { var client; beforeEach(function() { - server.addProtoService(mathService, {}); + server.addService(mathServiceAttrs, {}); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(mathService); + var Client = grpc.loadObject(mathService); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); @@ -252,7 +275,7 @@ describe('waitForClientReady', function() { server = new grpc.Server(); port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure()); server.start(); - Client = surface_client.makeProtobufClientConstructor(mathService); + Client = grpc.loadObject(mathService); }); beforeEach(function() { client = new Client('localhost:' + port, grpc.credentials.createInsecure()); @@ -309,16 +332,18 @@ describe('Echo service', function() { var server; var client; before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/echo_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/echo_service.proto', + {keepCase: true}); var echo_service = test_proto.lookup('EchoService'); + var Client = grpc.loadObject(echo_service); server = new grpc.Server(); - server.addProtoService(echo_service, { + server.addService(Client.service, { echo: function(call, callback) { callback(null, call.request); } }); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(echo_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); @@ -432,10 +457,13 @@ describe('Echo metadata', function() { var server; var metadata; before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/test_service.proto', + {keepCase: true}); var test_service = test_proto.lookup('TestService'); + var Client = grpc.loadObject(test_service); server = new grpc.Server(); - server.addProtoService(test_service, { + server.addService(Client.service, { unary: function(call, cb) { call.sendMetadata(call.metadata); cb(null, {}); @@ -460,7 +488,6 @@ describe('Echo metadata', function() { } }); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); metadata = new grpc.Metadata(); @@ -533,7 +560,9 @@ describe('Client malformed response handling', function() { var client; var badArg = new Buffer([0xFF]); before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/test_service.proto', + {keepCase: true}); var test_service = test_proto.lookup('TestService'); var malformed_test_service = { unary: { @@ -591,7 +620,7 @@ describe('Client malformed response handling', function() { } }); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(test_service); + var Client = grpc.loadObject(test_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); @@ -640,7 +669,9 @@ describe('Server serialization failure handling', function() { var client; var server; before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/test_service.proto', + {keepCase: true}); var test_service = test_proto.lookup('TestService'); var malformed_test_service = { unary: { @@ -698,7 +729,7 @@ describe('Server serialization failure handling', function() { } }); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(test_service); + var Client = grpc.loadObject(test_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); @@ -747,12 +778,15 @@ describe('Other conditions', function() { var server; var port; before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/test_service.proto', + {keepCase: true}); test_service = test_proto.lookup('TestService'); + Client = grpc.loadObject(test_service); server = new grpc.Server(); var trailer_metadata = new grpc.Metadata(); trailer_metadata.add('trailer-present', 'yes'); - server.addProtoService(test_service, { + server.addService(Client.service, { unary: function(call, cb) { var req = call.request; if (req.error) { @@ -812,7 +846,6 @@ describe('Other conditions', function() { } }); port = server.bind('localhost:0', server_insecure_creds); - Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); @@ -1093,17 +1126,19 @@ describe('Call propagation', function() { var client; var server; before(function() { - var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); + var test_proto = new ProtoBuf.Root(); + test_proto = test_proto.loadSync(__dirname + '/test_service.proto', + {keepCase: true}); test_service = test_proto.lookup('TestService'); server = new grpc.Server(); - server.addProtoService(test_service, { + Client = grpc.loadObject(test_service); + server.addService(Client.service, { unary: function(call) {}, clientStream: function(stream) {}, serverStream: function(stream) {}, bidiStream: function(stream) {} }); var port = server.bind('localhost:0', server_insecure_creds); - Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); @@ -1138,7 +1173,7 @@ describe('Call propagation', function() { }); call.cancel(); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1160,7 +1195,7 @@ describe('Call propagation', function() { }); call.cancel(); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1180,7 +1215,7 @@ describe('Call propagation', function() { }); call.cancel(); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1204,7 +1239,7 @@ describe('Call propagation', function() { }); call.cancel(); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1235,7 +1270,7 @@ describe('Call propagation', function() { } }); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1259,7 +1294,7 @@ describe('Call propagation', function() { done(); }); }; - proxy.addProtoService(test_service, proxy_impl); + proxy.addService(Client.service, proxy_impl); var proxy_port = proxy.bind('localhost:0', server_insecure_creds); proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, @@ -1279,14 +1314,14 @@ describe('Cancelling surface client', function() { var server; before(function() { server = new grpc.Server(); - server.addProtoService(mathService, { + server.addService(mathServiceAttrs, { 'div': function(stream) {}, 'divMany': function(stream) {}, 'fib': function(stream) {}, 'sum': function(stream) {} }); var port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(mathService); + var Client = surface_client.makeClientConstructor(mathServiceAttrs); client = new Client('localhost:' + port, grpc.credentials.createInsecure()); server.start(); }); diff --git a/src/node/test/test_messages.proto b/src/node/test/test_messages.proto index a1a6a32833..ae70f6e152 100644 --- a/src/node/test/test_messages.proto +++ b/src/node/test/test_messages.proto @@ -41,3 +41,20 @@ message SequenceValues { bytes bytes_field = 1; repeated int32 repeated_field = 2; } + +message OneOfValues { + oneof oneof_choice { + int32 int_choice = 1; + string string_choice = 2; + } +} + +enum TestEnum { + ZERO = 0; + ONE = 1; + TWO = 2; +} + +message EnumValues { + TestEnum enum_value = 1; +}
\ No newline at end of file diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 47838c2c98..f29c44a4cf 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -705,6 +705,10 @@ def _serve(state): state.rpc_states.remove(rpc_state) if _stop_serving(state): return + # We want to force the deletion of the previous event + # ~before~ we poll again; if the event has a reference + # to a shutdown Call object, this can induce spinlock. + event = None def _stop(state, grace): diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 43e2fe8cbb..d3e5373b0b 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index c3184bf939..80fb62899e 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -35,7 +35,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index 9534bb2078..1c87ceddf1 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -55,13 +55,14 @@ end # ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner - def initialize + def initialize(service_impl) + @service_impl = service_impl end def run @srv = GRPC::RpcServer.new port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) - @srv.handle(EchoServerImpl) + @srv.handle(@service_impl) @thd = Thread.new do @srv.run diff --git a/src/ruby/end2end/killed_client_thread_client.rb b/src/ruby/end2end/killed_client_thread_client.rb new file mode 100755 index 0000000000..d5a7db7d58 --- /dev/null +++ b/src/ruby/end2end/killed_client_thread_client.rb @@ -0,0 +1,58 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Attempt to reproduce +# https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/1327 + +require_relative './end2end_common' + +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client control port not used' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + thd = Thread.new do + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", + :this_channel_is_insecure) + stub.echo(Echo::EchoRequest.new(request: 'hello')) + fail 'the clients rpc in this test shouldnt complete. ' \ + 'expecting SIGINT to happen in the middle of the call' + end + thd.join +end + +main diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb new file mode 100755 index 0000000000..f76d3e1746 --- /dev/null +++ b/src/ruby/end2end/killed_client_thread_driver.rb @@ -0,0 +1,114 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +# Service that sleeps for a long time upon receiving an 'echo request' +# Also, this notifies @call_started_cv once it has received a request. +class SleepingEchoServerImpl < Echo::EchoServer::Service + def initialize(call_started, call_started_mu, call_started_cv) + @call_started = call_started + @call_started_mu = call_started_mu + @call_started_cv = call_started_cv + end + + def echo(echo_req, _) + @call_started_mu.synchronize do + @call_started.set_true + @call_started_cv.signal + end + sleep 1000 + Echo::EchoReply.new(response: echo_req.request) + end +end + +# Mutable boolean +class BoolHolder + attr_reader :val + + def init + @val = false + end + + def set_true + @val = true + end +end + +def main + STDERR.puts 'start server' + + call_started = BoolHolder.new + call_started_mu = Mutex.new + call_started_cv = ConditionVariable.new + + service_impl = SleepingEchoServerImpl.new(call_started, + call_started_mu, + call_started_cv) + server_runner = ServerRunner.new(service_impl) + server_port = server_runner.run + + STDERR.puts 'start client' + _, client_pid = start_client('killed_client_thread_client.rb', + server_port) + + call_started_mu.synchronize do + call_started_cv.wait(call_started_mu) until call_started.val + end + + # SIGINT the child process now that it's + # in the middle of an RPC (happening on a non-main thread) + Process.kill('SIGINT', client_pid) + STDERR.puts 'sent shutdown' + + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when killed while in the middle of an rpc' + end + + client_exit_code = $CHILD_STATUS + if client_exit_code.termsig != 2 # SIGINT + fail 'expected client exit from SIGINT ' \ + "but got child status: #{client_exit_code}" + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index c5d46e074c..6691464dc6 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index 84d039bf19..670cda0919 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -36,7 +36,7 @@ require_relative './end2end_common' def main STDERR.puts 'start server' - server_runner = ServerRunner.new + server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run sleep 1 diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 82d340b254..344cb941ff 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -784,7 +784,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { Only one operation of each type can be active at once in any given batch */ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { - run_batch_stack st; + run_batch_stack *st = NULL; grpc_rb_call *call = NULL; grpc_event ev; grpc_call_error err; @@ -792,6 +792,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); unsigned write_flag = 0; void *tag = (void*)&st; + if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call"); return Qnil; @@ -806,14 +807,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { if (rb_write_flag != Qnil) { write_flag = NUM2UINT(rb_write_flag); } - grpc_run_batch_stack_init(&st, write_flag); - grpc_run_batch_stack_fill_ops(&st, ops_hash); + st = gpr_malloc(sizeof(run_batch_stack)); + grpc_run_batch_stack_init(st, write_flag); + grpc_run_batch_stack_fill_ops(st, ops_hash); /* call grpc_call_start_batch, then wait for it to complete using * pluck_event */ - err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL); + err = grpc_call_start_batch(call->wrapped, st->ops, st->op_num, tag, NULL); if (err != GRPC_CALL_OK) { - grpc_run_batch_stack_cleanup(&st); + grpc_run_batch_stack_cleanup(st); + gpr_free(st); rb_raise(grpc_rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); @@ -826,8 +829,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { } /* Build and return the BatchResult struct result, if there is an error, it's reflected in the status */ - result = grpc_run_batch_stack_build_result(&st); - grpc_run_batch_stack_cleanup(&st); + result = grpc_run_batch_stack_build_result(st); + grpc_run_batch_stack_cleanup(st); + gpr_free(st); return result; } |