aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2015-06-18 08:42:29 -0700
committerGravatar Vijay Pai <vpai@google.com>2015-06-18 08:42:29 -0700
commitd03adc340ae04df7ba6635192fdd3bfa9a3c607f (patch)
treed13cd0303fcb3b9b453e40d57412d7751410dfc3 /src
parentd2a82d9da7dbe2759d7bdd400cc835229e904c07 (diff)
parent6174b9a4d092c145d6cd3c90ab59bf5a0939329e (diff)
Merge remote-tracking branch 'ctiller-repo/we-dont-need-no-backup' into poller2
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/channel_args.c24
-rw-r--r--src/core/channel/channel_args.h18
-rw-r--r--src/core/channel/child_channel.c3
-rw-r--r--src/core/channel/client_setup.c16
-rw-r--r--src/core/compression/algorithm.c18
-rw-r--r--src/core/compression/message_compress.h2
-rw-r--r--src/core/httpcli/httpcli.h18
-rw-r--r--src/core/iomgr/pollset.h2
-rw-r--r--src/core/iomgr/pollset_kick_posix.h8
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c10
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c10
-rw-r--r--src/core/iomgr/pollset_posix.c29
-rw-r--r--src/core/iomgr/pollset_posix.h9
-rw-r--r--src/core/iomgr/pollset_set.h2
-rw-r--r--src/core/iomgr/pollset_set_windows.c4
-rw-r--r--src/core/iomgr/tcp_client.h4
-rw-r--r--src/core/iomgr/tcp_server_posix.c6
-rw-r--r--src/core/security/client_auth_filter.c4
-rw-r--r--src/core/support/log_win32.c17
-rw-r--r--src/core/surface/byte_buffer_reader.c8
-rw-r--r--src/core/surface/call.c2
-rw-r--r--src/core/surface/call.h2
-rw-r--r--src/core/transport/chttp2_transport.c9
-rw-r--r--src/cpp/client/channel_arguments.cc5
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs2
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m4
-rw-r--r--src/objective-c/README.md2
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low_test.py95
-rwxr-xr-xsrc/ruby/.rspec2
-rw-r--r--src/ruby/spec/spec_helper.rb2
30 files changed, 224 insertions, 113 deletions
diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c
index 1b0e33b123..166d559a45 100644
--- a/src/core/channel/channel_args.c
+++ b/src/core/channel/channel_args.c
@@ -115,3 +115,27 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) {
}
return 0;
}
+
+grpc_compression_level grpc_channel_args_get_compression_level(
+ const grpc_channel_args *a) {
+ size_t i;
+ if (a) {
+ for (i = 0; a && i < a->num_args; ++i) {
+ if (a->args[i].type == GRPC_ARG_INTEGER &&
+ !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) {
+ return a->args[i].value.integer;
+ break;
+ }
+ }
+ }
+ return GRPC_COMPRESS_LEVEL_NONE;
+}
+
+void grpc_channel_args_set_compression_level(
+ grpc_channel_args **a, grpc_compression_level level) {
+ grpc_arg tmp;
+ tmp.type = GRPC_ARG_INTEGER;
+ tmp.key = GRPC_COMPRESSION_LEVEL_ARG;
+ tmp.value.integer = level;
+ *a = grpc_channel_args_copy_and_add(*a, &tmp);
+}
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index eb5bf63986..bf747b26e6 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -34,21 +34,31 @@
#ifndef GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H
#define GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H
+#include <grpc/compression.h>
#include <grpc/grpc.h>
/* Copy some arguments */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
-/* Copy some arguments and add the to_add parameter in the end.
+/** Copy some arguments and add the to_add parameter in the end.
If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add);
-/* Destroy arguments created by grpc_channel_args_copy */
+/** Destroy arguments created by grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
-/* Reads census_enabled settings from channel args. Returns 1 if census_enabled
- is specified in channel args, otherwise returns 0. */
+/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
+ * is specified in channel args, otherwise returns 0. */
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
+/** Returns the compression level set in \a a. */
+grpc_compression_level grpc_channel_args_get_compression_level(
+ const grpc_channel_args *a);
+
+/** Sets the compression level in \a a to \a level. Setting it to
+ * GRPC_COMPRESS_LEVEL_NONE disables compression for the channel. */
+void grpc_channel_args_set_compression_level(
+ grpc_channel_args **a, grpc_compression_level level);
+
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index d3e6af7287..6690265d75 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -158,11 +158,8 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
const grpc_channel_filter grpc_child_channel_top_filter = {
lb_start_transport_op, lb_channel_op,
-
sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem,
-
sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem,
-
"child-channel",
};
diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c
index 42ee23d87f..5be8fa66e9 100644
--- a/src/core/channel/client_setup.c
+++ b/src/core/channel/client_setup.c
@@ -56,6 +56,8 @@ struct grpc_client_setup {
gpr_cv cv;
grpc_client_setup_request *active_request;
int refs;
+ /** The set of pollsets that are currently interested in this
+ connection being established */
grpc_pollset_set interested_parties;
};
@@ -83,9 +85,7 @@ static void destroy_setup(grpc_client_setup *s) {
gpr_free(s);
}
-static void destroy_request(grpc_client_setup_request *r) {
- gpr_free(r);
-}
+static void destroy_request(grpc_client_setup_request *r) { gpr_free(r); }
/* initiate handshaking */
static void setup_initiate(grpc_transport_setup *sp) {
@@ -94,7 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) {
int in_alarm = 0;
r->setup = s;
- /* TODO(klempner): Actually set a deadline */
r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60));
gpr_mu_lock(&s->mu);
@@ -119,25 +118,23 @@ static void setup_initiate(grpc_transport_setup *sp) {
}
}
+/** implementation of add_interested_party for setup vtable */
static void setup_add_interested_party(grpc_transport_setup *sp,
grpc_pollset *pollset) {
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
-
grpc_pollset_set_add_pollset(&s->interested_parties, pollset);
-
gpr_mu_unlock(&s->mu);
}
+/** implementation of del_interested_party for setup vtable */
static void setup_del_interested_party(grpc_transport_setup *sp,
grpc_pollset *pollset) {
grpc_client_setup *s = (grpc_client_setup *)sp;
gpr_mu_lock(&s->mu);
-
grpc_pollset_set_del_pollset(&s->interested_parties, pollset);
-
gpr_mu_unlock(&s->mu);
}
@@ -234,7 +231,8 @@ int grpc_client_setup_request_should_continue(grpc_client_setup_request *r,
return result;
}
-static void backoff_alarm_done(void *arg /* grpc_client_setup */, int success) {
+static void backoff_alarm_done(void *arg /* grpc_client_setup_request */,
+ int success) {
grpc_client_setup_request *r = arg;
grpc_client_setup *s = r->setup;
/* Handle status cancelled? */
diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c
index 36ead843d2..4db48df6cb 100644
--- a/src/core/compression/algorithm.c
+++ b/src/core/compression/algorithm.c
@@ -31,6 +31,7 @@
*
*/
+#include <stdlib.h>
#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
@@ -47,3 +48,20 @@ const char *grpc_compression_algorithm_name(
}
return "error";
}
+
+/* TODO(dgq): Add the ability to specify parameters to the individual
+ * compression algorithms */
+grpc_compression_algorithm grpc_compression_algorithm_for_level(
+ grpc_compression_level level) {
+ switch (level) {
+ case GRPC_COMPRESS_LEVEL_NONE:
+ return GRPC_COMPRESS_NONE;
+ case GRPC_COMPRESS_LEVEL_LOW:
+ case GRPC_COMPRESS_LEVEL_MED:
+ case GRPC_COMPRESS_LEVEL_HIGH:
+ return GRPC_COMPRESS_DEFLATE;
+ default:
+ /* we shouldn't be making it here */
+ abort();
+ }
+}
diff --git a/src/core/compression/message_compress.h b/src/core/compression/message_compress.h
index aba701a6ee..b3eb8f579f 100644
--- a/src/core/compression/message_compress.h
+++ b/src/core/compression/message_compress.h
@@ -49,4 +49,4 @@ int grpc_msg_compress(grpc_compression_algorithm algorithm,
int grpc_msg_decompress(grpc_compression_algorithm algorithm,
gpr_slice_buffer *input, gpr_slice_buffer *output);
-#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */
+#endif /* GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H */
diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h
index cee374fe8e..06699e88c2 100644
--- a/src/core/httpcli/httpcli.h
+++ b/src/core/httpcli/httpcli.h
@@ -93,6 +93,10 @@ void grpc_httpcli_context_init(grpc_httpcli_context *context);
void grpc_httpcli_context_destroy(grpc_httpcli_context *context);
/* Asynchronously perform a HTTP GET.
+ 'context' specifies the http context under which to do the get
+ 'pollset' indicates a grpc_pollset that is interested in the result
+ of the get - work on this pollset may be used to progress the get
+ operation
'request' contains request parameters - these are caller owned and can be
destroyed once the call returns
'deadline' contains a deadline for the request (or gpr_inf_future)
@@ -106,7 +110,19 @@ void grpc_httpcli_get(grpc_httpcli_context *context, grpc_pollset *pollset,
grpc_httpcli_response_cb on_response, void *user_data);
/* Asynchronously perform a HTTP POST.
- When there is no body, pass in NULL as body_bytes.
+ 'context' specifies the http context under which to do the post
+ 'pollset' indicates a grpc_pollset that is interested in the result
+ of the post - work on this pollset may be used to progress the post
+ operation
+ 'request' contains request parameters - these are caller owned and can be
+ destroyed once the call returns
+ 'body_bytes' and 'body_size' specify the payload for the post.
+ When there is no body, pass in NULL as body_bytes.
+ 'deadline' contains a deadline for the request (or gpr_inf_future)
+ 'em' points to a caller owned event manager that must be alive for the
+ lifetime of the request
+ 'on_response' is a callback to report results to (and 'user_data' is a user
+ supplied pointer to pass to said call)
Does not support ?var1=val1&var2=val2 in the path. */
void grpc_httpcli_post(grpc_httpcli_context *context, grpc_pollset *pollset,
const grpc_httpcli_request *request,
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index da3c94ec11..7472b6144f 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -65,7 +65,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
May unlock GRPC_POLLSET_MU(pollset) during its execution. */
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
-/* Break a pollset out of polling work
+/* Break one polling thread out of polling work for this pollset.
Requires GRPC_POLLSET_MU(pollset) locked. */
void grpc_pollset_kick(grpc_pollset *pollset);
diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h
index 0a404b601b..77e32a8d51 100644
--- a/src/core/iomgr/pollset_kick_posix.h
+++ b/src/core/iomgr/pollset_kick_posix.h
@@ -37,6 +37,11 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <grpc/support/sync.h>
+/* pollset kicking allows breaking a thread out of polling work for
+ a given pollset.
+ writing a byte to a pipe is used as a posix-ly portable base
+ mechanism, and eventfds are utilized on Linux for better performance. */
+
typedef struct grpc_kick_fd_info {
grpc_wakeup_fd_info wakeup_fd;
/* used for polling list and free list */
@@ -67,7 +72,7 @@ void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state);
* applicable. Intended for testing. */
void grpc_pollset_kick_global_init_fallback_fd(void);
-/* Must be called before entering poll(). If return value is -1, this consumed
+/* Must be called before entering poll(). If return value is NULL, this consumed
an existing kick. Otherwise the return value is an FD to add to the poll set.
*/
grpc_kick_fd_info *grpc_pollset_kick_pre_poll(
@@ -82,6 +87,7 @@ void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state,
void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state,
grpc_kick_fd_info *fd_info);
+/* Actually kick */
void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index e5e3435feb..b4a526b9e7 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -97,15 +97,7 @@ static int multipoll_with_epoll_pollset_maybe_work(
* here.
*/
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout_ms = -1;
- } else {
- timeout_ms = gpr_time_to_millis(
- gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
- if (timeout_ms < 0) {
- return 1;
- }
- }
+ timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
pollset->counter += 1;
gpr_mu_unlock(&pollset->mu);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index d21c52c0f0..2f108da66a 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -113,15 +113,7 @@ static int multipoll_with_poll_pollset_maybe_work(
grpc_kick_fd_info *kfd;
h = pollset->data.ptr;
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(
- gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
- if (timeout < 0) {
- return 1;
- }
- }
+ timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
if (h->pfd_capacity < h->fd_count + 1) {
h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
gpr_free(h->pfds);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 0fe54c8f1d..46d3d132ce 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -163,7 +163,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
- if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && pollset->counter == 0) {
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ pollset->counter == 0) {
pollset->called_shutdown = 1;
call_shutdown = 1;
}
@@ -187,6 +188,22 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(
+ deadline,
+ gpr_time_add(now, gpr_time_from_micros(max_spin_polling_us))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(
+ gpr_time_add(timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1)));
+}
+
/*
* basic_pollset - a vtable that provides polling for zero or one file
* descriptor via poll()
@@ -343,15 +360,7 @@ static int basic_pollset_maybe_work(grpc_pollset *pollset,
GRPC_FD_UNREF(fd, "basicpoll");
fd = pollset->data.ptr = NULL;
}
- if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
- timeout = -1;
- } else {
- timeout = gpr_time_to_millis(
- gpr_time_add(gpr_time_sub(deadline, now), gpr_time_from_micros(500)));
- if (timeout < 0) {
- return 1;
- }
- }
+ timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state);
if (kfd == NULL) {
/* Already kicked */
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 92c258e0cd..ba3d638d41 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -94,6 +94,15 @@ int grpc_kick_read_fd(grpc_pollset *p);
/* Call after polling has been kicked to leave the kicked state */
void grpc_kick_drain(grpc_pollset *p);
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - longer than a millisecond polls are rounded up to the next nearest
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now);
+
/* turn a pollset into a multipoller: platform specific */
typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset,
struct grpc_fd **fds,
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 335ffb21b9..98e3b552a7 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -39,7 +39,7 @@
/* A grpc_pollset_set is a set of pollsets that are interested in an
action. Adding a pollset to a pollset_set automatically adds any
fd's (etc) that have been registered with the set_set with that pollset.
- Registering fd's automatically iterates all current pollsets. */
+ Registering fd's automatically adds them to all current pollsets. */
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/pollset_set_posix.h"
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 6a21925908..b9c209cd2c 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -42,9 +42,9 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set) {}
void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {}
void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {}
+ grpc_pollset *pollset) {}
void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set,
- grpc_pollset *pollset) {}
+ grpc_pollset *pollset) {}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index f40a5043c8..0fa08b52b0 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -41,7 +41,9 @@
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
- NULL on failure) */
+ NULL on failure).
+ interested_parties points to a set of pollsets that would be interested
+ in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
void *arg, grpc_pollset_set *interested_parties,
const struct sockaddr *addr, int addr_len,
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 2ac35f863a..5854031c9b 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -108,6 +108,7 @@ struct grpc_tcp_server {
/* destroyed port count: how many ports are completely destroyed */
size_t destroyed_ports;
+ /* is this server shutting down? (boolean) */
int shutdown;
/* all listening ports */
@@ -119,7 +120,9 @@ struct grpc_tcp_server {
void (*shutdown_complete)(void *);
void *shutdown_complete_arg;
+ /* all pollsets interested in new connections */
grpc_pollset **pollsets;
+ /* number of pollsets in the pollsets array */
size_t pollset_count;
};
@@ -160,6 +163,9 @@ static void destroyed_port(void *server, int success) {
static void dont_care_about_shutdown_completion(void *ignored) {}
+/* called when all listening endpoints have been shutdown, so no further
+ events will be received on them - at this point it's safe to destroy
+ things */
static void deactivated_all_ports(grpc_tcp_server *s) {
size_t i;
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index d11706ece0..e9bd45db68 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -53,6 +53,10 @@ typedef struct {
grpc_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
+ /* pollset bound to this call; if we need to make external
+ network requests, they should be done under this pollset
+ so that work can progress when this call wants work to
+ progress */
grpc_pollset *pollset;
grpc_transport_op op;
size_t op_md_idx;
diff --git a/src/core/support/log_win32.c b/src/core/support/log_win32.c
index f878e6aaa1..d249be7d2e 100644
--- a/src/core/support/log_win32.c
+++ b/src/core/support/log_win32.c
@@ -94,23 +94,22 @@ void gpr_default_log(gpr_log_func_args *args) {
fprintf(stderr, "%s%s.%09u %5lu %s:%d] %s\n",
gpr_log_severity_string(args->severity), time_buffer,
- (int)(now.tv_nsec), GetCurrentThreadId(),
- args->file, args->line, args->message);
+ (int)(now.tv_nsec), GetCurrentThreadId(), args->file, args->line,
+ args->message);
}
char *gpr_format_message(DWORD messageid) {
LPTSTR tmessage;
char *message;
- DWORD status = FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
- FORMAT_MESSAGE_FROM_SYSTEM |
- FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, messageid,
- MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
- (LPTSTR)(&tmessage), 0, NULL);
+ DWORD status = FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL, messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
LocalFree(tmessage);
return message;
}
-#endif /* GPR_WIN32 */
+#endif /* GPR_WIN32 */
diff --git a/src/core/surface/byte_buffer_reader.c b/src/core/surface/byte_buffer_reader.c
index 86829a686f..283db83833 100644
--- a/src/core/surface/byte_buffer_reader.c
+++ b/src/core/surface/byte_buffer_reader.c
@@ -64,11 +64,11 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_msg_decompress(reader->buffer_in->data.raw.compression,
&reader->buffer_in->data.raw.slice_buffer,
&decompressed_slices_buffer);
- reader->buffer_out = grpc_raw_byte_buffer_create(
- decompressed_slices_buffer.slices,
- decompressed_slices_buffer.count);
+ reader->buffer_out =
+ grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
gpr_slice_buffer_destroy(&decompressed_slices_buffer);
- } else { /* not compressed, use the input buffer as output */
+ } else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
}
reader->current.index = 0;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index bcb8c7bfec..cf0a595147 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1288,7 +1288,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->flags = op->flags;
break;
case GRPC_OP_SEND_MESSAGE:
- if (!are_write_flags_valid(op->flags)){
+ if (!are_write_flags_valid(op->flags)) {
return GRPC_CALL_ERROR_INVALID_FLAGS;
}
req = &reqs[out++];
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index a613bac8cd..fb3662b50d 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -79,7 +79,7 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
- gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
+ gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index caaced75c4..1cd1dc822d 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -1606,15 +1606,16 @@ static int init_settings_frame_parser(transport *t) {
int ok;
if (t->incoming_stream_id != 0) {
- gpr_log(GPR_ERROR, "settings frame received for stream %d", t->incoming_stream_id);
+ gpr_log(GPR_ERROR, "settings frame received for stream %d",
+ t->incoming_stream_id);
drop_connection(t);
return 0;
}
ok = GRPC_CHTTP2_PARSE_OK ==
- grpc_chttp2_settings_parser_begin_frame(
- &t->simple_parsers.settings, t->incoming_frame_size,
- t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
+ grpc_chttp2_settings_parser_begin_frame(
+ &t->simple_parsers.settings, t->incoming_frame_size,
+ t->incoming_frame_flags, t->settings[PEER_SETTINGS]);
if (!ok) {
drop_connection(t);
return 0;
diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc
index 87f8349eef..679c4f1503 100644
--- a/src/cpp/client/channel_arguments.cc
+++ b/src/cpp/client/channel_arguments.cc
@@ -34,6 +34,7 @@
#include <grpc++/channel_arguments.h>
#include <grpc/grpc_security.h>
+#include "src/core/channel/channel_args.h"
namespace grpc {
@@ -41,6 +42,10 @@ void ChannelArguments::SetSslTargetNameOverride(const grpc::string& name) {
SetString(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, name);
}
+void ChannelArguments::SetCompressionLevel(grpc_compression_level level) {
+ SetInt(GRPC_COMPRESSION_LEVEL_ARG, level);
+}
+
grpc::string ChannelArguments::GetSslTargetNameOverride() const {
for (unsigned int i = 0; i < args_.size(); i++) {
if (grpc::string(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG) == args_[i].key) {
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 82ded5cc7a..21f94d3cf5 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -204,7 +204,7 @@ namespace Grpc.Core.Tests
BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
}
-
+
[Test]
public void UnknownMethodHandler()
{
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index d92a9d1b37..a9625a1799 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -100,7 +100,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
if (!host || !method) {
[NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
}
- // TODO(jcanizales): Throw if the requestWriter was already started.
+ if (requestWriter.state != GRXWriterStateNotStarted) {
+ [NSException raise:NSInvalidArgumentException format:@"The requests writer can't be already started."];
+ }
if ((self = [super init])) {
static dispatch_once_t initialization;
dispatch_once(&initialization, ^{
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index 0e5eb9a25f..e997b76d14 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -163,7 +163,7 @@ files:
* [Podspec](https://github.com/grpc/grpc/blob/master/gRPC.podspec) for the Objective-C gRPC runtime
library. This can be tedious to configure manually.
-* [Podspec](https://github.com/jcanizales/protobuf/blob/add-podspec/Protobuf.podspec) for the
+* [Podspec](https://github.com/google/protobuf/blob/master/Protobuf.podspec) for the
Objective-C Protobuf runtime library.
[Protocol Buffers]:https://developers.google.com/protocol-buffers/
diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py
index 478346341b..1a9b0c69f3 100644
--- a/src/python/src/grpc/_adapter/_intermediary_low_test.py
+++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py
@@ -29,6 +29,8 @@
"""Tests for the old '_low'."""
+import Queue
+import threading
import time
import unittest
@@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple(
bytes(bytearray((row + column) % 256 for column in range(row)))
for row in range(_STREAM_LENGTH))
+
class LonelyClientTest(unittest.TestCase):
def testLonelyClient(self):
@@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase):
del completion_queue
+def _drive_completion_queue(completion_queue, event_queue):
+ while True:
+ event = completion_queue.get(_FUTURE)
+ if event.kind is _low.Event.Kind.STOP:
+ break
+ event_queue.put(event)
+
+
class EchoTest(unittest.TestCase):
def setUp(self):
@@ -88,24 +99,26 @@ class EchoTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- self.server_completion_queue = None
- self.client_completion_queue = None
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
del self.server
def _perform_echo_test(self, test_data):
@@ -144,7 +157,7 @@ class EchoTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
self.assertIsNotNone(service_accepted)
self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
self.assertIs(service_accepted.tag, service_tag)
@@ -165,7 +178,7 @@ class EchoTest(unittest.TestCase):
server_leading_binary_metadata_value)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
self.assertEqual(metadata_tag, metadata_accepted.tag)
@@ -179,14 +192,14 @@ class EchoTest(unittest.TestCase):
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
self.assertIsNotNone(write_accepted)
self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
self.assertIs(write_accepted.tag, write_tag)
self.assertIs(write_accepted.write_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -194,14 +207,14 @@ class EchoTest(unittest.TestCase):
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
self.assertEqual(write_tag, write_accepted.tag)
self.assertTrue(write_accepted.write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -209,14 +222,14 @@ class EchoTest(unittest.TestCase):
client_data.append(read_accepted.bytes)
client_call.complete(complete_tag)
- complete_accepted = self.client_completion_queue.get(_FUTURE)
+ complete_accepted = self.client_events.get()
self.assertIsNotNone(complete_accepted)
self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
self.assertIs(complete_accepted.tag, complete_tag)
self.assertIs(complete_accepted.complete_accepted, True)
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
self.assertIsNotNone(read_accepted)
self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
self.assertEqual(read_tag, read_accepted.tag)
@@ -228,8 +241,8 @@ class EchoTest(unittest.TestCase):
server_trailing_binary_metadata_value)
server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
status_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -246,8 +259,8 @@ class EchoTest(unittest.TestCase):
self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
client_call.read(read_tag)
- client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
- client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
+ client_terminal_event_one = self.client_events.get()
+ client_terminal_event_two = self.client_events.get()
if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = client_terminal_event_one
finish_accepted = client_terminal_event_two
@@ -303,22 +316,26 @@ class CancellationTest(unittest.TestCase):
self.server = _low.Server(self.server_completion_queue)
port = self.server.add_http2_addr('[::]:0')
self.server.start()
+ self.server_events = Queue.Queue()
+ self.server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.server_completion_queue, self.server_events))
+ self.server_completion_queue_thread.start()
self.client_completion_queue = _low.CompletionQueue()
self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+ self.client_events = Queue.Queue()
+ self.client_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue,
+ args=(self.client_completion_queue, self.client_events))
+ self.client_completion_queue_thread.start()
def tearDown(self):
self.server.stop()
self.server_completion_queue.stop()
self.client_completion_queue.stop()
- while True:
- event = self.server_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- while True:
- event = self.client_completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
+ self.server_completion_queue_thread.join()
+ self.client_completion_queue_thread.join()
del self.server
def testCancellation(self):
@@ -340,29 +357,29 @@ class CancellationTest(unittest.TestCase):
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
self.server.service(service_tag)
- service_accepted = self.server_completion_queue.get(_FUTURE)
+ service_accepted = self.server_events.get()
server_call = service_accepted.service_acceptance.call
server_call.accept(self.server_completion_queue, finish_tag)
server_call.premetadata()
- metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ metadata_accepted = self.client_events.get()
self.assertIsNotNone(metadata_accepted)
for datum in test_data:
client_call.write(datum, write_tag)
- write_accepted = self.client_completion_queue.get(_FUTURE)
+ write_accepted = self.client_events.get()
server_call.read(read_tag)
- read_accepted = self.server_completion_queue.get(_FUTURE)
+ read_accepted = self.server_events.get()
server_data.append(read_accepted.bytes)
server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.server_completion_queue.get(_FUTURE)
+ write_accepted = self.server_events.get()
self.assertIsNotNone(write_accepted)
client_call.read(read_tag)
- read_accepted = self.client_completion_queue.get(_FUTURE)
+ read_accepted = self.client_events.get()
client_data.append(read_accepted.bytes)
client_call.cancel()
@@ -373,8 +390,8 @@ class CancellationTest(unittest.TestCase):
server_call.read(read_tag)
- server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
- server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_one = self.server_events.get()
+ server_terminal_event_two = self.server_events.get()
if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
read_accepted = server_terminal_event_one
rpc_accepted = server_terminal_event_two
@@ -388,7 +405,7 @@ class CancellationTest(unittest.TestCase):
self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
- finish_event = self.client_completion_queue.get(_FUTURE)
+ finish_event = self.client_events.get()
self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
finish_event.status)
diff --git a/src/ruby/.rspec b/src/ruby/.rspec
index dd579f7a13..cd7c5fb5b2 100755
--- a/src/ruby/.rspec
+++ b/src/ruby/.rspec
@@ -1,2 +1,4 @@
-I.
--require spec_helper
+--format documentation
+--color
diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb
index 101165c146..270d2e97d3 100644
--- a/src/ruby/spec/spec_helper.rb
+++ b/src/ruby/spec/spec_helper.rb
@@ -53,3 +53,5 @@ RSpec.configure do |config|
include RSpec::LoggingHelper
config.capture_log_messages
end
+
+RSpec::Expectations.configuration.warn_about_potential_false_positives = false