diff options
author | Vijay Pai <vpai@google.com> | 2015-06-18 08:42:29 -0700 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2015-06-18 08:42:29 -0700 |
commit | d03adc340ae04df7ba6635192fdd3bfa9a3c607f (patch) | |
tree | d13cd0303fcb3b9b453e40d57412d7751410dfc3 /src | |
parent | d2a82d9da7dbe2759d7bdd400cc835229e904c07 (diff) | |
parent | 6174b9a4d092c145d6cd3c90ab59bf5a0939329e (diff) |
Merge remote-tracking branch 'ctiller-repo/we-dont-need-no-backup' into poller2
Diffstat (limited to 'src')
30 files changed, 224 insertions, 113 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 1b0e33b123..166d559a45 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -115,3 +115,27 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { } return 0; } + +grpc_compression_level grpc_channel_args_get_compression_level( + const grpc_channel_args *a) { + size_t i; + if (a) { + for (i = 0; a && i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) { + return a->args[i].value.integer; + break; + } + } + } + return GRPC_COMPRESS_LEVEL_NONE; +} + +void grpc_channel_args_set_compression_level( + grpc_channel_args **a, grpc_compression_level level) { + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_LEVEL_ARG; + tmp.value.integer = level; + *a = grpc_channel_args_copy_and_add(*a, &tmp); +} diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index eb5bf63986..bf747b26e6 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -34,21 +34,31 @@ #ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H #define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H +#include <grpc/compression.h> #include <grpc/grpc.h> /* Copy some arguments */ grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); -/* Copy some arguments and add the to_add parameter in the end. +/** Copy some arguments and add the to_add parameter in the end. If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, const grpc_arg *to_add); -/* Destroy arguments created by grpc_channel_args_copy */ +/** Destroy arguments created by grpc_channel_args_copy */ void grpc_channel_args_destroy(grpc_channel_args *a); -/* Reads census_enabled settings from channel args. Returns 1 if census_enabled - is specified in channel args, otherwise returns 0. */ +/** Reads census_enabled settings from channel args. Returns 1 if census_enabled + * is specified in channel args, otherwise returns 0. */ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); +/** Returns the compression level set in \a a. */ +grpc_compression_level grpc_channel_args_get_compression_level( + const grpc_channel_args *a); + +/** Sets the compression level in \a a to \a level. Setting it to + * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */ +void grpc_channel_args_set_compression_level( + grpc_channel_args **a, grpc_compression_level level); + #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index d3e6af7287..6690265d75 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -158,11 +158,8 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { const grpc_channel_filter grpc_child_channel_top_filter = { lb_start_transport_op, lb_channel_op, - sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, - sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, - "child-channel", }; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index 42ee23d87f..5be8fa66e9 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -56,6 +56,8 @@ struct grpc_client_setup { gpr_cv cv; grpc_client_setup_request *active_request; int refs; + /** The set of pollsets that are currently interested in this + connection being established */ grpc_pollset_set interested_parties; }; @@ -83,9 +85,7 @@ static void destroy_setup(grpc_client_setup *s) { gpr_free(s); } -static void destroy_request(grpc_client_setup_request *r) { - gpr_free(r); -} +static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); } /* initiate handshaking */ static void setup_initiate(grpc_transport_setup *sp) { @@ -94,7 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) { int in_alarm = 0; r->setup = s; - /* TODO(klempner): Actually set a deadline */ r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); gpr_mu_lock(&s->mu); @@ -119,25 +118,23 @@ static void setup_initiate(grpc_transport_setup *sp) { } } +/** implementation of add_interested_party for setup vtable */ static void setup_add_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - grpc_pollset_set_add_pollset(&s->interested_parties, pollset); - gpr_mu_unlock(&s->mu); } +/** implementation of del_interested_party for setup vtable */ static void setup_del_interested_party(grpc_transport_setup *sp, grpc_pollset *pollset) { grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - grpc_pollset_set_del_pollset(&s->interested_parties, pollset); - gpr_mu_unlock(&s->mu); } @@ -234,7 +231,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r, return result; } -static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) { +static void backoff_alarm_done(void *arg /* grpc_client_setup_request */, + int success) { grpc_client_setup_request *r = arg; grpc_client_setup *s = r->setup; /* Handle status cancelled? */ diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 36ead843d2..4db48df6cb 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -31,6 +31,7 @@ * */ +#include <stdlib.h> #include <grpc/compression.h> const char *grpc_compression_algorithm_name( @@ -47,3 +48,20 @@ const char *grpc_compression_algorithm_name( } return "error"; } + +/* TODO(dgq): Add the ability to specify parameters to the individual + * compression algorithms */ +grpc_compression_algorithm grpc_compression_algorithm_for_level( + grpc_compression_level level) { + switch (level) { + case GRPC_COMPRESS_LEVEL_NONE: + return GRPC_COMPRESS_NONE; + case GRPC_COMPRESS_LEVEL_LOW: + case GRPC_COMPRESS_LEVEL_MED: + case GRPC_COMPRESS_LEVEL_HIGH: + return GRPC_COMPRESS_DEFLATE; + default: + /* we shouldn't be making it here */ + abort(); + } +} diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h index aba701a6ee..b3eb8f579f 100644 --- a/src/core/compression/message_compress.h +++ b/src/core/compression/message_compress.h @@ -49,4 +49,4 @@ int grpc_msg_compress(grpc_compression_algorithm algorithm, int grpc_msg_decompress(grpc_compression_algorithm algorithm, gpr_slice_buffer *input, gpr_slice_buffer *output); -#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */ +#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */ diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index cee374fe8e..06699e88c2 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -93,6 +93,10 @@ void grpc_httpcli_context_init(grpc_httpcli_context *context); void grpc_httpcli_context_destroy(grpc_httpcli_context *context); /* Asynchronously perform a HTTP GET. + 'context' specifies the http context under which to do the get + 'pollset' indicates a grpc_pollset that is interested in the result + of the get - work on this pollset may be used to progress the get + operation 'request' contains request parameters - these are caller owned and can be destroyed once the call returns 'deadline' contains a deadline for the request (or gpr_inf_future) @@ -106,7 +110,19 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset, grpc_httpcli_response_cb on_response, void *user_data); /* Asynchronously perform a HTTP POST. - When there is no body, pass in NULL as body_bytes. + 'context' specifies the http context under which to do the post + 'pollset' indicates a grpc_pollset that is interested in the result + of the post - work on this pollset may be used to progress the post + operation + 'request' contains request parameters - these are caller owned and can be + destroyed once the call returns + 'body_bytes' and 'body_size' specify the payload for the post. + When there is no body, pass in NULL as body_bytes. + 'deadline' contains a deadline for the request (or gpr_inf_future) + 'em' points to a caller owned event manager that must be alive for the + lifetime of the request + 'on_response' is a callback to report results to (and 'user_data' is a user + supplied pointer to pass to said call) Does not support ?var1=val1&var2=val2 in the path. */ void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset, const grpc_httpcli_request *request, diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index da3c94ec11..7472b6144f 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -65,7 +65,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May unlock GRPC_POLLSET_MU(pollset) during its execution. */ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); -/* Break a pollset out of polling work +/* Break one polling thread out of polling work for this pollset. Requires GRPC_POLLSET_MU(pollset) locked. */ void grpc_pollset_kick(grpc_pollset *pollset); diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h index 0a404b601b..77e32a8d51 100644 --- a/src/core/iomgr/pollset_kick_posix.h +++ b/src/core/iomgr/pollset_kick_posix.h @@ -37,6 +37,11 @@ #include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/sync.h> +/* pollset kicking allows breaking a thread out of polling work for + a given pollset. + writing a byte to a pipe is used as a posix-ly portable base + mechanism, and eventfds are utilized on Linux for better performance. */ + typedef struct grpc_kick_fd_info { grpc_wakeup_fd_info wakeup_fd; /* used for polling list and free list */ @@ -67,7 +72,7 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); * applicable. Intended for testing. */ void grpc_pollset_kick_global_init_fallback_fd(void); -/* Must be called before entering poll(). If return value is -1, this consumed +/* Must be called before entering poll(). If return value is NULL, this consumed an existing kick. Otherwise the return value is an FD to add to the poll set. */ grpc_kick_fd_info *grpc_pollset_kick_pre_poll( @@ -82,6 +87,7 @@ void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, grpc_kick_fd_info *fd_info); +/* Actually kick */ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index e5e3435feb..b4a526b9e7 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -97,15 +97,7 @@ static int multipoll_with_epoll_pollset_maybe_work( * here. */ - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout_ms = -1; - } else { - timeout_ms = gpr_time_to_millis( - gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); - if (timeout_ms < 0) { - return 1; - } - } + timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); pollset->counter += 1; gpr_mu_unlock(&pollset->mu); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index d21c52c0f0..2f108da66a 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -113,15 +113,7 @@ static int multipoll_with_poll_pollset_maybe_work( grpc_kick_fd_info *kfd; h = pollset->data.ptr; - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis( - gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); - if (timeout < 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); if (h->pfd_capacity < h->fd_count + 1) { h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); gpr_free(h->pfds); diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 0fe54c8f1d..46d3d132ce 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -163,7 +163,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && pollset->counter == 0) { + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + pollset->counter == 0) { pollset->called_shutdown = 1; call_shutdown = 1; } @@ -187,6 +188,22 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + return -1; + } + if (gpr_time_cmp( + deadline, + gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis( + gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1))); +} + /* * basic_pollset - a vtable that provides polling for zero or one file * descriptor via poll() @@ -343,15 +360,7 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset, GRPC_FD_UNREF(fd, "basicpoll"); fd = pollset->data.ptr = NULL; } - if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { - timeout = -1; - } else { - timeout = gpr_time_to_millis( - gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500))); - if (timeout < 0) { - return 1; - } - } + timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); if (kfd == NULL) { /* Already kicked */ diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 92c258e0cd..ba3d638d41 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -94,6 +94,15 @@ int grpc_kick_read_fd(grpc_pollset *p); /* Call after polling has been kicked to leave the kicked state */ void grpc_kick_drain(grpc_pollset *p); +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); + /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset, struct grpc_fd **fds, diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 335ffb21b9..98e3b552a7 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -39,7 +39,7 @@ /* A grpc_pollset_set is a set of pollsets that are interested in an action. Adding a pollset to a pollset_set automatically adds any fd's (etc) that have been registered with the set_set with that pollset. - Registering fd's automatically iterates all current pollsets. */ + Registering fd's automatically adds them to all current pollsets. */ #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/pollset_set_posix.h" diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 6a21925908..b9c209cd2c 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -42,9 +42,9 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {} void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {} void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset) {} + grpc_pollset *pollset) {} void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset) {} + grpc_pollset *pollset) {} #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index f40a5043c8..0fa08b52b0 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -41,7 +41,9 @@ /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and - NULL on failure) */ + NULL on failure). + interested_parties points to a set of pollsets that would be interested + in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, const struct sockaddr *addr, int addr_len, diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 2ac35f863a..5854031c9b 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -108,6 +108,7 @@ struct grpc_tcp_server { /* destroyed port count: how many ports are completely destroyed */ size_t destroyed_ports; + /* is this server shutting down? (boolean) */ int shutdown; /* all listening ports */ @@ -119,7 +120,9 @@ struct grpc_tcp_server { void (*shutdown_complete)(void *); void *shutdown_complete_arg; + /* all pollsets interested in new connections */ grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ size_t pollset_count; }; @@ -160,6 +163,9 @@ static void destroyed_port(void *server, int success) { static void dont_care_about_shutdown_completion(void *ignored) {} +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ static void deactivated_all_ports(grpc_tcp_server *s) { size_t i; diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index d11706ece0..e9bd45db68 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -53,6 +53,10 @@ typedef struct { grpc_credentials *creds; grpc_mdstr *host; grpc_mdstr *method; + /* pollset bound to this call; if we need to make external + network requests, they should be done under this pollset + so that work can progress when this call wants work to + progress */ grpc_pollset *pollset; grpc_transport_op op; size_t op_md_idx; diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c index f878e6aaa1..d249be7d2e 100644 --- a/src/core/support/log_win32.c +++ b/src/core/support/log_win32.c @@ -94,23 +94,22 @@ void gpr_default_log(gpr_log_func_args *args) { fprintf(stderr, "%s%s.%09u %5lu %s:%d] %s\n", gpr_log_severity_string(args->severity), time_buffer, - (int)(now.tv_nsec), GetCurrentThreadId(), - args->file, args->line, args->message); + (int)(now.tv_nsec), GetCurrentThreadId(), args->file, args->line, + args->message); } char *gpr_format_message(DWORD messageid) { LPTSTR tmessage; char *message; - DWORD status = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, messageid, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPTSTR)(&tmessage), 0, NULL); + DWORD status = FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPTSTR)(&tmessage), 0, NULL); if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); LocalFree(tmessage); return message; } -#endif /* GPR_WIN32 */ +#endif /* GPR_WIN32 */ diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c index 86829a686f..283db83833 100644 --- a/src/core/surface/byte_buffer_reader.c +++ b/src/core/surface/byte_buffer_reader.c @@ -64,11 +64,11 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_msg_decompress(reader->buffer_in->data.raw.compression, &reader->buffer_in->data.raw.slice_buffer, &decompressed_slices_buffer); - reader->buffer_out = grpc_raw_byte_buffer_create( - decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); gpr_slice_buffer_destroy(&decompressed_slices_buffer); - } else { /* not compressed, use the input buffer as output */ + } else { /* not compressed, use the input buffer as output */ reader->buffer_out = reader->buffer_in; } reader->current.index = 0; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index bcb8c7bfec..cf0a595147 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1288,7 +1288,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: - if (!are_write_flags_valid(op->flags)){ + if (!are_write_flags_valid(op->flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } req = &reqs[out++]; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index a613bac8cd..fb3662b50d 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -79,7 +79,7 @@ typedef union { typedef struct { grpc_ioreq_op op; grpc_ioreq_data data; - gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ + gpr_uint32 flags; /**< A copy of the write flags from grpc_op */ } grpc_ioreq; typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index caaced75c4..1cd1dc822d 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1606,15 +1606,16 @@ static int init_settings_frame_parser(transport *t) { int ok; if (t->incoming_stream_id != 0) { - gpr_log(GPR_ERROR, "settings frame received for stream %d", t->incoming_stream_id); + gpr_log(GPR_ERROR, "settings frame received for stream %d", + t->incoming_stream_id); drop_connection(t); return 0; } ok = GRPC_CHTTP2_PARSE_OK == - grpc_chttp2_settings_parser_begin_frame( - &t->simple_parsers.settings, t->incoming_frame_size, - t->incoming_frame_flags, t->settings[PEER_SETTINGS]); + grpc_chttp2_settings_parser_begin_frame( + &t->simple_parsers.settings, t->incoming_frame_size, + t->incoming_frame_flags, t->settings[PEER_SETTINGS]); if (!ok) { drop_connection(t); return 0; diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc index 87f8349eef..679c4f1503 100644 --- a/src/cpp/client/channel_arguments.cc +++ b/src/cpp/client/channel_arguments.cc @@ -34,6 +34,7 @@ #include <grpc++/channel_arguments.h> #include <grpc/grpc_security.h> +#include "src/core/channel/channel_args.h" namespace grpc { @@ -41,6 +42,10 @@ void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) { SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name); } +void ChannelArguments::SetCompressionLevel(grpc_compression_level level) { + SetInt(GRPC_COMPRESSION_LEVEL_ARG, level); +} + grpc::string ChannelArguments::GetSslTargetNameOverride() const { for (unsigned int i = 0; i < args_.size(); i++) { if (grpc::string(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG) == args_[i].key) { diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 82ded5cc7a..21f94d3cf5 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -204,7 +204,7 @@ namespace Grpc.Core.Tests BenchmarkUtil.RunBenchmark(100, 100, () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); } - + [Test] public void UnknownMethodHandler() { diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index d92a9d1b37..a9625a1799 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -100,7 +100,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; if (!host || !method) { [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."]; } - // TODO(jcanizales): Throw if the requestWriter was already started. + if (requestWriter.state != GRXWriterStateNotStarted) { + [NSException raise:NSInvalidArgumentException format:@"The requests writer can't be already started."]; + } if ((self = [super init])) { static dispatch_once_t initialization; dispatch_once(&initialization, ^{ diff --git a/src/objective-c/README.md b/src/objective-c/README.md index 0e5eb9a25f..e997b76d14 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -163,7 +163,7 @@ files: * [Podspec](https://github.com/grpc/grpc/blob/master/gRPC.podspec) for the Objective-C gRPC runtime library. This can be tedious to configure manually. -* [Podspec](https://github.com/jcanizales/protobuf/blob/add-podspec/Protobuf.podspec) for the +* [Podspec](https://github.com/google/protobuf/blob/master/Protobuf.podspec) for the Objective-C Protobuf runtime library. [Protocol Buffers]:https://developers.google.com/protocol-buffers/ diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py index 478346341b..1a9b0c69f3 100644 --- a/src/python/src/grpc/_adapter/_intermediary_low_test.py +++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py @@ -29,6 +29,8 @@ """Tests for the old '_low'.""" +import Queue +import threading import time import unittest @@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple( bytes(bytearray((row + column) % 256 for column in range(row))) for row in range(_STREAM_LENGTH)) + class LonelyClientTest(unittest.TestCase): def testLonelyClient(self): @@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase): del completion_queue +def _drive_completion_queue(completion_queue, event_queue): + while True: + event = completion_queue.get(_FUTURE) + if event.kind is _low.Event.Kind.STOP: + break + event_queue.put(event) + + class EchoTest(unittest.TestCase): def setUp(self): @@ -88,24 +99,26 @@ class EchoTest(unittest.TestCase): self.server = _low.Server(self.server_completion_queue) port = self.server.add_http2_addr('[::]:0') self.server.start() + self.server_events = Queue.Queue() + self.server_completion_queue_thread = threading.Thread( + target=_drive_completion_queue, + args=(self.server_completion_queue, self.server_events)) + self.server_completion_queue_thread.start() self.client_completion_queue = _low.CompletionQueue() self.channel = _low.Channel('%s:%d' % (self.host, port), None) + self.client_events = Queue.Queue() + self.client_completion_queue_thread = threading.Thread( + target=_drive_completion_queue, + args=(self.client_completion_queue, self.client_events)) + self.client_completion_queue_thread.start() def tearDown(self): self.server.stop() self.server_completion_queue.stop() self.client_completion_queue.stop() - while True: - event = self.server_completion_queue.get(_FUTURE) - if event is not None and event.kind is _low.Event.Kind.STOP: - break - while True: - event = self.client_completion_queue.get(_FUTURE) - if event is not None and event.kind is _low.Event.Kind.STOP: - break - self.server_completion_queue = None - self.client_completion_queue = None + self.server_completion_queue_thread.join() + self.client_completion_queue_thread.join() del self.server def _perform_echo_test(self, test_data): @@ -144,7 +157,7 @@ class EchoTest(unittest.TestCase): client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) self.server.service(service_tag) - service_accepted = self.server_completion_queue.get(_FUTURE) + service_accepted = self.server_events.get() self.assertIsNotNone(service_accepted) self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) self.assertIs(service_accepted.tag, service_tag) @@ -165,7 +178,7 @@ class EchoTest(unittest.TestCase): server_leading_binary_metadata_value) server_call.premetadata() - metadata_accepted = self.client_completion_queue.get(_FUTURE) + metadata_accepted = self.client_events.get() self.assertIsNotNone(metadata_accepted) self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) self.assertEqual(metadata_tag, metadata_accepted.tag) @@ -179,14 +192,14 @@ class EchoTest(unittest.TestCase): for datum in test_data: client_call.write(datum, write_tag) - write_accepted = self.client_completion_queue.get(_FUTURE) + write_accepted = self.client_events.get() self.assertIsNotNone(write_accepted) self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) self.assertIs(write_accepted.tag, write_tag) self.assertIs(write_accepted.write_accepted, True) server_call.read(read_tag) - read_accepted = self.server_completion_queue.get(_FUTURE) + read_accepted = self.server_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) @@ -194,14 +207,14 @@ class EchoTest(unittest.TestCase): server_data.append(read_accepted.bytes) server_call.write(read_accepted.bytes, write_tag) - write_accepted = self.server_completion_queue.get(_FUTURE) + write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) self.assertEqual(write_tag, write_accepted.tag) self.assertTrue(write_accepted.write_accepted) client_call.read(read_tag) - read_accepted = self.client_completion_queue.get(_FUTURE) + read_accepted = self.client_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) @@ -209,14 +222,14 @@ class EchoTest(unittest.TestCase): client_data.append(read_accepted.bytes) client_call.complete(complete_tag) - complete_accepted = self.client_completion_queue.get(_FUTURE) + complete_accepted = self.client_events.get() self.assertIsNotNone(complete_accepted) self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) self.assertIs(complete_accepted.tag, complete_tag) self.assertIs(complete_accepted.complete_accepted, True) server_call.read(read_tag) - read_accepted = self.server_completion_queue.get(_FUTURE) + read_accepted = self.server_events.get() self.assertIsNotNone(read_accepted) self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) self.assertEqual(read_tag, read_accepted.tag) @@ -228,8 +241,8 @@ class EchoTest(unittest.TestCase): server_trailing_binary_metadata_value) server_call.status(_low.Status(_low.Code.OK, details), status_tag) - server_terminal_event_one = self.server_completion_queue.get(_FUTURE) - server_terminal_event_two = self.server_completion_queue.get(_FUTURE) + server_terminal_event_one = self.server_events.get() + server_terminal_event_two = self.server_events.get() if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: status_accepted = server_terminal_event_one rpc_accepted = server_terminal_event_two @@ -246,8 +259,8 @@ class EchoTest(unittest.TestCase): self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) client_call.read(read_tag) - client_terminal_event_one = self.client_completion_queue.get(_FUTURE) - client_terminal_event_two = self.client_completion_queue.get(_FUTURE) + client_terminal_event_one = self.client_events.get() + client_terminal_event_two = self.client_events.get() if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: read_accepted = client_terminal_event_one finish_accepted = client_terminal_event_two @@ -303,22 +316,26 @@ class CancellationTest(unittest.TestCase): self.server = _low.Server(self.server_completion_queue) port = self.server.add_http2_addr('[::]:0') self.server.start() + self.server_events = Queue.Queue() + self.server_completion_queue_thread = threading.Thread( + target=_drive_completion_queue, + args=(self.server_completion_queue, self.server_events)) + self.server_completion_queue_thread.start() self.client_completion_queue = _low.CompletionQueue() self.channel = _low.Channel('%s:%d' % (self.host, port), None) + self.client_events = Queue.Queue() + self.client_completion_queue_thread = threading.Thread( + target=_drive_completion_queue, + args=(self.client_completion_queue, self.client_events)) + self.client_completion_queue_thread.start() def tearDown(self): self.server.stop() self.server_completion_queue.stop() self.client_completion_queue.stop() - while True: - event = self.server_completion_queue.get(0) - if event is not None and event.kind is _low.Event.Kind.STOP: - break - while True: - event = self.client_completion_queue.get(0) - if event is not None and event.kind is _low.Event.Kind.STOP: - break + self.server_completion_queue_thread.join() + self.client_completion_queue_thread.join() del self.server def testCancellation(self): @@ -340,29 +357,29 @@ class CancellationTest(unittest.TestCase): client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) self.server.service(service_tag) - service_accepted = self.server_completion_queue.get(_FUTURE) + service_accepted = self.server_events.get() server_call = service_accepted.service_acceptance.call server_call.accept(self.server_completion_queue, finish_tag) server_call.premetadata() - metadata_accepted = self.client_completion_queue.get(_FUTURE) + metadata_accepted = self.client_events.get() self.assertIsNotNone(metadata_accepted) for datum in test_data: client_call.write(datum, write_tag) - write_accepted = self.client_completion_queue.get(_FUTURE) + write_accepted = self.client_events.get() server_call.read(read_tag) - read_accepted = self.server_completion_queue.get(_FUTURE) + read_accepted = self.server_events.get() server_data.append(read_accepted.bytes) server_call.write(read_accepted.bytes, write_tag) - write_accepted = self.server_completion_queue.get(_FUTURE) + write_accepted = self.server_events.get() self.assertIsNotNone(write_accepted) client_call.read(read_tag) - read_accepted = self.client_completion_queue.get(_FUTURE) + read_accepted = self.client_events.get() client_data.append(read_accepted.bytes) client_call.cancel() @@ -373,8 +390,8 @@ class CancellationTest(unittest.TestCase): server_call.read(read_tag) - server_terminal_event_one = self.server_completion_queue.get(_FUTURE) - server_terminal_event_two = self.server_completion_queue.get(_FUTURE) + server_terminal_event_one = self.server_events.get() + server_terminal_event_two = self.server_events.get() if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: read_accepted = server_terminal_event_one rpc_accepted = server_terminal_event_two @@ -388,7 +405,7 @@ class CancellationTest(unittest.TestCase): self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) - finish_event = self.client_completion_queue.get(_FUTURE) + finish_event = self.client_events.get() self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), finish_event.status) diff --git a/src/ruby/.rspec b/src/ruby/.rspec index dd579f7a13..cd7c5fb5b2 100755 --- a/src/ruby/.rspec +++ b/src/ruby/.rspec @@ -1,2 +1,4 @@ -I. --require spec_helper +--format documentation +--color diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb index 101165c146..270d2e97d3 100644 --- a/src/ruby/spec/spec_helper.rb +++ b/src/ruby/spec/spec_helper.rb @@ -53,3 +53,5 @@ RSpec.configure do |config| include RSpec::LoggingHelper config.capture_log_messages end + +RSpec::Expectations.configuration.warn_about_potential_false_positives = false |