aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-08-18 15:04:25 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-08-18 15:04:25 -0700
commit80763dd7c48f9fe9f0499f9a3d6c340b81d1acb7 (patch)
tree0696c8d9b1a5bbf20dc34070eb625bc97f915be9 /src
parent5b0e9462f0d40b171d50c03b29016c36588a3520 (diff)
parentf4112fc06e126ae4adc4dce99d71d021ff36bf9a (diff)
Merge branch 'master' of github.com:grpc/grpc into lb_add_md
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c19
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c13
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c6
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c2
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c12
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.h4
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c1303
-rw-r--r--src/core/lib/channel/handshaker.c24
-rw-r--r--src/core/lib/channel/handshaker.h9
-rw-r--r--src/core/lib/http/httpcli_security_connector.c8
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c1
-rw-r--r--src/core/lib/security/transport/handshake.c9
-rw-r--r--src/core/lib/security/transport/handshake.h7
-rw-r--r--src/core/lib/security/transport/security_connector.c59
-rw-r--r--src/core/lib/security/transport/security_connector.h13
-rw-r--r--src/core/lib/support/log_linux.c1
-rw-r--r--src/cpp/util/byte_buffer.cc25
-rw-r--r--src/csharp/Grpc.Auth/Grpc.Auth.csproj31
-rw-r--r--src/csharp/Grpc.Auth/Grpc.Auth.nuspec2
-rw-r--r--src/csharp/Grpc.Auth/packages.config4
-rw-r--r--src/csharp/Grpc.Auth/project.json4
-rw-r--r--src/csharp/Grpc.Core.Tests/SanityTest.cs6
-rw-r--r--src/csharp/Grpc.Examples.MathClient/project.json5
-rw-r--r--src/csharp/Grpc.Examples.MathServer/project.json5
-rw-r--r--src/csharp/Grpc.Examples/project.json3
-rw-r--r--src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj31
-rw-r--r--src/csharp/Grpc.IntegrationTesting.Client/packages.config4
-rw-r--r--src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj1
-rw-r--r--src/csharp/Grpc.IntegrationTesting.QpsWorker/app.config15
-rw-r--r--src/csharp/Grpc.IntegrationTesting.QpsWorker/project.json5
-rw-r--r--src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj31
-rw-r--r--src/csharp/Grpc.IntegrationTesting.Server/packages.config4
-rw-r--r--src/csharp/Grpc.IntegrationTesting.StressClient/project.json5
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj24
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs63
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs32
-rw-r--r--src/csharp/Grpc.IntegrationTesting/QpsWorker.cs30
-rw-r--r--src/csharp/Grpc.IntegrationTesting/StressTestClient.cs51
-rw-r--r--src/csharp/Grpc.IntegrationTesting/packages.config6
-rw-r--r--src/csharp/Grpc.IntegrationTesting/project.json2
-rw-r--r--src/csharp/build_packages.bat8
-rw-r--r--src/node/health_check/package.json4
-rwxr-xr-xsrc/node/tools/bin/protoc.js6
-rw-r--r--src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m394
-rw-r--r--src/php/ext/grpc/php7_wrapper.h3
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi14
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi12
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx4
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_interop_test.py14
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_interop_test.py24
-rw-r--r--src/python/grpcio_tests/tests/interop/client.py56
-rw-r--r--src/python/grpcio_tests/tests/interop/methods.py270
-rw-r--r--src/python/grpcio_tests/tests/interop/server.py12
-rw-r--r--src/python/grpcio_tests/tests/stress/client.py21
-rw-r--r--src/python/grpcio_tests/tests/stress/metrics_server.py2
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb97
62 files changed, 1381 insertions, 1456 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 6f6855584a..cbaa75a90a 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -88,14 +88,21 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args, void *user_data,
+ grpc_channel_args *args,
+ gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
connector *c = user_data;
- c->result->transport =
- grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
- GPR_ASSERT(c->result->transport);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0);
- c->result->channel_args = args;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_channel_args_destroy(args);
+ gpr_free(read_buffer);
+ } else {
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
+ GPR_ASSERT(c->result->transport);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ read_buffer);
+ c->result->channel_args = args;
+ }
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
index ca435c25ce..b2c5e5b088 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -75,7 +75,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd(
grpc_channel *channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(final_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 4e33b6fa61..9e2bdd758f 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -114,8 +114,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
exec_ctx, c->args.channel_args, secure_endpoint, 1);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
- 0);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
c->result->channel_args =
grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1);
@@ -126,10 +125,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args, void *user_data,
+ grpc_channel_args *args,
+ gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
connector *c = user_data;
+ c->tmp_args = args;
if (error != GRPC_ERROR_NONE) {
+ gpr_free(read_buffer);
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
@@ -137,10 +139,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
- c->tmp_args = args;
grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, endpoint, c->args.deadline,
- on_secure_handshake_done, c);
+ exec_ctx, c->security_connector, endpoint, read_buffer,
+ c->args.deadline, on_secure_handshake_done, c);
}
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9cd374777e..f0e07429fa 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -55,7 +55,8 @@ typedef struct server_connect_state {
} server_connect_state;
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args, void *user_data,
+ grpc_channel_args *args,
+ gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
server_connect_state *state = user_data;
if (error != GRPC_ERROR_NONE) {
@@ -64,6 +65,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_error_free_string(error_str);
GRPC_ERROR_UNREF(error);
grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
+ gpr_free(read_buffer);
} else {
// Beware that the call to grpc_create_chttp2_transport() has to happen
// before grpc_tcp_server_destroy(). This is fine here, but similar code
@@ -75,7 +77,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_server_setup_transport(exec_ctx, state->server, transport,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
- grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer);
}
// Clean up.
grpc_channel_args_destroy(args);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
index 96bf4d6f30..4350543c27 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -67,7 +67,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
- grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ccea15a648..da3e284fcf 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -111,7 +111,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
grpc_server_setup_transport(exec_ctx, state->state->server, transport,
state->accepting_pollset, args_copy);
grpc_channel_args_destroy(args_copy);
- grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL);
} else {
/* We need to consume this here, because the server may already have
* gone away. */
@@ -128,7 +128,8 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
- grpc_channel_args *args, void *user_data,
+ grpc_channel_args *args,
+ gpr_slice_buffer *read_buffer, void *user_data,
grpc_error *error) {
server_secure_connect *state = user_data;
if (error != GRPC_ERROR_NONE) {
@@ -136,9 +137,10 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
GRPC_ERROR_UNREF(error);
+ grpc_channel_args_destroy(args);
+ gpr_free(read_buffer);
grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
- grpc_channel_args_destroy(args);
state_unref(state->state);
gpr_free(state);
return;
@@ -150,8 +152,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
// into this function.
state->args = args;
grpc_server_security_connector_do_handshake(
- exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
- on_secure_handshake_done, state);
+ exec_ctx, state->state->sc, state->acceptor, endpoint, read_buffer,
+ state->deadline, on_secure_handshake_done, state);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 28768f57f0..0e8dfb7d96 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -2546,9 +2546,12 @@ grpc_transport *grpc_create_chttp2_transport(
void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- gpr_slice *slices, size_t nslices) {
+ gpr_slice_buffer *read_buffer) {
grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
REF_TRANSPORT(t, "reading_action"); /* matches unref inside reading_action */
- gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
+ if (read_buffer != NULL) {
+ gpr_slice_buffer_move_into(read_buffer, &t->read_buffer);
+ gpr_free(read_buffer);
+ }
reading_action(exec_ctx, t, GRPC_ERROR_NONE);
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
index 5da4276f82..4e2d0954bf 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h
@@ -44,8 +44,10 @@ grpc_transport *grpc_create_chttp2_transport(
grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
grpc_endpoint *ep, int is_client);
+/// Takes ownership of \a read_buffer, which (if non-NULL) contains
+/// leftover bytes previously read from the endpoint (e.g., by handshakers).
void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
grpc_transport *transport,
- gpr_slice *slices, size_t nslices);
+ gpr_slice_buffer *read_buffer);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 25d8aca250..029c15014e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -46,617 +46,964 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
-// Global flag that gets set with GRPC_TRACE env variable
-int grpc_cronet_trace = 1;
+#define CRONET_LOG(...) \
+ do { \
+ if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
+ } while (0)
-// Cronet transport object
+/* TODO (makdharma): Hook up into the wider tracing mechanism */
+int grpc_cronet_trace = 0;
+
+enum e_op_result {
+ ACTION_TAKEN_WITH_CALLBACK,
+ ACTION_TAKEN_NO_CALLBACK,
+ NO_ACTION_POSSIBLE
+};
+
+enum e_op_id {
+ OP_SEND_INITIAL_METADATA = 0,
+ OP_SEND_MESSAGE,
+ OP_SEND_TRAILING_METADATA,
+ OP_RECV_MESSAGE,
+ OP_RECV_INITIAL_METADATA,
+ OP_RECV_TRAILING_METADATA,
+ OP_CANCEL_ERROR,
+ OP_ON_COMPLETE,
+ OP_FAILED,
+ OP_SUCCEEDED,
+ OP_CANCELED,
+ OP_RECV_MESSAGE_AND_ON_COMPLETE,
+ OP_READ_REQ_MADE,
+ OP_NUM_OPS
+};
+
+/* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */
+
+static void on_request_headers_sent(cronet_bidirectional_stream *);
+static void on_response_headers_received(
+ cronet_bidirectional_stream *,
+ const cronet_bidirectional_stream_header_array *, const char *);
+static void on_write_completed(cronet_bidirectional_stream *, const char *);
+static void on_read_completed(cronet_bidirectional_stream *, char *, int);
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *,
+ const cronet_bidirectional_stream_header_array *);
+static void on_succeeded(cronet_bidirectional_stream *);
+static void on_failed(cronet_bidirectional_stream *, int);
+static void on_canceled(cronet_bidirectional_stream *);
+static cronet_bidirectional_stream_callback cronet_callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+/* Cronet transport object */
struct grpc_cronet_transport {
grpc_transport base; /* must be first element in this structure */
cronet_engine *engine;
char *host;
};
-
typedef struct grpc_cronet_transport grpc_cronet_transport;
-enum send_state {
- CRONET_SEND_IDLE = 0,
- CRONET_REQ_STARTED,
- CRONET_SEND_HEADER,
- CRONET_WRITE,
- CRONET_WRITE_COMPLETED,
+/* TODO (makdharma): reorder structure for memory efficiency per
+ http://www.catb.org/esr/structure-packing/#_structure_reordering: */
+struct read_state {
+ /* vars to store data coming from server */
+ char *read_buffer;
+ bool length_field_received;
+ int received_bytes;
+ int remaining_bytes;
+ int length_field;
+ char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
+ char *payload_field;
+ bool read_stream_closed;
+
+ /* vars for holding data destined for the application */
+ struct grpc_slice_buffer_stream sbs;
+ gpr_slice_buffer read_slice_buffer;
+
+ /* vars for trailing metadata */
+ grpc_chttp2_incoming_metadata_buffer trailing_metadata;
+ bool trailing_metadata_valid;
+
+ /* vars for initial metadata */
+ grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
-enum recv_state {
- CRONET_RECV_IDLE = 0,
- CRONET_RECV_READ_LENGTH,
- CRONET_RECV_READ_DATA,
- CRONET_RECV_CLOSED,
+struct write_state {
+ char *write_buffer;
};
-static const char *recv_state_name[] = {
- "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
- "CRONET_RECV_CLOSED"};
+/* track state of one stream op */
+struct op_state {
+ bool state_op_done[OP_NUM_OPS];
+ bool state_callback_received[OP_NUM_OPS];
+ /* data structure for storing data coming from server */
+ struct read_state rs;
+ /* data structure for storing data going to the server */
+ struct write_state ws;
+};
-// Enum that identifies calling function.
-enum e_caller {
- PERFORM_STREAM_OP,
- ON_READ_COMPLETE,
- ON_RESPONSE_HEADERS_RECEIVED,
- ON_RESPONSE_TRAILERS_RECEIVED
+struct op_and_state {
+ grpc_transport_stream_op op;
+ struct op_state state;
+ bool done;
+ struct stream_obj *s; /* Pointer back to the stream object */
+ struct op_and_state *next; /* next op_and_state in the linked list */
};
-enum callback_id {
- CB_SEND_INITIAL_METADATA = 0,
- CB_SEND_MESSAGE,
- CB_SEND_TRAILING_METADATA,
- CB_RECV_MESSAGE,
- CB_RECV_INITIAL_METADATA,
- CB_RECV_TRAILING_METADATA,
- CB_NUM_CALLBACKS
+struct op_storage {
+ int num_pending_ops;
+ struct op_and_state *head;
};
struct stream_obj {
- // we store received bytes here as they trickle in.
- gpr_slice_buffer write_slice_buffer;
+ struct op_and_state *oas;
+ grpc_transport_stream_op *curr_op;
+ grpc_cronet_transport curr_ct;
+ grpc_stream *curr_gs;
cronet_bidirectional_stream *cbs;
- gpr_slice slice;
- gpr_slice_buffer read_slice_buffer;
- struct grpc_slice_buffer_stream sbs;
- char *read_buffer;
- int remaining_read_bytes;
- int total_read_bytes;
-
- char *write_buffer;
- size_t write_buffer_size;
-
- // Hold the URL
- char *url;
-
- bool response_headers_received;
- bool read_requested;
- bool response_trailers_received;
- bool read_closed;
-
- // Recv message stuff
- grpc_byte_buffer **recv_message;
- // Initial metadata stuff
- grpc_metadata_batch *recv_initial_metadata;
- // Trailing metadata stuff
- grpc_metadata_batch *recv_trailing_metadata;
- grpc_chttp2_incoming_metadata_buffer imb;
-
- // This mutex protects receive state machine execution
- gpr_mu recv_mu;
- // we can queue up up to 2 callbacks for each OP
- grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
-
- // storage for header
- cronet_bidirectional_stream_header *headers;
- uint32_t num_headers;
cronet_bidirectional_stream_header_array header_array;
- // state tracking
- enum recv_state cronet_recv_state;
- enum send_state cronet_send_state;
-};
-typedef struct stream_obj stream_obj;
+ /* Stream level state. Some state will be tracked both at stream and stream_op
+ * level */
+ struct op_state state;
-static void next_send_step(stream_obj *s);
-static void next_recv_step(stream_obj *s, enum e_caller caller);
+ /* OP storage */
+ struct op_storage storage;
-static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_pollset *pollset) {}
+ /* Mutex to protect storage */
+ gpr_mu mu;
+};
+typedef struct stream_obj stream_obj;
-static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
- grpc_transport *gt, grpc_stream *gs,
- grpc_pollset_set *pollset_set) {}
+static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
+ struct op_and_state *oas);
-static void enqueue_callbacks(grpc_closure *callback_list[]) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- if (callback_list[0]) {
- grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
- callback_list[0] = NULL;
- }
- if (callback_list[1]) {
- grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
- callback_list[1] = NULL;
+/*
+ Utility function to translate enum into string for printing
+*/
+static const char *op_result_string(enum e_op_result i) {
+ switch (i) {
+ case ACTION_TAKEN_WITH_CALLBACK:
+ return "ACTION_TAKEN_WITH_CALLBACK";
+ case ACTION_TAKEN_NO_CALLBACK:
+ return "ACTION_TAKEN_NO_CALLBACK";
+ case NO_ACTION_POSSIBLE:
+ return "NO_ACTION_POSSIBLE";
}
- grpc_exec_ctx_finish(&exec_ctx);
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
}
-static void on_canceled(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+static const char *op_id_string(enum e_op_id i) {
+ switch (i) {
+ case OP_SEND_INITIAL_METADATA:
+ return "OP_SEND_INITIAL_METADATA";
+ case OP_SEND_MESSAGE:
+ return "OP_SEND_MESSAGE";
+ case OP_SEND_TRAILING_METADATA:
+ return "OP_SEND_TRAILING_METADATA";
+ case OP_RECV_MESSAGE:
+ return "OP_RECV_MESSAGE";
+ case OP_RECV_INITIAL_METADATA:
+ return "OP_RECV_INITIAL_METADATA";
+ case OP_RECV_TRAILING_METADATA:
+ return "OP_RECV_TRAILING_METADATA";
+ case OP_CANCEL_ERROR:
+ return "OP_CANCEL_ERROR";
+ case OP_ON_COMPLETE:
+ return "OP_ON_COMPLETE";
+ case OP_FAILED:
+ return "OP_FAILED";
+ case OP_SUCCEEDED:
+ return "OP_SUCCEEDED";
+ case OP_CANCELED:
+ return "OP_CANCELED";
+ case OP_RECV_MESSAGE_AND_ON_COMPLETE:
+ return "OP_RECV_MESSAGE_AND_ON_COMPLETE";
+ case OP_READ_REQ_MADE:
+ return "OP_READ_REQ_MADE";
+ case OP_NUM_OPS:
+ return "OP_NUM_OPS";
}
+ return "UNKNOWN";
}
-static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
- }
+/*
+ Add a new stream op to op storage.
+*/
+static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+ struct op_storage *storage = &s->storage;
+ /* add new op at the beginning of the linked list. The memory is freed
+ in remove_from_storage */
+ struct op_and_state *new_op = gpr_malloc(sizeof(struct op_and_state));
+ memcpy(&new_op->op, op, sizeof(grpc_transport_stream_op));
+ memset(&new_op->state, 0, sizeof(new_op->state));
+ new_op->s = s;
+ new_op->done = false;
+ gpr_mu_lock(&s->mu);
+ new_op->next = storage->head;
+ storage->head = new_op;
+ storage->num_pending_ops++;
+ CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op,
+ storage->num_pending_ops);
+ gpr_mu_unlock(&s->mu);
}
-static void on_succeeded(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+/*
+ Traverse the linked list and delete op and free memory
+*/
+static void remove_from_storage(struct stream_obj *s,
+ struct op_and_state *oas) {
+ struct op_and_state *curr;
+ if (s->storage.head == NULL || oas == NULL) {
+ return;
}
-}
-
-static void on_response_trailers_received(
- cronet_bidirectional_stream *stream,
- const cronet_bidirectional_stream_header_array *trailers) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ if (s->storage.head == oas) {
+ s->storage.head = oas->next;
+ gpr_free(oas);
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ } else {
+ for (curr = s->storage.head; curr != NULL; curr = curr->next) {
+ if (curr->next == oas) {
+ curr->next = oas->next;
+ s->storage.num_pending_ops--;
+ CRONET_LOG(GPR_DEBUG, "Freed %p. Now %d in the queue", oas,
+ s->storage.num_pending_ops);
+ gpr_free(oas);
+ break;
+ } else if (curr->next == NULL) {
+ CRONET_LOG(GPR_ERROR, "Reached end of LL and did not find op to free");
+ }
+ }
}
- stream_obj *s = (stream_obj *)stream->annotation;
+}
- memset(&s->imb, 0, sizeof(s->imb));
- grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
- unsigned int i = 0;
- for (i = 0; i < trailers->count; i++) {
- grpc_chttp2_incoming_metadata_buffer_add(
- &s->imb, grpc_mdelem_from_metadata_strings(
- grpc_mdstr_from_string(trailers->headers[i].key),
- grpc_mdstr_from_string(trailers->headers[i].value)));
+/*
+ Cycle through ops and try to take next action. Break when either
+ an action with callback is taken, or no action is possible.
+ This can be executed from the Cronet network thread via cronet callback
+ or on the application supplied thread via the perform_stream_op function.
+*/
+static void execute_from_storage(stream_obj *s) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_mu_lock(&s->mu);
+ for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
+ CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
+ GPR_ASSERT(curr->done == 0);
+ enum e_op_result result = execute_stream_op(&exec_ctx, curr);
+ CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
+ op_result_string(result));
+ /* if this op is done, then remove it and free memory */
+ if (curr->done) {
+ struct op_and_state *next = curr->next;
+ remove_from_storage(s, curr);
+ curr = next;
+ }
+ /* continue processing the same op if ACTION_TAKEN_WITHOUT_CALLBACK */
+ if (result == NO_ACTION_POSSIBLE) {
+ curr = curr->next;
+ } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
+ break;
+ }
}
- s->response_trailers_received = true;
- next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+ gpr_mu_unlock(&s->mu);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void on_write_completed(cronet_bidirectional_stream *stream,
- const char *data) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: on_write_completed");
- }
+/*
+ Cronet callback
+*/
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
- s->cronet_send_state = CRONET_WRITE_COMPLETED;
- next_send_step(s);
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_FAILED] = true;
+ s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
+ }
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
+ execute_from_storage(s);
}
-static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
- gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
- uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
- if (s->total_read_bytes > 0) {
- // Only copy if there is non-zero number of bytes
- memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
- gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+/*
+ Cronet callback
+*/
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_CANCELED] = true;
+ s->cbs = NULL;
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
}
- grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
- *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
+ }
+ execute_from_storage(s);
}
-static int parse_grpc_header(const uint8_t *data) {
- const uint8_t *p = data + 1;
- int length = 0;
- length |= ((uint8_t)*p++) << 24;
- length |= ((uint8_t)*p++) << 16;
- length |= ((uint8_t)*p++) << 8;
- length |= ((uint8_t)*p++);
- return length;
+/*
+ Cronet callback
+*/
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ cronet_bidirectional_stream_destroy(s->cbs);
+ s->state.state_callback_received[OP_SUCCEEDED] = true;
+ s->cbs = NULL;
+ execute_from_storage(s);
}
-static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
- int count) {
+/*
+ Cronet callback
+*/
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
- count, s->total_read_bytes, s->remaining_read_bytes);
- }
- if (count > 0) {
- GPR_ASSERT(s->recv_message);
- s->remaining_read_bytes -= count;
- next_recv_step(s, ON_READ_COMPLETE);
- } else {
- s->read_closed = true;
- next_recv_step(s, ON_READ_COMPLETE);
+ s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
+ s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
+ /* Free the memory allocated for headers */
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ s->header_array.headers = NULL;
}
+ execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_response_headers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: on_response_headers_received");
- }
+ CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
+ headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
- s->response_headers_received = true;
- next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
-}
-
-static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ memset(&s->state.rs.initial_metadata, 0,
+ sizeof(s->state.rs.initial_metadata));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
+ for (size_t i = 0; i < headers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.initial_metadata,
+ grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(headers->headers[i].key),
+ grpc_mdstr_from_string(headers->headers[i].value)));
}
- stream_obj *s = (stream_obj *)stream->annotation;
- enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
- s->cronet_send_state = CRONET_SEND_HEADER;
- next_send_step(s);
+ s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true;
+ execute_from_storage(s);
}
-// Callback function pointers (invoked by cronet in response to events)
-static cronet_bidirectional_stream_callback callbacks = {
- on_request_headers_sent,
- on_response_headers_received,
- on_read_completed,
- on_write_completed,
- on_response_trailers_received,
- on_succeeded,
- on_failed,
- on_canceled};
-
-static void invoke_closing_callback(stream_obj *s) {
- grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
- s->recv_trailing_metadata);
- if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
- enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+/*
+ Cronet callback
+*/
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
+ if (s->state.ws.write_buffer) {
+ gpr_free(s->state.ws.write_buffer);
+ s->state.ws.write_buffer = NULL;
}
+ s->state.state_callback_received[OP_SEND_MESSAGE] = true;
+ execute_from_storage(s);
}
-static void set_recv_state(stream_obj *s, enum recv_state state) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+/*
+ Cronet callback
+*/
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
+ count);
+ s->state.state_callback_received[OP_RECV_MESSAGE] = true;
+ if (count > 0) {
+ s->state.rs.received_bytes += count;
+ s->state.rs.remaining_bytes -= count;
+ if (s->state.rs.remaining_bytes > 0) {
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ s->state.state_op_done[OP_READ_REQ_MADE] = true;
+ cronet_bidirectional_stream_read(
+ s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
+ s->state.rs.remaining_bytes);
+ } else {
+ execute_from_storage(s);
+ }
+ } else {
+ s->state.rs.read_stream_closed = true;
+ execute_from_storage(s);
}
- s->cronet_recv_state = state;
}
-// This is invoked from perform_stream_op, and all on_xxxx callbacks.
-static void next_recv_step(stream_obj *s, enum e_caller caller) {
- gpr_mu_lock(&s->recv_mu);
- switch (s->cronet_recv_state) {
- case CRONET_RECV_IDLE:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
- }
- if (caller == PERFORM_STREAM_OP ||
- caller == ON_RESPONSE_HEADERS_RECEIVED) {
- if (s->read_closed && s->response_trailers_received) {
- invoke_closing_callback(s);
- set_recv_state(s, CRONET_RECV_CLOSED);
- } else if (s->response_headers_received == true &&
- s->read_requested == true) {
- set_recv_state(s, CRONET_RECV_READ_LENGTH);
- s->total_read_bytes = s->remaining_read_bytes =
- GRPC_HEADER_SIZE_IN_BYTES;
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
- s->remaining_read_bytes);
- }
- }
- break;
- case CRONET_RECV_READ_LENGTH:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
- }
- if (caller == ON_READ_COMPLETE) {
- if (s->read_closed) {
- invoke_closing_callback(s);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- set_recv_state(s, CRONET_RECV_CLOSED);
- } else {
- GPR_ASSERT(s->remaining_read_bytes == 0);
- set_recv_state(s, CRONET_RECV_READ_DATA);
- s->total_read_bytes = s->remaining_read_bytes =
- parse_grpc_header((const uint8_t *)s->read_buffer);
- s->read_buffer =
- gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- if (s->remaining_read_bytes > 0) {
- cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
- s->remaining_read_bytes);
- } else {
- // Calling the closing callback directly since this is a 0 byte read
- // for an empty message.
- process_recv_message(s, NULL);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- invoke_closing_callback(s);
- set_recv_state(s, CRONET_RECV_CLOSED);
- }
- }
- }
- break;
- case CRONET_RECV_READ_DATA:
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
- }
- if (caller == ON_READ_COMPLETE) {
- if (s->remaining_read_bytes > 0) {
- int offset = s->total_read_bytes - s->remaining_read_bytes;
- GPR_ASSERT(s->read_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
- }
- cronet_bidirectional_stream_read(
- s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
- } else {
- gpr_slice_buffer_init(&s->read_slice_buffer);
- uint8_t *p = (uint8_t *)s->read_buffer;
- process_recv_message(s, p);
- set_recv_state(s, CRONET_RECV_IDLE);
- enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
- }
- }
- break;
- case CRONET_RECV_CLOSED:
- break;
- default:
- GPR_ASSERT(0); // Should not reach here
- break;
+/*
+ Cronet callback
+*/
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
+ trailers);
+ stream_obj *s = (stream_obj *)stream->annotation;
+ memset(&s->state.rs.trailing_metadata, 0,
+ sizeof(s->state.rs.trailing_metadata));
+ s->state.rs.trailing_metadata_valid = false;
+ grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
+ for (size_t i = 0; i < trailers->count; i++) {
+ CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
+ trailers->headers[i].value);
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ s->state.rs.trailing_metadata_valid = true;
}
- gpr_mu_unlock(&s->recv_mu);
+ s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
+ execute_from_storage(s);
}
-// This function takes the data from s->write_slice_buffer and assembles into
-// a contiguous byte stream with 5 byte gRPC header prepended.
-static void create_grpc_frame(stream_obj *s) {
- gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
- uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+/*
+ Utility function that takes the data from s->write_slice_buffer and assembles
+ into a contiguous byte stream with 5 byte gRPC header prepended.
+*/
+static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
+ char **pp_write_buffer,
+ size_t *p_write_buffer_size) {
+ gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
size_t length = GPR_SLICE_LENGTH(slice);
- s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
- s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
- uint8_t *p = (uint8_t *)s->write_buffer;
- // Append 5 byte header
+ *p_write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ /* This is freed in the on_write_completed callback */
+ char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
+ *pp_write_buffer = write_buffer;
+ uint8_t *p = (uint8_t *)write_buffer;
+ /* Append 5 byte header */
*p++ = 0;
*p++ = (uint8_t)(length >> 24);
*p++ = (uint8_t)(length >> 16);
*p++ = (uint8_t)(length >> 8);
*p++ = (uint8_t)(length);
- // append actual data
- memcpy(p, raw_data, length);
+ /* append actual data */
+ memcpy(p, GPR_SLICE_START_PTR(slice), length);
}
-static void do_write(stream_obj *s) {
- gpr_slice_buffer *sb = &s->write_slice_buffer;
- GPR_ASSERT(sb->count <= 1);
- if (sb->count > 0) {
- create_grpc_frame(s);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
- }
- cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
- (int)s->write_buffer_size, false);
- }
-}
-
-//
-static void next_send_step(stream_obj *s) {
- switch (s->cronet_send_state) {
- case CRONET_SEND_IDLE:
- GPR_ASSERT(
- s->cbs); // cronet_bidirectional_stream is not initialized yet.
- s->cronet_send_state = CRONET_REQ_STARTED;
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
- }
- cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
- &s->header_array, false);
- // we no longer need the memory that was allocated earlier.
- gpr_free(s->header_array.headers);
- break;
- case CRONET_SEND_HEADER:
- do_write(s);
- s->cronet_send_state = CRONET_WRITE;
- break;
- case CRONET_WRITE_COMPLETED:
- do_write(s);
- break;
- default:
- GPR_ASSERT(0);
- break;
- }
-}
-
-static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
- const char *host,
- stream_obj *s) {
+/*
+ Convert metadata in a format that Cronet can consume
+*/
+static void convert_metadata_to_cronet_headers(
+ grpc_linked_mdelem *head, const char *host, char **pp_url,
+ cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
grpc_linked_mdelem *curr = head;
- // Walk the linked list and get number of header fields
- uint32_t num_headers_available = 0;
+ /* Walk the linked list and get number of header fields */
+ size_t num_headers_available = 0;
while (curr != NULL) {
curr = curr->next;
num_headers_available++;
}
- // Allocate enough memory
- s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
- sizeof(cronet_bidirectional_stream_header) * num_headers_available);
-
- // Walk the linked list again, this time copying the header fields.
- // s->num_headers
- // can be less than num_headers_available, as some headers are not used for
- // cronet
+ /* Allocate enough memory. It is freed in the on_request_headers_sent callback
+ */
+ cronet_bidirectional_stream_header *headers =
+ (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+ *pp_headers = headers;
+
+ /* Walk the linked list again, this time copying the header fields.
+ s->num_headers can be less than num_headers_available, as some headers
+ are not used for cronet.
+ TODO (makdharma): Eliminate need to traverse the LL second time for perf.
+ */
curr = head;
- s->num_headers = 0;
- while (s->num_headers < num_headers_available) {
+ size_t num_headers = 0;
+ while (num_headers < num_headers_available) {
grpc_mdelem *mdelem = curr->md;
curr = curr->next;
const char *key = grpc_mdstr_as_c_string(mdelem->key);
const char *value = grpc_mdstr_as_c_string(mdelem->value);
- if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
- strcmp(key, ":authority") == 0) {
- // Cronet populates these fields on its own.
+ if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME ||
+ mdelem->key == GRPC_MDSTR_AUTHORITY) {
+ /* Cronet populates these fields on its own */
continue;
}
- if (strcmp(key, ":path") == 0) {
- // Create URL by appending :path value to the hostname
- gpr_asprintf(&s->url, "https://%s%s", host, value);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
- }
+ if (mdelem->key == GRPC_MDSTR_PATH) {
+ /* Create URL by appending :path value to the hostname */
+ gpr_asprintf(pp_url, "https://%s%s", host, value);
continue;
}
- s->headers[s->num_headers].key = key;
- s->headers[s->num_headers].value = value;
- s->num_headers++;
+ CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
+ headers[num_headers].key = key;
+ headers[num_headers].value = value;
+ num_headers++;
if (curr == NULL) {
break;
}
}
+ *p_num_headers = (size_t)num_headers;
}
-static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, grpc_transport_stream_op *op) {
- grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
- GPR_ASSERT(ct->engine);
- stream_obj *s = (stream_obj *)gs;
- if (op->recv_trailing_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - recv_trailing_metadata: on_complete=%p",
- op->on_complete);
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+/*
+ Op Execution: Decide if one of the actions contained in the stream op can be
+ executed. This is the heart of the state machine.
+*/
+static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+ struct op_state *stream_state,
+ struct op_state *op_state, enum e_op_id op_id) {
+ bool result = true;
+ /* When call is canceled, every op can be run, except under following
+ conditions
+ */
+ bool is_canceled_of_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED];
+ if (is_canceled_of_failed) {
+ if (op_id == OP_SEND_INITIAL_METADATA) result = false;
+ if (op_id == OP_SEND_MESSAGE) result = false;
+ if (op_id == OP_SEND_TRAILING_METADATA) result = false;
+ if (op_id == OP_CANCEL_ERROR) result = false;
+ /* already executed */
+ if (op_id == OP_RECV_INITIAL_METADATA &&
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ result = false;
+ if (op_id == OP_RECV_MESSAGE &&
+ stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ if (op_id == OP_RECV_TRAILING_METADATA &&
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ result = false;
+ } else if (op_id == OP_SEND_INITIAL_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
+ } else if (op_id == OP_RECV_INITIAL_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_SEND_MESSAGE) {
+ /* already executed (note we're checking op specific state, not stream
+ state) */
+ if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_RECV_MESSAGE) {
+ /* already executed */
+ if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
+ } else if (op_id == OP_RECV_TRAILING_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
+ /* we have asked for but haven't received message yet. */
+ else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ /* we haven't received trailers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
+ result = false;
+ /* we haven't received on_succeeded yet. */
+ else if (!stream_state->state_callback_received[OP_SUCCEEDED])
+ result = false;
+ } else if (op_id == OP_SEND_TRAILING_METADATA) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
+ /* we haven't sent initial metadata yet */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't sent message yet */
+ else if (curr_op->send_message &&
+ !stream_state->state_op_done[OP_SEND_MESSAGE])
+ result = false;
+ /* we haven't got on_write_completed for the send yet */
+ else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
+ } else if (op_id == OP_CANCEL_ERROR) {
+ /* already executed */
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
+ } else if (op_id == OP_ON_COMPLETE) {
+ /* already executed (note we're checking op specific state, not stream
+ state) */
+ if (op_state->state_op_done[OP_ON_COMPLETE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
}
- s->recv_trailing_metadata = op->recv_trailing_metadata;
- GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
- s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
- }
- if (op->recv_message) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
- op->on_complete);
+ /* Check if every op that was asked for is done. */
+ else if (curr_op->send_initial_metadata &&
+ !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_message &&
+ !op_state->state_op_done[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_message &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_initial_metadata &&
+ !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_message &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ } else if (curr_op->recv_trailing_metadata) {
+ /* We aren't done with trailing metadata yet */
+ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ /* We've asked for actual message in an earlier op, and it hasn't been
+ delivered yet. */
+ else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
+ /* If this op is not the one asking for read, (which means some earlier
+ op has asked), and the read hasn't been delivered. */
+ if (!curr_op->recv_message &&
+ !stream_state->state_callback_received[OP_SUCCEEDED]) {
+ CRONET_LOG(GPR_DEBUG, "Because");
+ result = false;
+ }
+ }
}
- s->recv_message = (grpc_byte_buffer **)op->recv_message;
- GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
- GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
- s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
- s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
- s->read_requested = true;
- next_recv_step(s, PERFORM_STREAM_OP);
+ /* We should see at least one on_write_completed for the trailers that we
+ sent */
+ else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
}
- if (op->recv_initial_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
- op->on_complete);
+ CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
+ result ? "YES" : "NO");
+ return result;
+}
+
+/*
+ TODO (makdharma): Break down this function in smaller chunks for readability.
+*/
+static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
+ struct op_and_state *oas) {
+ grpc_transport_stream_op *stream_op = &oas->op;
+ struct stream_obj *s = oas->s;
+ struct op_state *stream_state = &s->state;
+ enum e_op_result result = NO_ACTION_POSSIBLE;
+ if (stream_op->send_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_INITIAL_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
+ /* This OP is the beginning. Reset various states */
+ memset(&s->header_array, 0, sizeof(s->header_array));
+ memset(&stream_state->rs, 0, sizeof(stream_state->rs));
+ memset(&stream_state->ws, 0, sizeof(stream_state->ws));
+ memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
+ memset(stream_state->state_callback_received, 0,
+ sizeof(stream_state->state_callback_received));
+ /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
+ * on_failed */
+ GPR_ASSERT(s->cbs == NULL);
+ s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
+ &cronet_callbacks);
+ CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs);
+ char *url;
+ s->header_array.headers = NULL;
+ convert_metadata_to_cronet_headers(
+ stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
+ &s->header_array.headers, &s->header_array.count);
+ s->header_array.capacity = s->header_array.count;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs,
+ url);
+ cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+ false);
+ stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->recv_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_INITIAL_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
+ if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
+ } else {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
}
- s->recv_initial_metadata = op->recv_initial_metadata;
- GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
- GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
- s->callback_list[CB_RECV_INITIAL_METADATA][0] =
- op->recv_initial_metadata_ready;
- s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
- }
- if (op->send_initial_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - send_initial_metadata: on_complete=%p",
- op->on_complete);
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_op->send_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_MESSAGE)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
+ gpr_slice_buffer write_slice_buffer;
+ gpr_slice slice;
+ gpr_slice_buffer_init(&write_slice_buffer);
+ grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
+ stream_op->send_message->length, NULL);
+ /* Check that compression flag is OFF. We don't support compression yet. */
+ if (stream_op->send_message->flags != 0) {
+ gpr_log(GPR_ERROR, "Compression is not supported");
+ GPR_ASSERT(stream_op->send_message->flags == 0);
}
- s->num_headers = 0;
- convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
- ct->host, s);
- s->header_array.count = s->num_headers;
- s->header_array.capacity = s->num_headers;
- s->header_array.headers = s->headers;
- GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
- s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
- }
- if (op->send_message) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
- op->on_complete);
+ gpr_slice_buffer_add(&write_slice_buffer, slice);
+ if (write_slice_buffer.count != 1) {
+ /* Empty request not handled yet */
+ gpr_log(GPR_ERROR, "Empty request is not supported");
+ GPR_ASSERT(write_slice_buffer.count == 1);
+ }
+ if (write_slice_buffer.count > 0) {
+ size_t write_buffer_size;
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, %p)",
+ s->cbs, stream_state->ws.write_buffer);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
+ (int)write_buffer_size, false);
+ result = ACTION_TAKEN_WITH_CALLBACK;
}
- grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
- op->send_message->length, NULL);
- // Check that compression flag is not ON. We don't support compression yet.
- // TODO (makdharma): add compression support
- GPR_ASSERT(op->send_message->flags == 0);
- gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
- if (s->cbs == NULL) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ stream_state->state_op_done[OP_SEND_MESSAGE] = true;
+ oas->state.state_op_done[OP_SEND_MESSAGE] = true;
+ } else if (stream_op->recv_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_MESSAGE)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
+ if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_CANCELLED, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->rs.read_stream_closed == true) {
+ /* No more data will be received */
+ CRONET_LOG(GPR_DEBUG, "read stream closed");
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ } else if (stream_state->rs.length_field_received == false) {
+ if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
+ stream_state->rs.remaining_bytes == 0) {
+ /* Start a read operation for data */
+ stream_state->rs.length_field_received = true;
+ stream_state->rs.length_field = stream_state->rs.remaining_bytes =
+ parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
+ CRONET_LOG(GPR_DEBUG, "length field = %d",
+ stream_state->rs.length_field);
+ if (stream_state->rs.length_field > 0) {
+ stream_state->rs.read_buffer =
+ gpr_malloc((size_t)stream_state->rs.length_field);
+ GPR_ASSERT(stream_state->rs.read_buffer);
+ stream_state->rs.remaining_bytes = stream_state->rs.length_field;
+ stream_state->rs.received_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else {
+ stream_state->rs.remaining_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
+ gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ }
+ } else if (stream_state->rs.remaining_bytes == 0) {
+ /* Start a read operation for first 5 bytes (GRPC header) */
+ stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
+ stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
+ stream_state->rs.received_bytes = 0;
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs);
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
+ cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
+ stream_state->rs.remaining_bytes);
}
- s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
- GPR_ASSERT(s->cbs);
- s->read_closed = false;
- s->response_trailers_received = false;
- s->response_headers_received = false;
- s->cronet_send_state = CRONET_SEND_IDLE;
- s->cronet_recv_state = CRONET_RECV_IDLE;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_state->rs.remaining_bytes == 0) {
+ CRONET_LOG(GPR_DEBUG, "read operation complete");
+ gpr_slice read_data_slice =
+ gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, stream_state->rs.read_buffer,
+ (size_t)stream_state->rs.length_field);
+ gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
+ gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
+ read_data_slice);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
+ stream_state->state_op_done[OP_RECV_MESSAGE] = true;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ /* Clear read state of the stream, so next read op (if it were to come)
+ * will work */
+ stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
+ stream_state->rs.length_field_received = 0;
+ result = ACTION_TAKEN_NO_CALLBACK;
}
- GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
- s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
- next_send_step(s);
- }
- if (op->send_trailing_metadata) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG,
- "perform_stream_op - send_trailing_metadata: on_complete=%p",
- op->on_complete);
+ } else if (stream_op->recv_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_TRAILING_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
+ if (oas->s->state.rs.trailing_metadata_valid) {
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.trailing_metadata,
+ stream_op->recv_trailing_metadata);
+ stream_state->rs.trailing_metadata_valid = false;
}
- GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
- s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
+ result = ACTION_TAKEN_NO_CALLBACK;
+ } else if (stream_op->send_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_TRAILING_METADATA)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs);
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ cronet_bidirectional_stream_write(s->cbs, "", 0, true);
+ stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->cancel_error &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_CANCEL_ERROR)) {
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
+ CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
if (s->cbs) {
- // Send an "empty" write to the far end to signal that we're done.
- // This will induce the server to send down trailers.
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
- }
- cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
- } else {
- // We never created a stream. This was probably an empty request.
- invoke_closing_callback(s);
+ cronet_bidirectional_stream_cancel(s->cbs);
}
+ stream_state->state_op_done[OP_CANCEL_ERROR] = true;
+ result = ACTION_TAKEN_WITH_CALLBACK;
+ } else if (stream_op->on_complete &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_ON_COMPLETE)) {
+ /* All actions in this stream_op are complete. Call the on_complete callback
+ */
+ CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
+ grpc_exec_ctx_sched(exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
+ NULL);
+ oas->state.state_op_done[OP_ON_COMPLETE] = true;
+ oas->done = true;
+ /* reset any send message state, only if this ON_COMPLETE is about a send.
+ */
+ if (stream_op->send_message) {
+ stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
+ stream_state->state_op_done[OP_SEND_MESSAGE] = false;
+ }
+ result = ACTION_TAKEN_NO_CALLBACK;
+ /* If this is the on_complete callback being called for a received message -
+ make a note */
+ if (stream_op->recv_message)
+ stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
+ } else {
+ result = NO_ACTION_POSSIBLE;
}
+ return result;
}
+/*
+ Functions used by upper layers to access transport functionality.
+*/
+
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data) {
stream_obj *s = (stream_obj *)gs;
- memset(s->callback_list, 0, sizeof(s->callback_list));
+ memset(&s->storage, 0, sizeof(s->storage));
+ s->storage.head = NULL;
+ memset(&s->state, 0, sizeof(s->state));
+ s->curr_op = NULL;
s->cbs = NULL;
- gpr_mu_init(&s->recv_mu);
- s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
- s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
- gpr_slice_buffer_init(&s->write_slice_buffer);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
- }
+ memset(&s->header_array, 0, sizeof(s->header_array));
+ memset(&s->state.rs, 0, sizeof(s->state.rs));
+ memset(&s->state.ws, 0, sizeof(s->state.ws));
+ memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
+ memset(s->state.state_callback_received, 0,
+ sizeof(s->state.state_callback_received));
+ gpr_mu_init(&s->mu);
return 0;
}
-static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "Destroy stream");
- }
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ CRONET_LOG(GPR_DEBUG, "perform_stream_op");
stream_obj *s = (stream_obj *)gs;
- s->cbs = NULL;
- gpr_free(s->read_buffer);
- gpr_free(s->write_buffer);
- gpr_free(s->url);
- gpr_mu_destroy(&s->recv_mu);
- if (and_free_memory) {
- gpr_free(and_free_memory);
- }
+ s->curr_gs = gs;
+ memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport));
+ add_to_storage(s, op);
+ execute_from_storage(s);
}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
- grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
- gpr_free(ct->host);
- if (grpc_cronet_trace) {
- gpr_log(GPR_DEBUG, "Destroy transport");
- }
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
+
+static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ return NULL;
}
+static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_transport_op *op) {}
+
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
"cronet_http",
init_stream,
set_pollset_do_nothing,
set_pollset_set_do_nothing,
perform_stream_op,
- NULL,
+ perform_op,
destroy_stream,
destroy_transport,
- NULL};
+ get_peer};
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 6c3ca198b7..c0979f5e80 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -62,11 +62,13 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
- deadline, acceptor, cb, user_data);
+ read_buffer, deadline, acceptor, cb,
+ user_data);
}
//
@@ -143,7 +145,8 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
// handshakers together.
static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
- grpc_channel_args* args, void* user_data,
+ grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer, void* user_data,
grpc_error* error) {
grpc_handshake_manager* mgr = user_data;
GPR_ASSERT(mgr->state != NULL);
@@ -151,8 +154,8 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
// If we got an error, skip all remaining handshakers and invoke the
// caller-supplied callback immediately.
if (error != GRPC_ERROR_NONE) {
- mgr->state->final_cb(exec_ctx, endpoint, args, mgr->state->final_user_data,
- error);
+ mgr->state->final_cb(exec_ctx, endpoint, args, read_buffer,
+ mgr->state->final_user_data, error);
return;
}
grpc_handshaker_done_cb cb = call_next_handshaker;
@@ -163,9 +166,9 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
user_data = mgr->state->final_user_data;
}
// Invoke handshaker.
- grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index],
- endpoint, args, mgr->state->deadline,
- mgr->state->acceptor, cb, user_data);
+ grpc_handshaker_do_handshake(
+ exec_ctx, mgr->handshakers[mgr->state->index], endpoint, args,
+ read_buffer, mgr->state->deadline, mgr->state->acceptor, cb, user_data);
++mgr->state->index;
// If this is the last handshaker, clean up state.
if (mgr->state->index == mgr->count) {
@@ -180,10 +183,12 @@ void grpc_handshake_manager_do_handshake(
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
+ gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer));
+ gpr_slice_buffer_init(read_buffer);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done
// callback with the passed-in endpoint.
- cb(exec_ctx, endpoint, args_copy, user_data, GRPC_ERROR_NONE);
+ cb(exec_ctx, endpoint, args_copy, read_buffer, user_data, GRPC_ERROR_NONE);
} else {
GPR_ASSERT(mgr->state == NULL);
mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
@@ -192,6 +197,7 @@ void grpc_handshake_manager_do_handshake(
mgr->state->acceptor = acceptor;
mgr->state->final_cb = cb;
mgr->state->final_user_data = user_data;
- call_next_handshaker(exec_ctx, endpoint, args_copy, mgr, GRPC_ERROR_NONE);
+ call_next_handshaker(exec_ctx, endpoint, args_copy, read_buffer, mgr,
+ GRPC_ERROR_NONE);
}
}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index dfc469c417..b276f6028c 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -36,6 +36,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/codegen/time.h>
+#include <grpc/support/slice_buffer.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@@ -56,10 +57,11 @@
typedef struct grpc_handshaker grpc_handshaker;
/// Callback type invoked when a handshaker is done.
-/// Takes ownership of \a args.
+/// Takes ownership of \a args and \a read_buffer.
typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint,
grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
void* user_data, grpc_error* error);
struct grpc_handshaker_vtable {
@@ -72,10 +74,12 @@ struct grpc_handshaker_vtable {
/// Performs handshaking. When finished, calls \a cb with \a user_data.
/// Takes ownership of \a args.
+ /// Takes ownership of \a read_buffer, which contains leftover bytes read
+ /// from the endpoint by the previous handshaker.
/// \a acceptor will be NULL for client-side handshakers.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_endpoint* endpoint, grpc_channel_args* args,
- gpr_timespec deadline,
+ gpr_slice_buffer* read_buffer, gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
};
@@ -101,6 +105,7 @@ void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
+ gpr_slice_buffer* read_buffer,
gpr_timespec deadline,
grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data);
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c
index a57d93bb7b..0006e809a6 100644
--- a/src/core/lib/http/httpcli_security_connector.c
+++ b/src/core/lib/http/httpcli_security_connector.c
@@ -61,6 +61,7 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer,
gpr_timespec deadline,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -69,6 +70,7 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
tsi_result result = TSI_OK;
tsi_handshaker *handshaker;
if (c->handshaker_factory == NULL) {
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
return;
}
@@ -77,10 +79,12 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
tsi_result_to_string(result));
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, deadline, cb, user_data);
+ nonsecure_endpoint, read_buffer, deadline, cb,
+ user_data);
}
}
@@ -183,7 +187,7 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK);
grpc_channel_security_connector_do_handshake(
- exec_ctx, sc, tcp, deadline, on_secure_transport_setup_done, c);
+ exec_ctx, sc, tcp, NULL, deadline, on_secure_transport_setup_done, c);
GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "httpcli");
}
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 6a63c4d1d1..02bcbaa10f 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -42,6 +42,7 @@
#include <assert.h>
#include <errno.h>
#include <poll.h>
+#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
diff --git a/src/core/lib/security/transport/handshake.c b/src/core/lib/security/transport/handshake.c
index 540a17283d..fbeec312b6 100644
--- a/src/core/lib/security/transport/handshake.c
+++ b/src/core/lib/security/transport/handshake.c
@@ -325,8 +325,9 @@ static void on_timeout(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
void grpc_do_security_handshake(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
+ grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
+ gpr_timespec deadline, grpc_security_handshake_done_cb cb,
+ void *user_data) {
grpc_security_connector_handshake_list *handshake_node;
grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake));
memset(h, 0, sizeof(grpc_security_handshake));
@@ -346,6 +347,10 @@ void grpc_do_security_handshake(
gpr_slice_buffer_init(&h->left_overs);
gpr_slice_buffer_init(&h->outgoing);
gpr_slice_buffer_init(&h->incoming);
+ if (read_buffer != NULL) {
+ gpr_slice_buffer_move_into(read_buffer, &h->incoming);
+ gpr_free(read_buffer);
+ }
if (!is_client_side) {
grpc_server_security_connector *server_connector =
(grpc_server_security_connector *)connector;
diff --git a/src/core/lib/security/transport/handshake.h b/src/core/lib/security/transport/handshake.h
index c0906dd6af..53092f5421 100644
--- a/src/core/lib/security/transport/handshake.h
+++ b/src/core/lib/security/transport/handshake.h
@@ -37,12 +37,13 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/security/transport/security_connector.h"
-/* Calls the callback upon completion. Takes owership of handshaker. */
+/* Calls the callback upon completion. Takes owership of handshaker and
+ * read_buffer. */
void grpc_do_security_handshake(
grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector, bool is_client_side,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
+ gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake);
diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c
index f0ee6770e5..0eca46eb52 100644
--- a/src/core/lib/security/transport/security_connector.c
+++ b/src/core/lib/security/transport/security_connector.c
@@ -127,25 +127,29 @@ void grpc_server_security_connector_shutdown(
void grpc_channel_security_connector_do_handshake(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data) {
+ grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
+ gpr_timespec deadline, grpc_security_handshake_done_cb cb,
+ void *user_data) {
if (sc == NULL || nonsecure_endpoint == NULL) {
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
- sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, deadline, cb, user_data);
+ sc->do_handshake(exec_ctx, sc, nonsecure_endpoint, read_buffer, deadline,
+ cb, user_data);
}
}
void grpc_server_security_connector_do_handshake(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb,
- void *user_data) {
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
+ grpc_security_handshake_done_cb cb, void *user_data) {
if (sc == NULL || nonsecure_endpoint == NULL) {
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL, NULL);
} else {
- sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, deadline, cb,
- user_data);
+ sc->do_handshake(exec_ctx, sc, acceptor, nonsecure_endpoint, read_buffer,
+ deadline, cb, user_data);
}
}
@@ -312,23 +316,23 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer,
gpr_timespec deadline,
grpc_security_handshake_done_cb cb,
void *user_data) {
grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(1), &sc->base,
- true, nonsecure_endpoint, deadline, cb, user_data);
+ true, nonsecure_endpoint, read_buffer, deadline,
+ cb, user_data);
}
-static void fake_server_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor,
- grpc_endpoint *nonsecure_endpoint,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void fake_server_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
+ grpc_security_handshake_done_cb cb, void *user_data) {
grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), &sc->base,
- false, nonsecure_endpoint, deadline, cb,
- user_data);
+ false, nonsecure_endpoint, read_buffer, deadline,
+ cb, user_data);
}
static grpc_security_connector_vtable fake_channel_vtable = {
@@ -418,6 +422,7 @@ static grpc_security_status ssl_create_handshaker(
static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer,
gpr_timespec deadline,
grpc_security_handshake_done_cb cb,
void *user_data) {
@@ -430,30 +435,32 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
: c->target_name,
&handshaker);
if (status != GRPC_SECURITY_OK) {
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, status, NULL, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, true,
- nonsecure_endpoint, deadline, cb, user_data);
+ nonsecure_endpoint, read_buffer, deadline, cb,
+ user_data);
}
}
-static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
- grpc_server_security_connector *sc,
- grpc_tcp_server_acceptor *acceptor,
- grpc_endpoint *nonsecure_endpoint,
- gpr_timespec deadline,
- grpc_security_handshake_done_cb cb,
- void *user_data) {
+static void ssl_server_do_handshake(
+ grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
+ grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
+ grpc_security_handshake_done_cb cb, void *user_data) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
tsi_handshaker *handshaker;
grpc_security_status status =
ssl_create_handshaker(c->handshaker_factory, false, NULL, &handshaker);
if (status != GRPC_SECURITY_OK) {
+ gpr_free(read_buffer);
cb(exec_ctx, user_data, status, NULL, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, &sc->base, false,
- nonsecure_endpoint, deadline, cb, user_data);
+ nonsecure_endpoint, read_buffer, deadline, cb,
+ user_data);
}
}
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index c2ddf5ee1e..0b5b44bf1a 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -143,7 +143,8 @@ struct grpc_channel_security_connector {
grpc_security_call_host_check_cb cb, void *user_data);
void (*do_handshake)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
+ grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
grpc_security_handshake_done_cb cb, void *user_data);
};
@@ -156,8 +157,8 @@ void grpc_channel_security_connector_check_call_host(
/* Handshake. */
void grpc_channel_security_connector_do_handshake(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
- grpc_security_handshake_done_cb cb, void *user_data);
+ grpc_endpoint *nonsecure_endpoint, gpr_slice_buffer *read_buffer,
+ gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
/* --- server_security_connector object. ---
@@ -174,14 +175,16 @@ struct grpc_server_security_connector {
void (*do_handshake)(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc,
grpc_tcp_server_acceptor *acceptor,
- grpc_endpoint *nonsecure_endpoint, gpr_timespec deadline,
+ grpc_endpoint *nonsecure_endpoint,
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
grpc_security_handshake_done_cb cb, void *user_data);
};
void grpc_server_security_connector_do_handshake(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_tcp_server_acceptor *acceptor, grpc_endpoint *nonsecure_endpoint,
- gpr_timespec deadline, grpc_security_handshake_done_cb cb, void *user_data);
+ gpr_slice_buffer *read_buffer, gpr_timespec deadline,
+ grpc_security_handshake_done_cb cb, void *user_data);
void grpc_server_security_connector_shutdown(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector);
diff --git a/src/core/lib/support/log_linux.c b/src/core/lib/support/log_linux.c
index 508fae4eec..299b377373 100644
--- a/src/core/lib/support/log_linux.c
+++ b/src/core/lib/support/log_linux.c
@@ -47,7 +47,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
-#include <linux/unistd.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index c2cd20ee07..4c4772a92b 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -37,12 +37,19 @@
namespace grpc {
ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) {
- // TODO(yangg) maybe expose some core API to simplify this
- std::vector<gpr_slice> c_slices(nslices);
- for (size_t i = 0; i < nslices; i++) {
- c_slices[i] = slices[i].slice_;
- }
- buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices);
+ // The following assertions check that the representation of a grpc::Slice is
+ // identical to that of a gpr_slice: it has a gpr_slice field, and nothing
+ // else.
+ static_assert(std::is_same<decltype(slices[0].slice_), gpr_slice>::value,
+ "Slice must have same representation as gpr_slice");
+ static_assert(sizeof(Slice) == sizeof(gpr_slice),
+ "Slice must have same representation as gpr_slice");
+ // The const_cast is legal if grpc_raw_byte_buffer_create() does no more
+ // than its advertised side effect of increasing the reference count of the
+ // slices it processes, and such an increase does not affect the semantics
+ // seen by the caller of this constructor.
+ buffer_ = grpc_raw_byte_buffer_create(
+ reinterpret_cast<gpr_slice*>(const_cast<Slice*>(slices)), nslices);
}
ByteBuffer::~ByteBuffer() {
@@ -95,4 +102,10 @@ ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
return *this;
}
+void ByteBuffer::Swap(ByteBuffer* other) {
+ grpc_byte_buffer* tmp = other->buffer_;
+ other->buffer_ = this->buffer_;
+ this->buffer_ = tmp;
+}
+
} // namespace grpc
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
index 1fa14fc3df..7a6955311a 100644
--- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
@@ -39,30 +39,25 @@
<AssemblyOriginatorKeyFile>..\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System" />
+ <Reference Include="System.Net" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Net.Http.WebRequest" />
+ <Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.dll</HintPath>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth.PlatformServices, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
+ <Reference Include="Google.Apis.Core">
+ <HintPath>..\packages\Google.Apis.Core.1.15.0\lib\net45\Google.Apis.Core.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Core, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Core.1.11.1\lib\net45\Google.Apis.Core.dll</HintPath>
+ <Reference Include="Google.Apis.Auth">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.dll</HintPath>
</Reference>
- <Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
+ <Reference Include="Google.Apis.Auth.PlatformServices">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
- <Reference Include="System" />
- <Reference Include="System.Net" />
- <Reference Include="System.Net.Http" />
- <Reference Include="System.Net.Http.WebRequest" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
index 4baed3704c..a1f5668e2e 100644
--- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
@@ -15,7 +15,7 @@
<copyright>Copyright 2015, Google Inc.</copyright>
<tags>gRPC RPC Protocol HTTP/2 Auth OAuth2</tags>
<dependencies>
- <dependency id="Google.Apis.Auth" version="1.11.1" />
+ <dependency id="Google.Apis.Auth" version="1.15.0" />
<dependency id="Grpc.Core" version="$version$" />
</dependencies>
</metadata>
diff --git a/src/csharp/Grpc.Auth/packages.config b/src/csharp/Grpc.Auth/packages.config
index c20d9ceed6..738d3e6f3b 100644
--- a/src/csharp/Grpc.Auth/packages.config
+++ b/src/csharp/Grpc.Auth/packages.config
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
- <package id="Google.Apis.Auth" version="1.11.1" targetFramework="net45" />
- <package id="Google.Apis.Core" version="1.11.1" targetFramework="net45" />
+ <package id="Google.Apis.Auth" version="1.15.0" targetFramework="net45" />
+ <package id="Google.Apis.Core" version="1.15.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Auth/project.json b/src/csharp/Grpc.Auth/project.json
index 6215ca3585..30f0944693 100644
--- a/src/csharp/Grpc.Auth/project.json
+++ b/src/csharp/Grpc.Auth/project.json
@@ -23,13 +23,13 @@
},
"dependencies": {
"Grpc.Core": "1.1.0-dev",
- "Google.Apis.Auth": "1.11.1"
+ "Google.Apis.Auth": "1.15.0"
},
"frameworks": {
"net45": { },
"netstandard1.5": {
"imports": [
- "net45"
+ "portable-net45"
],
"dependencies": {
"Microsoft.NETCore.Portable.Compatibility": "1.0.1",
diff --git a/src/csharp/Grpc.Core.Tests/SanityTest.cs b/src/csharp/Grpc.Core.Tests/SanityTest.cs
index df887d82b3..f1eb13dffc 100644
--- a/src/csharp/Grpc.Core.Tests/SanityTest.cs
+++ b/src/csharp/Grpc.Core.Tests/SanityTest.cs
@@ -58,9 +58,9 @@ namespace Grpc.Core.Tests
[Test]
public void TestsJsonUpToDate()
{
- Dictionary<string, List<string>> discoveredTests = DiscoverAllTestClasses();
- Dictionary<string, List<string>> testsFromFile
- = JsonConvert.DeserializeObject<Dictionary<string, List<string>>>(ReadTestsJson());
+ var discoveredTests = DiscoverAllTestClasses();
+ var testsFromFile
+ = JsonConvert.DeserializeObject<Dictionary<string, List<string>>>(ReadTestsJson());
Assert.AreEqual(discoveredTests, testsFromFile);
}
diff --git a/src/csharp/Grpc.Examples.MathClient/project.json b/src/csharp/Grpc.Examples.MathClient/project.json
index 764a335ddf..ad319478ab 100644
--- a/src/csharp/Grpc.Examples.MathClient/project.json
+++ b/src/csharp/Grpc.Examples.MathClient/project.json
@@ -42,6 +42,11 @@
}
}
},
+ "runtimes": {
+ "win7-x64": { },
+ "debian.8-x64": { },
+ "osx.10.11-x64": { }
+ },
"dependencies": {
"Grpc.Examples": {
diff --git a/src/csharp/Grpc.Examples.MathServer/project.json b/src/csharp/Grpc.Examples.MathServer/project.json
index 764a335ddf..ad319478ab 100644
--- a/src/csharp/Grpc.Examples.MathServer/project.json
+++ b/src/csharp/Grpc.Examples.MathServer/project.json
@@ -42,6 +42,11 @@
}
}
},
+ "runtimes": {
+ "win7-x64": { },
+ "debian.8-x64": { },
+ "osx.10.11-x64": { }
+ },
"dependencies": {
"Grpc.Examples": {
diff --git a/src/csharp/Grpc.Examples/project.json b/src/csharp/Grpc.Examples/project.json
index 5329f390e4..98bd5d852c 100644
--- a/src/csharp/Grpc.Examples/project.json
+++ b/src/csharp/Grpc.Examples/project.json
@@ -20,11 +20,12 @@
"System.IO": ""
}
},
- "netstandard1.5": {
+ "netcoreapp1.0": {
"imports": [
"portable-net45"
],
"dependencies": {
+ "Microsoft.NETCore.App": "1.0.0",
"NETStandard.Library": "1.6.0"
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
index 91fb3ce5bc..6816b5c5a2 100644
--- a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
@@ -39,30 +39,25 @@
<AssemblyOriginatorKeyFile>..\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System" />
+ <Reference Include="System.Net" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Net.Http.WebRequest" />
+ <Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.dll</HintPath>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth.PlatformServices, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
+ <Reference Include="Google.Apis.Core">
+ <HintPath>..\packages\Google.Apis.Core.1.15.0\lib\net45\Google.Apis.Core.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Core, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Core.1.11.1\lib\net45\Google.Apis.Core.dll</HintPath>
+ <Reference Include="Google.Apis.Auth">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.dll</HintPath>
</Reference>
- <Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
+ <Reference Include="Google.Apis.Auth.PlatformServices">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
- <Reference Include="System" />
- <Reference Include="System.Net" />
- <Reference Include="System.Net.Http" />
- <Reference Include="System.Net.Http.WebRequest" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
diff --git a/src/csharp/Grpc.IntegrationTesting.Client/packages.config b/src/csharp/Grpc.IntegrationTesting.Client/packages.config
index c20d9ceed6..738d3e6f3b 100644
--- a/src/csharp/Grpc.IntegrationTesting.Client/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting.Client/packages.config
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
- <package id="Google.Apis.Auth" version="1.11.1" targetFramework="net45" />
- <package id="Google.Apis.Core" version="1.11.1" targetFramework="net45" />
+ <package id="Google.Apis.Auth" version="1.15.0" targetFramework="net45" />
+ <package id="Google.Apis.Core" version="1.15.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
index dda26a6892..593bf0939d 100644
--- a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
@@ -58,7 +58,6 @@
</ProjectReference>
</ItemGroup>
<ItemGroup>
- <None Include="app.config" />
<None Include="Grpc.IntegrationTesting.QpsWorker.project.json" />
</ItemGroup>
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/app.config b/src/csharp/Grpc.IntegrationTesting.QpsWorker/app.config
deleted file mode 100644
index e204447bb3..0000000000
--- a/src/csharp/Grpc.IntegrationTesting.QpsWorker/app.config
+++ /dev/null
@@ -1,15 +0,0 @@
-<?xml version="1.0" encoding="utf-8"?>
-<configuration>
- <runtime>
- <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1">
- <dependentAssembly>
- <assemblyIdentity name="System.Net.Http.Primitives" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
- <bindingRedirect oldVersion="0.0.0.0-4.2.29.0" newVersion="4.2.29.0" />
- </dependentAssembly>
- <dependentAssembly>
- <assemblyIdentity name="Google.Apis.Core" publicKeyToken="4b01fa6e34db77ab" culture="neutral" />
- <bindingRedirect oldVersion="0.0.0.0-1.11.1.0" newVersion="1.11.1.0" />
- </dependentAssembly>
- </assemblyBinding>
- </runtime>
-</configuration> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/project.json b/src/csharp/Grpc.IntegrationTesting.QpsWorker/project.json
index 9afcb306b4..287950720f 100644
--- a/src/csharp/Grpc.IntegrationTesting.QpsWorker/project.json
+++ b/src/csharp/Grpc.IntegrationTesting.QpsWorker/project.json
@@ -44,6 +44,11 @@
}
}
},
+ "runtimes": {
+ "win7-x64": { },
+ "debian.8-x64": { },
+ "osx.10.11-x64": { }
+ },
"dependencies": {
"Grpc.IntegrationTesting": {
diff --git a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
index f73d99dbd1..987387ca25 100644
--- a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
@@ -39,30 +39,25 @@
<AssemblyOriginatorKeyFile>..\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System" />
+ <Reference Include="System.Net" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Net.Http.WebRequest" />
+ <Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.dll</HintPath>
+ <Reference Include="Newtonsoft.Json">
+ <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth.PlatformServices, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
+ <Reference Include="Google.Apis.Core">
+ <HintPath>..\packages\Google.Apis.Core.1.15.0\lib\net45\Google.Apis.Core.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Core, Version=1.11.1.0, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Google.Apis.Core.1.11.1\lib\net45\Google.Apis.Core.dll</HintPath>
+ <Reference Include="Google.Apis.Auth">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.dll</HintPath>
</Reference>
- <Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
- <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
+ <Reference Include="Google.Apis.Auth.PlatformServices">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
- <Reference Include="System" />
- <Reference Include="System.Net" />
- <Reference Include="System.Net.Http" />
- <Reference Include="System.Net.Http.WebRequest" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
diff --git a/src/csharp/Grpc.IntegrationTesting.Server/packages.config b/src/csharp/Grpc.IntegrationTesting.Server/packages.config
index c20d9ceed6..738d3e6f3b 100644
--- a/src/csharp/Grpc.IntegrationTesting.Server/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting.Server/packages.config
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
- <package id="Google.Apis.Auth" version="1.11.1" targetFramework="net45" />
- <package id="Google.Apis.Core" version="1.11.1" targetFramework="net45" />
+ <package id="Google.Apis.Auth" version="1.15.0" targetFramework="net45" />
+ <package id="Google.Apis.Core" version="1.15.0" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/project.json b/src/csharp/Grpc.IntegrationTesting.StressClient/project.json
index 9afcb306b4..287950720f 100644
--- a/src/csharp/Grpc.IntegrationTesting.StressClient/project.json
+++ b/src/csharp/Grpc.IntegrationTesting.StressClient/project.json
@@ -44,6 +44,11 @@
}
}
},
+ "runtimes": {
+ "win7-x64": { },
+ "debian.8-x64": { },
+ "osx.10.11-x64": { }
+ },
"dependencies": {
"Grpc.IntegrationTesting": {
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 7512d2a5d1..e030b21eec 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -38,9 +38,6 @@
<AssemblyOriginatorKeyFile>..\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Reference Include="CommandLine">
- <HintPath>..\packages\CommandLineParser.1.9.71\lib\net45\CommandLine.dll</HintPath>
- </Reference>
<Reference Include="Moq">
<HintPath>..\packages\Moq.4.2.1510.2205\lib\net40\Moq.dll</HintPath>
</Reference>
@@ -51,15 +48,6 @@
<Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth">
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.dll</HintPath>
- </Reference>
- <Reference Include="Google.Apis.Auth.PlatformServices">
- <HintPath>..\packages\Google.Apis.Auth.1.11.1\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
- </Reference>
- <Reference Include="Google.Apis.Core">
- <HintPath>..\packages\Google.Apis.Core.1.11.1\lib\net45\Google.Apis.Core.dll</HintPath>
- </Reference>
<Reference Include="Google.Protobuf">
<HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath>
</Reference>
@@ -75,6 +63,18 @@
<Reference Include="nunitlite">
<HintPath>..\packages\NUnitLite.3.2.0\lib\net45\nunitlite.dll</HintPath>
</Reference>
+ <Reference Include="Google.Apis.Core">
+ <HintPath>..\packages\Google.Apis.Core.1.15.0\lib\net45\Google.Apis.Core.dll</HintPath>
+ </Reference>
+ <Reference Include="Google.Apis.Auth">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.dll</HintPath>
+ </Reference>
+ <Reference Include="Google.Apis.Auth.PlatformServices">
+ <HintPath>..\packages\Google.Apis.Auth.1.15.0\lib\net45\Google.Apis.Auth.PlatformServices.dll</HintPath>
+ </Reference>
+ <Reference Include="CommandLineParser.Unofficial">
+ <HintPath>..\packages\CommandLineParser.Unofficial.2.0.275\lib\net45\CommandLineParser.Unofficial.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index ec407d3fcf..79fd18b6d5 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -56,24 +56,24 @@ namespace Grpc.IntegrationTesting
{
private class ClientOptions
{
- [Option("server_host", DefaultValue = "127.0.0.1")]
+ [Option("server_host", Default = "127.0.0.1")]
public string ServerHost { get; set; }
- [Option("server_host_override", DefaultValue = TestCredentials.DefaultHostOverride)]
+ [Option("server_host_override", Default = TestCredentials.DefaultHostOverride)]
public string ServerHostOverride { get; set; }
[Option("server_port", Required = true)]
public int ServerPort { get; set; }
- [Option("test_case", DefaultValue = "large_unary")]
+ [Option("test_case", Default = "large_unary")]
public string TestCase { get; set; }
// Deliberately using nullable bool type to allow --use_tls=true syntax (as opposed to --use_tls)
- [Option("use_tls", DefaultValue = false)]
+ [Option("use_tls", Default = false)]
public bool? UseTls { get; set; }
// Deliberately using nullable bool type to allow --use_test_ca=true syntax (as opposed to --use_test_ca)
- [Option("use_test_ca", DefaultValue = false)]
+ [Option("use_test_ca", Default = false)]
public bool? UseTestCa { get; set; }
[Option("default_service_account", Required = false)]
@@ -84,19 +84,6 @@ namespace Grpc.IntegrationTesting
[Option("service_account_key_file", Required = false)]
public string ServiceAccountKeyFile { get; set; }
-
- [HelpOption]
- public string GetUsage()
- {
- var help = new HelpText
- {
- Heading = "gRPC C# interop testing client",
- AddDashesToOption = true
- };
- help.AddPreOptionsLine("Usage:");
- help.AddOptions(this);
- return help;
- }
}
ClientOptions options;
@@ -108,14 +95,13 @@ namespace Grpc.IntegrationTesting
public static void Run(string[] args)
{
- var options = new ClientOptions();
- if (!Parser.Default.ParseArguments(args, options))
- {
- Environment.Exit(1);
- }
-
- var interopClient = new InteropClient(options);
- interopClient.Run().Wait();
+ var parserResult = Parser.Default.ParseArguments<ClientOptions>(args)
+ .WithNotParsed(errors => Environment.Exit(1))
+ .WithParsed(options =>
+ {
+ var interopClient = new InteropClient(options);
+ interopClient.Run().Wait();
+ });
}
private async Task Run()
@@ -145,26 +131,16 @@ namespace Grpc.IntegrationTesting
if (options.TestCase == "jwt_token_creds")
{
-#if !NETCOREAPP1_0
var googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsTrue(googleCredential.IsCreateScopedRequired);
credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials());
-#else
- // TODO(jtattermusch): implement this
- throw new NotImplementedException("Not supported on CoreCLR yet");
-#endif
}
if (options.TestCase == "compute_engine_creds")
{
-#if !NETCOREAPP1_0
var googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsFalse(googleCredential.IsCreateScopedRequired);
credentials = ChannelCredentials.Create(credentials, googleCredential.ToCallCredentials());
-#else
- // TODO(jtattermusch): implement this
- throw new NotImplementedException("Not supported on CoreCLR yet");
-#endif
}
return credentials;
}
@@ -395,7 +371,6 @@ namespace Grpc.IntegrationTesting
public static async Task RunOAuth2AuthTokenAsync(TestService.TestServiceClient client, string oauthScope)
{
-#if !NETCOREAPP1_0
Console.WriteLine("running oauth2_auth_token");
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { oauthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
@@ -413,15 +388,10 @@ namespace Grpc.IntegrationTesting
Assert.True(oauthScope.Contains(response.OauthScope));
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
-#else
- // TODO(jtattermusch): implement this
- throw new NotImplementedException("Not supported on CoreCLR yet");
-#endif
}
public static async Task RunPerRpcCredsAsync(TestService.TestServiceClient client, string oauthScope)
{
-#if !NETCOREAPP1_0
Console.WriteLine("running per_rpc_creds");
ITokenAccess googleCredential = await GoogleCredential.GetApplicationDefaultAsync();
@@ -435,10 +405,6 @@ namespace Grpc.IntegrationTesting
Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username);
Console.WriteLine("Passed!");
-#else
- // TODO(jtattermusch): implement this
- throw new NotImplementedException("Not supported on CoreCLR yet");
-#endif
}
public static async Task RunCancelAfterBeginAsync(TestService.TestServiceClient client)
@@ -731,17 +697,12 @@ namespace Grpc.IntegrationTesting
// extracts the client_email field from service account file used for auth test cases
private static string GetEmailFromServiceAccountFile()
{
-#if !NETCOREAPP1_0
string keyFile = Environment.GetEnvironmentVariable("GOOGLE_APPLICATION_CREDENTIALS");
Assert.IsNotNull(keyFile);
var jobject = JObject.Parse(File.ReadAllText(keyFile));
string email = jobject.GetValue("client_email").Value<string>();
Assert.IsTrue(email.Length > 0); // spec requires nonempty client email.
return email;
-#else
- // TODO(jtattermusch): implement this
- throw new NotImplementedException("Not supported on CoreCLR yet");
-#endif
}
private static Metadata CreateTestMetadata()
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index cd47e31c2b..4118f99c2b 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -51,25 +51,12 @@ namespace Grpc.IntegrationTesting
{
private class ServerOptions
{
- [Option("port", DefaultValue = 8070)]
+ [Option("port", Default = 8070)]
public int Port { get; set; }
// Deliberately using nullable bool type to allow --use_tls=true syntax (as opposed to --use_tls)
- [Option("use_tls", DefaultValue = false)]
+ [Option("use_tls", Default = false)]
public bool? UseTls { get; set; }
-
- [HelpOption]
- public string GetUsage()
- {
- var help = new HelpText
- {
- Heading = "gRPC C# interop testing server",
- AddDashesToOption = true
- };
- help.AddPreOptionsLine("Usage:");
- help.AddOptions(this);
- return help;
- }
}
ServerOptions options;
@@ -81,14 +68,13 @@ namespace Grpc.IntegrationTesting
public static void Run(string[] args)
{
- var options = new ServerOptions();
- if (!Parser.Default.ParseArguments(args, options))
- {
- Environment.Exit(1);
- }
-
- var interopServer = new InteropServer(options);
- interopServer.Run();
+ var parserResult = Parser.Default.ParseArguments<ServerOptions>(args)
+ .WithNotParsed(errors => Environment.Exit(1))
+ .WithParsed(options =>
+ {
+ var interopServer = new InteropServer(options);
+ interopServer.Run();
+ });
}
private void Run()
diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
index a7c9fa894d..865556c242 100644
--- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
+++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs
@@ -52,21 +52,8 @@ namespace Grpc.IntegrationTesting
{
private class ServerOptions
{
- [Option("driver_port", DefaultValue = 0)]
+ [Option("driver_port", Default = 0)]
public int DriverPort { get; set; }
-
- [HelpOption]
- public string GetUsage()
- {
- var help = new HelpText
- {
- Heading = "gRPC C# performance testing worker",
- AddDashesToOption = true
- };
- help.AddPreOptionsLine("Usage:");
- help.AddOptions(this);
- return help;
- }
}
ServerOptions options;
@@ -78,14 +65,13 @@ namespace Grpc.IntegrationTesting
public static void Run(string[] args)
{
- var options = new ServerOptions();
- if (!Parser.Default.ParseArguments(args, options))
- {
- Environment.Exit(1);
- }
-
- var workerServer = new QpsWorker(options);
- workerServer.RunAsync().Wait();
+ var parserResult = Parser.Default.ParseArguments<ServerOptions>(args)
+ .WithNotParsed((x) => Environment.Exit(1))
+ .WithParsed(options =>
+ {
+ var workerServer = new QpsWorker(options);
+ workerServer.RunAsync().Wait();
+ });
}
private async Task RunAsync()
diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
index 74ee040ae4..750613b078 100644
--- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
@@ -54,36 +54,23 @@ namespace Grpc.IntegrationTesting
private class ClientOptions
{
- [Option("server_addresses", DefaultValue = "localhost:8080")]
+ [Option("server_addresses", Default = "localhost:8080")]
public string ServerAddresses { get; set; }
- [Option("test_cases", DefaultValue = "large_unary:100")]
+ [Option("test_cases", Default = "large_unary:100")]
public string TestCases { get; set; }
- [Option("test_duration_secs", DefaultValue = -1)]
+ [Option("test_duration_secs", Default = -1)]
public int TestDurationSecs { get; set; }
- [Option("num_channels_per_server", DefaultValue = 1)]
+ [Option("num_channels_per_server", Default = 1)]
public int NumChannelsPerServer { get; set; }
- [Option("num_stubs_per_channel", DefaultValue = 1)]
+ [Option("num_stubs_per_channel", Default = 1)]
public int NumStubsPerChannel { get; set; }
- [Option("metrics_port", DefaultValue = 8081)]
+ [Option("metrics_port", Default = 8081)]
public int MetricsPort { get; set; }
-
- [HelpOption]
- public string GetUsage()
- {
- var help = new HelpText
- {
- Heading = "gRPC C# stress test client",
- AddDashesToOption = true
- };
- help.AddPreOptionsLine("Usage:");
- help.AddOptions(this);
- return help;
- }
}
ClientOptions options;
@@ -105,23 +92,21 @@ namespace Grpc.IntegrationTesting
public static void Run(string[] args)
{
- var options = new ClientOptions();
- if (!Parser.Default.ParseArguments(args, options))
- {
- Environment.Exit(1);
- }
-
- GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
- GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);
+ var parserResult = Parser.Default.ParseArguments<ClientOptions>(args)
+ .WithNotParsed((x) => Environment.Exit(1))
+ .WithParsed(options => {
+ GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0);
+ GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0);
- var serverAddresses = options.ServerAddresses.Split(',');
- GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address");
+ var serverAddresses = options.ServerAddresses.Split(',');
+ GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address");
- var testCases = ParseWeightedTestCases(options.TestCases);
- GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");
+ var testCases = ParseWeightedTestCases(options.TestCases);
+ GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case");
- var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases);
- interopClient.Run().Wait();
+ var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases);
+ interopClient.Run().Wait();
+ });
}
async Task Run()
diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config
index e6e64e6558..8bf9dd4937 100644
--- a/src/csharp/Grpc.IntegrationTesting/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting/packages.config
@@ -1,9 +1,9 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.7.0" targetFramework="net45" />
- <package id="CommandLineParser" version="1.9.71" targetFramework="net45" />
- <package id="Google.Apis.Auth" version="1.11.1" targetFramework="net45" />
- <package id="Google.Apis.Core" version="1.11.1" targetFramework="net45" />
+ <package id="CommandLineParser.Unofficial" version="2.0.275" targetFramework="net45" />
+ <package id="Google.Apis.Auth" version="1.15.0" targetFramework="net45" />
+ <package id="Google.Apis.Core" version="1.15.0" targetFramework="net45" />
<package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" />
<package id="System.Interactive.Async" version="3.0.0" targetFramework="net45" />
<package id="Moq" version="4.2.1510.2205" targetFramework="net45" />
diff --git a/src/csharp/Grpc.IntegrationTesting/project.json b/src/csharp/Grpc.IntegrationTesting/project.json
index bb61a679c1..9d706510b2 100644
--- a/src/csharp/Grpc.IntegrationTesting/project.json
+++ b/src/csharp/Grpc.IntegrationTesting/project.json
@@ -58,7 +58,7 @@
"target": "project"
},
"Google.Protobuf": "3.0.0-beta3",
- "CommandLineParser": "1.9.71",
+ "CommandLineParser.Unofficial": "2.0.275",
"NUnit": "3.2.0",
"NUnitLite": "3.2.0-*"
},
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index f05c0241b6..b92189c840 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -31,10 +31,7 @@
@rem Current package versions
set VERSION=1.1.0-dev
-set PROTOBUF_VERSION=3.0.0-beta3
-
-@rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well.
-set VERSION_WITH_BETA=%VERSION%-beta
+set PROTOBUF_VERSION=3.0.0
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
@@ -58,7 +55,6 @@ xcopy /Y /I ..\..\architecture=x64,language=protoc,platform=macos\artifacts\* pr
@rem Fetch all dependencies
%NUGET% restore ..\..\vsprojects\grpc_csharp_ext.sln || goto :error
-%NUGET% restore Grpc.sln || goto :error
setlocal
@@ -73,7 +69,7 @@ endlocal
%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec -Symbols -Version %VERSION% || goto :error
%NUGET% pack Grpc.Core\Grpc.Core.nuspec -Symbols -Version %VERSION% || goto :error
-%NUGET% pack Grpc.HealthCheck\Grpc.HealthCheck.nuspec -Symbols -Version %VERSION_WITH_BETA% -Properties ProtobufVersion=%PROTOBUF_VERSION% || goto :error
+%NUGET% pack Grpc.HealthCheck\Grpc.HealthCheck.nuspec -Symbols -Version %VERSION% -Properties ProtobufVersion=%PROTOBUF_VERSION% || goto :error
%NUGET% pack Grpc.nuspec -Version %VERSION% || goto :error
%NUGET% pack Grpc.Tools.nuspec -Version %VERSION% || goto :error
diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json
index 40e276055b..e673359809 100644
--- a/src/node/health_check/package.json
+++ b/src/node/health_check/package.json
@@ -15,9 +15,9 @@
}
],
"dependencies": {
- "grpc": "^0.15.0",
+ "grpc": "^1.1.0-dev",
"lodash": "^3.9.3",
- "google-protobuf": "^3.0.0-alpha.5"
+ "google-protobuf": "^3.0.0"
},
"files": [
"LICENSE",
diff --git a/src/node/tools/bin/protoc.js b/src/node/tools/bin/protoc.js
index 53fc5dc428..7f8356867a 100755
--- a/src/node/tools/bin/protoc.js
+++ b/src/node/tools/bin/protoc.js
@@ -47,7 +47,11 @@ var exe_ext = process.platform === 'win32' ? '.exe' : '';
var protoc = path.resolve(__dirname, 'protoc' + exe_ext);
-var child_process = execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
+var plugin = path.resolve(__dirname, 'grpc_node_plugin' + exe_ext);
+
+var args = ['--plugin=protoc-gen-grpc=' + plugin].concat(process.argv.slice(2));
+
+var child_process = execFile(protoc, args, function(error, stdout, stderr) {
if (error) {
throw error;
}
diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
deleted file mode 100644
index 58abb492ce..0000000000
--- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/*
- * This test file is derived from fixture h2_ssl.c in core end2end test
- * (test/core/end2end/fixture/h2_ssl.c). The structure of the fixture is
- * preserved as much as possible
- *
- * This fixture creates a server full stack using chttp2 and a client
- * full stack using Cronet. End-to-end tests are run against this
- * configuration
- *
- */
-
-
-#import <XCTest/XCTest.h>
-#include "test/core/end2end/end2end_tests.h"
-
-#include <stdio.h>
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/host_port.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/support/env.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/support/tmpfile.h"
-#include "test/core/end2end/data/ssl_test_data.h"
-#include "test/core/util/port.h"
-#include "test/core/util/test_config.h"
-
-#include <grpc/grpc_cronet.h>
-#import <Cronet/Cronet.h>
-
-typedef struct fullstack_secure_fixture_data {
- char *localaddr;
-} fullstack_secure_fixture_data;
-
-static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack(
- grpc_channel_args *client_args, grpc_channel_args *server_args) {
- grpc_end2end_test_fixture f;
- int port = grpc_pick_unused_port_or_die();
- fullstack_secure_fixture_data *ffd =
- gpr_malloc(sizeof(fullstack_secure_fixture_data));
- memset(&f, 0, sizeof(f));
-
- gpr_join_host_port(&ffd->localaddr, "localhost", port);
-
- f.fixture_data = ffd;
- f.cq = grpc_completion_queue_create(NULL);
-
- return f;
-}
-
-static void process_auth_failure(void *state, grpc_auth_context *ctx,
- const grpc_metadata *md, size_t md_count,
- grpc_process_auth_metadata_done_cb cb,
- void *user_data) {
- GPR_ASSERT(state == NULL);
- cb(user_data, NULL, 0, NULL, 0, GRPC_STATUS_UNAUTHENTICATED, NULL);
-}
-
-static void cronet_init_client_secure_fullstack(
- grpc_end2end_test_fixture *f, grpc_channel_args *client_args,
- cronet_engine *cronetEngine) {
- fullstack_secure_fixture_data *ffd = f->fixture_data;
- f->client =
- grpc_cronet_secure_channel_create(cronetEngine, ffd->localaddr, client_args, NULL);
- GPR_ASSERT(f->client != NULL);
-}
-
-static void chttp2_init_server_secure_fullstack(
- grpc_end2end_test_fixture *f, grpc_channel_args *server_args,
- grpc_server_credentials *server_creds) {
- fullstack_secure_fixture_data *ffd = f->fixture_data;
- if (f->server) {
- grpc_server_destroy(f->server);
- }
- f->server = grpc_server_create(server_args, NULL);
- grpc_server_register_completion_queue(f->server, f->cq, NULL);
- GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr,
- server_creds));
- grpc_server_credentials_release(server_creds);
- grpc_server_start(f->server);
-}
-
-static void chttp2_tear_down_secure_fullstack(grpc_end2end_test_fixture *f) {
- fullstack_secure_fixture_data *ffd = f->fixture_data;
- gpr_free(ffd->localaddr);
- gpr_free(ffd);
-}
-
-static void cronet_init_client_simple_ssl_secure_fullstack(
- grpc_end2end_test_fixture *f, grpc_channel_args *client_args) {
- grpc_arg ssl_name_override = {GRPC_ARG_STRING,
- GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
- {"foo.test.google.fr"}};
-
- grpc_channel_args *new_client_args =
- grpc_channel_args_copy_and_add(client_args, &ssl_name_override, 1);
- [Cronet setHttp2Enabled:YES];
- [Cronet start];
- cronet_engine *cronetEngine = [Cronet getGlobalEngine];
-
- cronet_init_client_secure_fullstack(f, new_client_args, cronetEngine);
- grpc_channel_args_destroy(new_client_args);
-}
-
-static int fail_server_auth_check(grpc_channel_args *server_args) {
- size_t i;
- if (server_args == NULL) return 0;
- for (i = 0; i < server_args->num_args; i++) {
- if (strcmp(server_args->args[i].key, FAIL_AUTH_CHECK_SERVER_ARG_NAME) ==
- 0) {
- return 1;
- }
- }
- return 0;
-}
-
-static void chttp2_init_server_simple_ssl_secure_fullstack(
- grpc_end2end_test_fixture *f, grpc_channel_args *server_args) {
- grpc_ssl_pem_key_cert_pair pem_cert_key_pair = {test_server1_key,
- test_server1_cert};
- grpc_server_credentials *ssl_creds =
- grpc_ssl_server_credentials_create(NULL, &pem_cert_key_pair, 1, 0, NULL);
- if (fail_server_auth_check(server_args)) {
- grpc_auth_metadata_processor processor = {process_auth_failure, NULL, NULL};
- grpc_server_credentials_set_auth_metadata_processor(ssl_creds, processor);
- }
- chttp2_init_server_secure_fullstack(f, server_args, ssl_creds);
-}
-
-/* All test configurations */
-
-static grpc_end2end_test_config configs[] = {
- {"chttp2/simple_ssl_fullstack",
- FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION |
- FEATURE_MASK_SUPPORTS_PER_CALL_CREDENTIALS,
- chttp2_create_fixture_secure_fullstack,
- cronet_init_client_simple_ssl_secure_fullstack,
- chttp2_init_server_simple_ssl_secure_fullstack,
- chttp2_tear_down_secure_fullstack},
-};
-
-
-
-static char *roots_filename;
-
-@interface CoreCronetEnd2EndTests : XCTestCase
-
-@end
-
-@implementation CoreCronetEnd2EndTests
-
-
-// The setUp() function is run before the test cases run and only run once
-+ (void)setUp {
- [super setUp];
-
- FILE *roots_file;
- size_t roots_size = strlen(test_root_cert);
-
- char *argv[] = {"CoreCronetEnd2EndTests"};
- grpc_test_init(1, argv);
- grpc_end2end_tests_pre_init();
-
- /* Set the SSL roots env var. */
- roots_file = gpr_tmpfile("chttp2_simple_ssl_fullstack_test", &roots_filename);
- GPR_ASSERT(roots_filename != NULL);
- GPR_ASSERT(roots_file != NULL);
- GPR_ASSERT(fwrite(test_root_cert, 1, roots_size, roots_file) == roots_size);
- fclose(roots_file);
- gpr_setenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR, roots_filename);
-
- grpc_init();
-
-}
-
-// The tearDown() function is run after all test cases finish running
-+ (void)tearDown {
- grpc_shutdown();
-
- /* Cleanup. */
- remove(roots_filename);
- gpr_free(roots_filename);
-
- [super tearDown];
-}
-
-- (void)testIndividualCase:(char*)test_case {
- char *argv[] = {"h2_ssl", test_case};
-
- for (int i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
- grpc_end2end_tests(sizeof(argv) / sizeof(argv[0]), argv, configs[i]);
- }
-}
-
-// TODO(mxyan): Use NSStringFromSelector(_cmd) to acquire test name from the
-// test case method name, so that bodies of test cases can stay identical
-- (void)testBadHostname {
- [self testIndividualCase:"bad_hostname"];
-}
-
-- (void)testBinaryMetadata {
- [self testIndividualCase:"binary_metadata"];
-}
-
-- (void)testCallCreds {
- [self testIndividualCase:"call_creds"];
-}
-
-- (void)testCancelAfterAccept {
- [self testIndividualCase:"cancel_after_accept"];
-}
-
-- (void)testCancelAfterClientDone {
- [self testIndividualCase:"cancel_after_client_done"];
-}
-
-- (void)testCancelAfterInvoke {
- [self testIndividualCase:"cancel_after_invoke"];
-}
-
-- (void)testCancelBeforeInvoke {
- [self testIndividualCase:"cancel_before_invoke"];
-}
-
-- (void)testCancelInAVacuum {
- [self testIndividualCase:"cancel_in_a_vacuum"];
-}
-
-- (void)testCancelWithStatus {
- [self testIndividualCase:"cancel_with_status"];
-}
-
-- (void)testCompressedPayload {
- [self testIndividualCase:"compressed_payload"];
-}
-
-- (void)testConnectivity {
- [self testIndividualCase:"connectivity"];
-}
-
-- (void)testDefaultHost {
- [self testIndividualCase:"default_host"];
-}
-
-- (void)testDisappearingServer {
- [self testIndividualCase:"disappearing_server"];
-}
-
-- (void)testEmptyBatch {
- [self testIndividualCase:"empty_batch"];
-}
-
-- (void)testFilterCausesClose {
- [self testIndividualCase:"filter_causes_close"];
-}
-
-- (void)testGracefulServerShutdown {
- [self testIndividualCase:"graceful_server_shutdown"];
-}
-
-- (void)testHighInitialSeqno {
- [self testIndividualCase:"high_initial_seqno"];
-}
-
-- (void)testHpackSize {
- [self testIndividualCase:"hpack_size"];
-}
-
-- (void)testIdempotentRequest {
- [self testIndividualCase:"idempotent_request"];
-}
-
-- (void)testInvokeLargeRequest {
- [self testIndividualCase:"invoke_large_request"];
-}
-
-- (void)testLargeMetadata {
- [self testIndividualCase:"large_metadata"];
-}
-
-- (void)testMaxConcurrentStreams {
- [self testIndividualCase:"max_concurrent_streams"];
-}
-
-- (void)testMaxMessageLength {
- [self testIndividualCase:"max_message_length"];
-}
-
-- (void)testNegativeDeadline {
- [self testIndividualCase:"negative_deadline"];
-}
-
-- (void)testNetworkStatusChange {
- [self testIndividualCase:"network_status_change"];
-}
-
-- (void)testNoOp {
- [self testIndividualCase:"no_op"];
-}
-
-- (void)testPayload {
- [self testIndividualCase:"payload"];
-}
-
-- (void)testPing {
- [self testIndividualCase:"ping"];
-}
-
-- (void)testPingPongStreaming {
- [self testIndividualCase:"ping_pong_streaming"];
-}
-
-- (void)testRegisteredCall {
- [self testIndividualCase:"registered_call"];
-}
-
-- (void)testRequestWithFlags {
- [self testIndividualCase:"request_with_flags"];
-}
-
-- (void)testRequestWithPayload {
- [self testIndividualCase:"request_with_payload"];
-}
-
-- (void)testServerFinishesRequest {
- [self testIndividualCase:"server_finishes_request"];
-}
-
-- (void)testShutdownFinishesCalls {
- [self testIndividualCase:"shutdown_finishes_calls"];
-}
-
-- (void)testShutdownFinishesTags {
- [self testIndividualCase:"shutdown_finishes_tags"];
-}
-
-- (void)testSimpleDelayedRequest {
- [self testIndividualCase:"simple_delayed_request"];
-}
-
-- (void)testSimpleMetadata {
- [self testIndividualCase:"simple_metadata"];
-}
-
-- (void)testSimpleRequest {
- [self testIndividualCase:"simple_request"];
-}
-
-- (void)testStreamingErrorResponse {
- [self testIndividualCase:"streaming_error_response"];
-}
-
-- (void)testTrailingMetadata {
- [self testIndividualCase:"trailing_metadata"];
-}
-
-@end
diff --git a/src/php/ext/grpc/php7_wrapper.h b/src/php/ext/grpc/php7_wrapper.h
index fd8d35636f..1d7824113f 100644
--- a/src/php/ext/grpc/php7_wrapper.h
+++ b/src/php/ext/grpc/php7_wrapper.h
@@ -143,8 +143,7 @@ static inline int php_grpc_zend_hash_find(HashTable *ht, char *key, int len,
#define PHP_GRPC_RETURN_STRING(val, dup) RETURN_STRING(val)
#define PHP_GRPC_MAKE_STD_ZVAL(pzv) \
- zval _stack_zval_##pzv; \
- pzv = &(_stack_zval_##pzv)
+ pzv = (zval *)emalloc(sizeof(zval));
#define PHP_GRPC_DELREF(zv)
#define PHP_GRPC_WRAP_OBJECT_START(name) \
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index ba60986143..cc3bd7a067 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -34,6 +34,7 @@ cdef class Call:
def __cinit__(self):
# Create an *empty* call
+ grpc_init()
self.c_call = NULL
self.references = []
@@ -106,6 +107,7 @@ cdef class Call:
def __dealloc__(self):
if self.c_call != NULL:
grpc_call_destroy(self.c_call)
+ grpc_shutdown()
# The object *should* always be valid from Python. Used for debugging.
@property
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 5416401431..3df937eb14 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -34,6 +34,7 @@ cdef class Channel:
def __cinit__(self, bytes target, ChannelArgs arguments=None,
ChannelCredentials channel_credentials=None):
+ grpc_init()
cdef grpc_channel_args *c_arguments = NULL
cdef char *c_target = NULL
self.c_channel = NULL
@@ -103,3 +104,4 @@ cdef class Channel:
def __dealloc__(self):
if self.c_channel != NULL:
grpc_channel_destroy(self.c_channel)
+ grpc_shutdown()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 5955021ceb..a258ba4063 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -38,6 +38,7 @@ cdef int _INTERRUPT_CHECK_PERIOD_MS = 200
cdef class CompletionQueue:
def __cinit__(self):
+ grpc_init()
with nogil:
self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
@@ -129,3 +130,4 @@ cdef class CompletionQueue:
self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
grpc_completion_queue_destroy(self.c_completion_queue)
+ grpc_shutdown()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 035ac49a8b..04872b9c09 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -33,6 +33,7 @@ cimport cpython
cdef class ChannelCredentials:
def __cinit__(self):
+ grpc_init()
self.c_credentials = NULL
self.c_ssl_pem_key_cert_pair.private_key = NULL
self.c_ssl_pem_key_cert_pair.certificate_chain = NULL
@@ -47,11 +48,13 @@ cdef class ChannelCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_channel_credentials_release(self.c_credentials)
+ grpc_shutdown()
cdef class CallCredentials:
def __cinit__(self):
+ grpc_init()
self.c_credentials = NULL
self.references = []
@@ -64,17 +67,20 @@ cdef class CallCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_call_credentials_release(self.c_credentials)
+ grpc_shutdown()
cdef class ServerCredentials:
def __cinit__(self):
+ grpc_init()
self.c_credentials = NULL
self.references = []
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_server_credentials_release(self.c_credentials)
+ grpc_shutdown()
cdef class CredentialsMetadataPlugin:
@@ -90,6 +96,7 @@ cdef class CredentialsMetadataPlugin:
successful).
name (bytes): Plugin name.
"""
+ grpc_init()
if not callable(plugin_callback):
raise ValueError('expected callable plugin_callback')
self.plugin_callback = plugin_callback
@@ -105,10 +112,14 @@ cdef class CredentialsMetadataPlugin:
cpython.Py_INCREF(self)
return result
+ def __dealloc__(self):
+ grpc_shutdown()
+
cdef class AuthMetadataContext:
def __cinit__(self):
+ grpc_init()
self.context.service_url = NULL
self.context.method_name = NULL
@@ -120,6 +131,9 @@ cdef class AuthMetadataContext:
def method_name(self):
return self.context.method_name
+ def __dealloc__(self):
+ grpc_shutdown()
+
cdef void plugin_get_metadata(
void *state, grpc_auth_metadata_context context,
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 54b3d00dfc..834a44123d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -176,12 +176,14 @@ cdef class Timespec:
cdef class CallDetails:
def __cinit__(self):
+ grpc_init()
with nogil:
grpc_call_details_init(&self.c_details)
def __dealloc__(self):
with nogil:
grpc_call_details_destroy(&self.c_details)
+ grpc_shutdown()
@property
def method(self):
@@ -232,6 +234,7 @@ cdef class Event:
cdef class ByteBuffer:
def __cinit__(self, bytes data):
+ grpc_init()
if data is None:
self.c_byte_buffer = NULL
return
@@ -288,6 +291,7 @@ cdef class ByteBuffer:
def __dealloc__(self):
if self.c_byte_buffer != NULL:
grpc_byte_buffer_destroy(self.c_byte_buffer)
+ grpc_shutdown()
cdef class SslPemKeyCertPair:
@@ -319,6 +323,7 @@ cdef class ChannelArg:
cdef class ChannelArgs:
def __cinit__(self, args):
+ grpc_init()
self.args = list(args)
for arg in self.args:
if not isinstance(arg, ChannelArg):
@@ -333,6 +338,7 @@ cdef class ChannelArgs:
def __dealloc__(self):
with nogil:
gpr_free(self.c_args.arguments)
+ grpc_shutdown()
def __len__(self):
# self.args is never stale; it's only updated from this file
@@ -399,6 +405,7 @@ cdef class _MetadataIterator:
cdef class Metadata:
def __cinit__(self, metadata):
+ grpc_init()
self.metadata = list(metadata)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
@@ -420,6 +427,7 @@ cdef class Metadata:
# it'd be nice if that were documented somewhere...)
# TODO(atash): document this in the C core
grpc_metadata_array_destroy(&self.c_metadata_array)
+ grpc_shutdown()
def __len__(self):
return self.c_metadata_array.count
@@ -437,6 +445,7 @@ cdef class Metadata:
cdef class Operation:
def __cinit__(self):
+ grpc_init()
self.references = []
self._received_status_details = NULL
self._received_status_details_capacity = 0
@@ -529,6 +538,7 @@ cdef class Operation:
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
gpr_free(self._received_status_details)
+ grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags):
cdef Operation op = Operation()
@@ -645,6 +655,7 @@ cdef class _OperationsIterator:
cdef class Operations:
def __cinit__(self, operations):
+ grpc_init()
self.operations = list(operations) # normalize iterable
self.c_ops = NULL
self.c_nops = 0
@@ -667,6 +678,7 @@ cdef class Operations:
def __dealloc__(self):
with nogil:
gpr_free(self.c_ops)
+ grpc_shutdown()
def __iter__(self):
return _OperationsIterator(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 4f2d51b03f..ca2b831114 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -35,6 +35,7 @@ import time
cdef class Server:
def __cinit__(self, ChannelArgs arguments=None):
+ grpc_init()
cdef grpc_channel_args *c_arguments = NULL
self.references = []
self.registered_completion_queues = []
@@ -172,3 +173,4 @@ cdef class Server:
while not self.is_shutdown:
time.sleep(0)
grpc_server_destroy(self.c_server)
+ grpc_shutdown()
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index a9520b9c0f..08089994a9 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -55,12 +55,8 @@ cdef extern from "Python.h":
def _initialize():
- grpc_init()
grpc_set_ssl_roots_override_callback(
<grpc_ssl_roots_override_callback>ssl_roots_override_callback)
- if Py_AtExit(grpc_shutdown) != 0:
- raise ImportError('failed to register gRPC library shutdown callbacks')
-
_initialize()
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
index c753d6faf0..936c895bd2 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_interop_test.py
@@ -29,9 +29,10 @@
"""Insecure client-server interoperability as a unit test."""
+from concurrent import futures
import unittest
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
@@ -44,14 +45,13 @@ class InsecureInteropTest(
unittest.TestCase):
def setUp(self):
- self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(
+ methods.TestService(), self.server)
port = self.server.add_insecure_port('[::]:0')
self.server.start()
- self.stub = test_pb2.beta_create_TestService_stub(
- implementations.insecure_channel('localhost', port))
-
- def tearDown(self):
- self.server.stop(0)
+ self.stub = test_pb2.TestServiceStub(
+ grpc.insecure_channel('localhost:{}'.format(port)))
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
index cb09f54a34..eaca553e1b 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_interop_test.py
@@ -29,17 +29,16 @@
"""Secure client-server interoperability as a unit test."""
+from concurrent import futures
import unittest
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import _interop_test_case
from tests.interop import methods
from tests.interop import resources
-from tests.unit.beta import test_utilities
-
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
@@ -48,19 +47,18 @@ class SecureInteropTest(
unittest.TestCase):
def setUp(self):
- self.server = test_pb2.beta_create_TestService_server(methods.TestService())
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(
+ methods.TestService(), self.server)
port = self.server.add_secure_port(
- '[::]:0', implementations.ssl_server_credentials(
+ '[::]:0', grpc.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain())]))
self.server.start()
- self.stub = test_pb2.beta_create_TestService_stub(
- test_utilities.not_really_secure_channel(
- 'localhost', port, implementations.ssl_channel_credentials(
- resources.test_root_certificates()),
- _SERVER_HOST_OVERRIDE))
-
- def tearDown(self):
- self.server.stop(0)
+ self.stub = test_pb2.TestServiceStub(
+ grpc.secure_channel(
+ 'localhost:{}'.format(port),
+ grpc.ssl_channel_credentials(resources.test_root_certificates()),
+ (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),)))
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py
index 8aa1ce30c1..9d61d18975 100644
--- a/src/python/grpcio_tests/tests/interop/client.py
+++ b/src/python/grpcio_tests/tests/interop/client.py
@@ -32,14 +32,12 @@
import argparse
from oauth2client import client as oauth2client_client
+import grpc
from grpc.beta import implementations
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
from tests.interop import resources
-from tests.unit.beta import test_utilities
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def _args():
@@ -66,41 +64,49 @@ def _args():
return parser.parse_args()
+def _application_default_credentials():
+ return oauth2client_client.GoogleCredentials.get_application_default()
+
+
def _stub(args):
+ target = '{}:{}'.format(args.server_host, args.server_port)
if args.test_case == 'oauth2_auth_token':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- scoped_creds = creds.create_scoped([args.oauth_scope])
- access_token = scoped_creds.get_access_token().access_token
- call_creds = implementations.access_token_call_credentials(access_token)
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+ access_token = scoped_credentials.get_access_token().access_token
+ call_credentials = grpc.access_token_call_credentials(access_token)
elif args.test_case == 'compute_engine_creds':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- scoped_creds = creds.create_scoped([args.oauth_scope])
- call_creds = implementations.google_call_credentials(scoped_creds)
+ google_credentials = _application_default_credentials()
+ scoped_credentials = google_credentials.create_scoped([args.oauth_scope])
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
elif args.test_case == 'jwt_token_creds':
- creds = oauth2client_client.GoogleCredentials.get_application_default()
- call_creds = implementations.google_call_credentials(creds)
+ google_credentials = _application_default_credentials()
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ google_credentials)
else:
- call_creds = None
+ call_credentials = None
if args.use_tls:
if args.use_test_ca:
root_certificates = resources.test_root_certificates()
else:
root_certificates = None # will load default roots.
- channel_creds = implementations.ssl_channel_credentials(root_certificates)
- if call_creds is not None:
- channel_creds = implementations.composite_channel_credentials(
- channel_creds, call_creds)
+ channel_credentials = grpc.ssl_channel_credentials(root_certificates)
+ if call_credentials is not None:
+ channel_credentials = grpc.composite_channel_credentials(
+ channel_credentials, call_credentials)
- channel = test_utilities.not_really_secure_channel(
- args.server_host, args.server_port, channel_creds,
- args.server_host_override)
- stub = test_pb2.beta_create_TestService_stub(channel)
+ channel = grpc.secure_channel(
+ target, channel_credentials,
+ (('grpc.ssl_target_name_override', args.server_host_override,),))
else:
- channel = implementations.insecure_channel(
- args.server_host, args.server_port)
- stub = test_pb2.beta_create_TestService_stub(channel)
- return stub
+ channel = grpc.insecure_channel(target)
+ return test_pb2.TestServiceStub(channel)
def _test_case_from_arg(test_case_arg):
diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py
index 97e6c9e27e..7edd75c56c 100644
--- a/src/python/grpcio_tests/tests/interop/methods.py
+++ b/src/python/grpcio_tests/tests/interop/methods.py
@@ -29,8 +29,6 @@
"""Implementations of interoperability test methods."""
-from __future__ import print_function
-
import enum
import json
import os
@@ -41,26 +39,21 @@ from oauth2client import client as oauth2client_client
import grpc
from grpc.beta import implementations
-from grpc.beta import interfaces
-from grpc.framework.common import cardinality
-from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import test_pb2
-_TIMEOUT = 7
-
-class TestService(test_pb2.BetaTestServiceServicer):
+class TestService(test_pb2.TestServiceServicer):
def EmptyCall(self, request, context):
return empty_pb2.Empty()
def UnaryCall(self, request, context):
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
return messages_pb2.SimpleResponse(
payload=messages_pb2.Payload(
type=messages_pb2.COMPRESSABLE,
@@ -68,8 +61,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingOutputCall(self, request, context):
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -79,7 +72,7 @@ class TestService(test_pb2.BetaTestServiceServicer):
def StreamingInputCall(self, request_iterator, context):
aggregate_size = 0
for request in request_iterator:
- if request.payload and request.payload.body:
+ if request.payload is not None and request.payload.body:
aggregate_size += len(request.payload.body)
return messages_pb2.StreamingInputCallResponse(
aggregated_payload_size=aggregate_size)
@@ -87,8 +80,8 @@ class TestService(test_pb2.BetaTestServiceServicer):
def FullDuplexCall(self, request_iterator, context):
for request in request_iterator:
if request.HasField('response_status'):
- context.code(request.response_status.code)
- context.details(request.response_status.message)
+ context.set_code(request.response_status.code)
+ context.set_details(request.response_status.message)
for response_parameters in request.response_parameters:
yield messages_pb2.StreamingOutputCallResponse(
payload=messages_pb2.Payload(
@@ -101,83 +94,80 @@ class TestService(test_pb2.BetaTestServiceServicer):
return self.FullDuplexCall(request_iterator, context)
-def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope,
- protocol_options=None):
- with stub:
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE, response_size=314159,
- payload=messages_pb2.Payload(body=b'\x00' * 271828),
- fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
- response_future = stub.UnaryCall.future(request, _TIMEOUT,
- protocol_options=protocol_options)
- response = response_future.result()
- if response.payload.type is not messages_pb2.COMPRESSABLE:
- raise ValueError(
- 'response payload type is "%s"!' % type(response.payload.type))
- if len(response.payload.body) != 314159:
- raise ValueError(
- 'response body of incorrect size %d!' % len(response.payload.body))
+def _large_unary_common_behavior(
+ stub, fill_username, fill_oauth_scope, call_credentials):
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE, response_size=314159,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828),
+ fill_username=fill_username, fill_oauth_scope=fill_oauth_scope)
+ response_future = stub.UnaryCall.future(
+ request, credentials=call_credentials)
+ response = response_future.result()
+ if response.payload.type is not messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response payload type is "%s"!' % type(response.payload.type))
+ elif len(response.payload.body) != 314159:
+ raise ValueError(
+ 'response body of incorrect size %d!' % len(response.payload.body))
+ else:
return response
def _empty_unary(stub):
- with stub:
- response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT)
- if not isinstance(response, empty_pb2.Empty):
- raise TypeError(
- 'response is of type "%s", not empty_pb2.Empty!', type(response))
+ response = stub.EmptyCall(empty_pb2.Empty())
+ if not isinstance(response, empty_pb2.Empty):
+ raise TypeError(
+ 'response is of type "%s", not empty_pb2.Empty!', type(response))
def _large_unary(stub):
- _large_unary_common_behavior(stub, False, False)
+ _large_unary_common_behavior(stub, False, False, None)
def _client_streaming(stub):
- with stub:
- payload_body_sizes = (27182, 8, 1828, 45904)
- payloads = (
- messages_pb2.Payload(body=b'\x00' * size)
- for size in payload_body_sizes)
- requests = (
- messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads)
- response = stub.StreamingInputCall(requests, _TIMEOUT)
- if response.aggregated_payload_size != 74922:
- raise ValueError(
- 'incorrect size %d!' % response.aggregated_payload_size)
+ payload_body_sizes = (27182, 8, 1828, 45904,)
+ payloads = (
+ messages_pb2.Payload(body=b'\x00' * size)
+ for size in payload_body_sizes)
+ requests = (
+ messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response = stub.StreamingInputCall(requests)
+ if response.aggregated_payload_size != 74922:
+ raise ValueError(
+ 'incorrect size %d!' % response.aggregated_payload_size)
def _server_streaming(stub):
- sizes = (31415, 9, 2653, 58979)
-
- with stub:
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=sizes[0]),
- messages_pb2.ResponseParameters(size=sizes[1]),
- messages_pb2.ResponseParameters(size=sizes[2]),
- messages_pb2.ResponseParameters(size=sizes[3]),
- ))
- response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
- for index, response in enumerate(response_iterator):
- if response.payload.type != messages_pb2.COMPRESSABLE:
- raise ValueError(
- 'response body of invalid type %s!' % response.payload.type)
- if len(response.payload.body) != sizes[index]:
- raise ValueError(
- 'response body of invalid size %d!' % len(response.payload.body))
+ sizes = (31415, 9, 2653, 58979,)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=sizes[0]),
+ messages_pb2.ResponseParameters(size=sizes[1]),
+ messages_pb2.ResponseParameters(size=sizes[2]),
+ messages_pb2.ResponseParameters(size=sizes[3]),
+ )
+ )
+ response_iterator = stub.StreamingOutputCall(request)
+ for index, response in enumerate(response_iterator):
+ if response.payload.type != messages_pb2.COMPRESSABLE:
+ raise ValueError(
+ 'response body of invalid type %s!' % response.payload.type)
+ elif len(response.payload.body) != sizes[index]:
+ raise ValueError(
+ 'response body of invalid size %d!' % len(response.payload.body))
def _cancel_after_begin(stub):
- with stub:
- sizes = (27182, 8, 1828, 45904)
- payloads = [messages_pb2.Payload(body=b'\x00' * size) for size in sizes]
- requests = [messages_pb2.StreamingInputCallRequest(payload=payload)
- for payload in payloads]
- responses = stub.StreamingInputCall.future(requests, _TIMEOUT)
- responses.cancel()
- if not responses.cancelled():
- raise ValueError('expected call to be cancelled')
+ sizes = (27182, 8, 1828, 45904,)
+ payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in sizes)
+ requests = (messages_pb2.StreamingInputCallRequest(payload=payload)
+ for payload in payloads)
+ response_future = stub.StreamingInputCall.future(requests)
+ response_future.cancel()
+ if not response_future.cancelled():
+ raise ValueError('expected call to be cancelled')
class _Pipe(object):
@@ -220,18 +210,17 @@ class _Pipe(object):
def _ping_pong(stub):
- request_response_sizes = (31415, 9, 2653, 58979)
- request_payload_sizes = (27182, 8, 1828, 45904)
+ request_response_sizes = (31415, 9, 2653, 58979,)
+ request_payload_sizes = (27182, 8, 1828, 45904,)
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
- print('Starting ping-pong with response iterator %s' % response_iterator)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
for response_size, payload_size in zip(
request_response_sizes, request_payload_sizes):
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(messages_pb2.ResponseParameters(
- size=response_size),),
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -244,17 +233,17 @@ def _ping_pong(stub):
def _cancel_after_first_response(stub):
- request_response_sizes = (31415, 9, 2653, 58979)
- request_payload_sizes = (27182, 8, 1828, 45904)
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+ request_response_sizes = (31415, 9, 2653, 58979,)
+ request_payload_sizes = (27182, 8, 1828, 45904,)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
response_size = request_response_sizes[0]
payload_size = request_payload_sizes[0]
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(messages_pb2.ResponseParameters(
- size=response_size),),
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
payload=messages_pb2.Payload(body=b'\x00' * payload_size))
pipe.add(request)
response = next(response_iterator)
@@ -264,16 +253,17 @@ def _cancel_after_first_response(stub):
try:
next(response_iterator)
- except Exception:
- pass
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.CANCELLED:
+ raise
else:
raise ValueError('expected call to be cancelled')
def _timeout_on_sleeping_server(stub):
request_payload_size = 27182
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, 0.001)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, timeout=0.001)
request = messages_pb2.StreamingOutputCallRequest(
response_type=messages_pb2.COMPRESSABLE,
@@ -282,15 +272,16 @@ def _timeout_on_sleeping_server(stub):
time.sleep(0.1)
try:
next(response_iterator)
- except face.ExpirationError:
- pass
+ except grpc.RpcError as rpc_error:
+ if rpc_error.code() is not grpc.StatusCode.DEADLINE_EXCEEDED:
+ raise
else:
raise ValueError('expected call to exceed deadline')
def _empty_stream(stub):
- with stub, _Pipe() as pipe:
- response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT)
+ with _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe)
pipe.close()
try:
next(response_iterator)
@@ -300,65 +291,64 @@ def _empty_stream(stub):
def _status_code_and_message(stub):
- with stub:
- message = 'test status message'
- code = 2
- status = grpc.StatusCode.UNKNOWN # code = 2
- request = messages_pb2.SimpleRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_size=1,
- payload=messages_pb2.Payload(body=b'\x00'),
- response_status=messages_pb2.EchoStatus(code=code, message=message)
- )
- response_future = stub.UnaryCall.future(request, _TIMEOUT)
- if response_future.code() != status:
- raise ValueError(
- 'expected code %s, got %s' % (status, response_future.code()))
- if response_future.details() != message:
- raise ValueError(
- 'expected message %s, got %s' % (message, response_future.details()))
-
- request = messages_pb2.StreamingOutputCallRequest(
- response_type=messages_pb2.COMPRESSABLE,
- response_parameters=(
- messages_pb2.ResponseParameters(size=1),),
- response_status=messages_pb2.EchoStatus(code=code, message=message))
- response_iterator = stub.StreamingOutputCall(request, _TIMEOUT)
- if response_future.code() != status:
- raise ValueError(
- 'expected code %s, got %s' % (status, response_iterator.code()))
- if response_future.details() != message:
- raise ValueError(
- 'expected message %s, got %s' % (message, response_iterator.details()))
+ message = 'test status message'
+ code = 2
+ status = grpc.StatusCode.UNKNOWN # code = 2
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_size=1,
+ payload=messages_pb2.Payload(body=b'\x00'),
+ response_status=messages_pb2.EchoStatus(code=code, message=message)
+ )
+ response_future = stub.UnaryCall.future(request)
+ if response_future.code() != status:
+ raise ValueError(
+ 'expected code %s, got %s' % (status, response_future.code()))
+ elif response_future.details() != message:
+ raise ValueError(
+ 'expected message %s, got %s' % (message, response_future.details()))
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=1),),
+ response_status=messages_pb2.EchoStatus(code=code, message=message))
+ response_iterator = stub.StreamingOutputCall(request)
+ if response_future.code() != status:
+ raise ValueError(
+ 'expected code %s, got %s' % (status, response_iterator.code()))
+ elif response_future.details() != message:
+ raise ValueError(
+ 'expected message %s, got %s' % (message, response_iterator.details()))
def _compute_engine_creds(stub, args):
- response = _large_unary_common_behavior(stub, True, True)
+ response = _large_unary_common_behavior(stub, True, True, None)
if args.default_service_account != response.username:
raise ValueError(
- 'expected username %s, got %s' % (args.default_service_account,
- response.username))
+ 'expected username %s, got %s' % (
+ args.default_service_account, response.username))
def _oauth2_auth_token(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, True)
+ response = _large_unary_common_behavior(stub, True, True, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
if args.oauth_scope.find(response.oauth_scope) == -1:
raise ValueError(
- 'expected to find oauth scope "%s" in received "%s"' %
- (response.oauth_scope, args.oauth_scope))
+ 'expected to find oauth scope "{}" in received "{}"'.format(
+ response.oauth_scope, args.oauth_scope))
def _jwt_token_creds(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
- response = _large_unary_common_behavior(stub, True, False)
+ response = _large_unary_common_behavior(stub, True, False, None)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
@@ -370,11 +360,11 @@ def _per_rpc_creds(stub, args):
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
credentials = oauth2client_client.GoogleCredentials.get_application_default()
scoped_credentials = credentials.create_scoped([args.oauth_scope])
- call_creds = implementations.google_call_credentials(scoped_credentials)
- options = interfaces.grpc_call_options(disable_compression=False,
- credentials=call_creds)
- response = _large_unary_common_behavior(stub, True, False,
- protocol_options=options)
+ # TODO(https://github.com/grpc/grpc/issues/6799): Eliminate this last
+ # remaining use of the Beta API.
+ call_credentials = implementations.google_call_credentials(
+ scoped_credentials)
+ response = _large_unary_common_behavior(stub, True, False, call_credentials)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index ab2c3c708f..1ae83bc57d 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -30,10 +30,11 @@
"""The Python implementation of the GRPC interoperability test server."""
import argparse
+from concurrent import futures
import logging
import time
-from grpc.beta import implementations
+import grpc
from src.proto.grpc.testing import test_pb2
from tests.interop import methods
@@ -51,12 +52,13 @@ def serve():
default=False, type=resources.parse_bool)
args = parser.parse_args()
- server = test_pb2.beta_create_TestService_server(methods.TestService())
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ test_pb2.add_TestServiceServicer_to_server(methods.TestService(), server)
if args.use_tls:
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
- credentials = implementations.ssl_server_credentials(
- [(private_key, certificate_chain)])
+ credentials = grpc.ssl_server_credentials(
+ ((private_key, certificate_chain),))
server.add_secure_port('[::]:{}'.format(args.port), credentials)
else:
server.add_insecure_port('[::]:{}'.format(args.port))
@@ -68,7 +70,7 @@ def serve():
time.sleep(_ONE_DAY_IN_SECONDS)
except BaseException as e:
logging.info('Caught exception "%s"; stopping server...', e)
- server.stop(0)
+ server.stop(None)
logging.info('Server stopped; exiting.')
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py
index 0de2532cd8..975f33b4c1 100644
--- a/src/python/grpcio_tests/tests/stress/client.py
+++ b/src/python/grpcio_tests/tests/stress/client.py
@@ -30,9 +30,10 @@
"""Entry point for running stress tests."""
import argparse
+from concurrent import futures
import threading
-from grpc.beta import implementations
+import grpc
from six.moves import queue
from src.proto.grpc.testing import metrics_pb2
from src.proto.grpc.testing import test_pb2
@@ -92,24 +93,24 @@ def _parse_weighted_test_cases(test_case_args):
def run_test(args):
test_cases = _parse_weighted_test_cases(args.test_cases)
- test_servers = args.server_addresses.split(',')
+ test_server_targets = args.server_addresses.split(',')
# Propagate any client exceptions with a queue
exception_queue = queue.Queue()
stop_event = threading.Event()
hist = histogram.Histogram(1, 1)
runners = []
- server = metrics_pb2.beta_create_MetricsService_server(
- metrics_server.MetricsServer(hist))
+ server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
+ metrics_pb2.add_MetricsServiceServicer_to_server(
+ metrics_server.MetricsServer(hist), server)
server.add_insecure_port('[::]:{}'.format(args.metrics_port))
server.start()
- for test_server in test_servers:
- host, port = test_server.split(':', 1)
+ for test_server_target in test_server_targets:
for _ in xrange(args.num_channels_per_server):
- channel = implementations.insecure_channel(host, int(port))
+ channel = grpc.insecure_channel(test_server_target)
for _ in xrange(args.num_stubs_per_channel):
- stub = test_pb2.beta_create_TestService_stub(channel)
+ stub = test_pb2.TestServiceStub(channel)
runner = test_runner.TestRunner(stub, test_cases, hist,
exception_queue, stop_event)
runners.append(runner)
@@ -128,8 +129,8 @@ def run_test(args):
stop_event.set()
for runner in runners:
runner.join()
- runner = None
- server.stop(0)
+ runner = None
+ server.stop(None)
if __name__ == '__main__':
run_test(_args())
diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py
index b994e4643e..33dd1d6f2a 100644
--- a/src/python/grpcio_tests/tests/stress/metrics_server.py
+++ b/src/python/grpcio_tests/tests/stress/metrics_server.py
@@ -36,7 +36,7 @@ from src.proto.grpc.testing import metrics_pb2
GAUGE_NAME = 'python_overall_qps'
-class MetricsServer(metrics_pb2.BetaMetricsServiceServicer):
+class MetricsServer(metrics_pb2.MetricsServiceServicer):
def __init__(self, histogram):
self._start_time = time.time()
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 196f84f65f..d7cd9e6df2 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,7 +62,6 @@ module GRPC
@call = call
@marshal = marshal
@op_notifier = nil # signals completion on clients
- @readq = Queue.new
@unmarshal = unmarshal
@metadata_received = metadata_received
@reads_complete = false
@@ -83,8 +82,7 @@ module GRPC
def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) }
- @loop_th = start_read_loop
- each_queued_msg(&blk)
+ read_loop(&blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -101,14 +99,13 @@ module GRPC
def run_on_server(gen_each_reply)
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
- replys = gen_each_reply.call(each_queued_msg)
+ replys = gen_each_reply.call(read_loop(is_client: false))
elsif gen_each_reply.arity == 2
- replys = gen_each_reply.call(each_queued_msg, @req_view)
+ replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
else
fail 'Illegal arity of reply generator'
end
- @loop_th = start_read_loop(is_client: false)
write_loop(replys, is_client: false)
end
@@ -145,24 +142,6 @@ module GRPC
batch_result
end
- # each_queued_msg yields each message on this instances readq
- #
- # - messages are added to the readq by #read_loop
- # - iteration ends when the instance itself is added
- def each_queued_msg
- return enum_for(:each_queued_msg) unless block_given?
- count = 0
- loop do
- GRPC.logger.debug("each_queued_msg: waiting##{count}")
- count += 1
- req = @readq.pop
- GRPC.logger.debug("each_queued_msg: req = #{req}")
- fail req if req.is_a? StandardError
- break if req.equal?(END_OF_READS)
- yield req
- end
- end
-
def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
@@ -201,47 +180,45 @@ module GRPC
raise e
end
- # starts the read loop
- def start_read_loop(is_client: true)
- Thread.new do
- GRPC.logger.debug('bidi-read-loop: starting')
- begin
- count = 0
- # queue the initial read before beginning the loop
- loop do
- GRPC.logger.debug("bidi-read-loop: #{count}")
- count += 1
- batch_result = read_using_run_batch
-
- # handle the next message
- if batch_result.message.nil?
- GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
-
- if is_client
- batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
- @call.status = batch_result.status
- batch_result.check_status
- GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
- end
-
- @readq.push(END_OF_READS)
- GRPC.logger.debug('bidi-read-loop: done reading!')
- break
+ # Provides an enumerator that yields results of remote reads
+ def read_loop(is_client: true)
+ return enum_for(:read_loop,
+ is_client: is_client) unless block_given?
+ GRPC.logger.debug('bidi-read-loop: starting')
+ begin
+ count = 0
+ # queue the initial read before beginning the loop
+ loop do
+ GRPC.logger.debug("bidi-read-loop: #{count}")
+ count += 1
+ batch_result = read_using_run_batch
+
+ # handle the next message
+ if batch_result.message.nil?
+ GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
+
+ if is_client
+ batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ @call.status = batch_result.status
+ batch_result.check_status
+ GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
end
- # push the latest read onto the queue and continue reading
- res = @unmarshal.call(batch_result.message)
- @readq.push(res)
+ GRPC.logger.debug('bidi-read-loop: done reading!')
+ break
end
- rescue StandardError => e
- GRPC.logger.warn('bidi: read-loop failed')
- GRPC.logger.warn(e)
- @readq.push(e) # let each_queued_msg terminate with this error
+
+ res = @unmarshal.call(batch_result.message)
+ yield res
end
- GRPC.logger.debug('bidi-read-loop: finished')
- @reads_complete = true
- finished
+ rescue StandardError => e
+ GRPC.logger.warn('bidi: read-loop failed')
+ GRPC.logger.warn(e)
+ raise e
end
+ GRPC.logger.debug('bidi-read-loop: finished')
+ @reads_complete = true
+ finished
end
end
end