aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_config/resolver_registry.c8
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c4
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c69
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c85
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c640
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs18
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs72
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/csharp/build_packages.bat2
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c30
-rw-r--r--src/node/tools/package.json2
-rw-r--r--src/python/grpcio/grpc/__init__.py3
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h4
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py3
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio/precompiled.py114
-rw-r--r--src/ruby/ext/grpc/extconf.rb8
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c3
-rw-r--r--src/ruby/ext/grpc/rb_call.c3
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c6
-rw-r--r--src/ruby/ext/grpc/rb_channel.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_args.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c5
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c2
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h4
-rw-r--r--src/ruby/ext/grpc/rb_server.c3
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c3
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb6
-rw-r--r--src/ruby/lib/grpc/generic/service.rb16
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb23
-rw-r--r--src/ruby/spec/generic/service_spec.rb69
-rw-r--r--src/ruby/tools/version.rb2
46 files changed, 976 insertions, 294 deletions
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index 07f29bcb27..e7a4abd568 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -47,7 +47,6 @@ static int g_number_of_resolvers = 0;
static char *g_default_resolver_prefix;
void grpc_resolver_registry_init(const char *default_resolver_prefix) {
- g_number_of_resolvers = 0;
g_default_resolver_prefix = gpr_strdup(default_resolver_prefix);
}
@@ -57,6 +56,13 @@ void grpc_resolver_registry_shutdown(void) {
grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
}
gpr_free(g_default_resolver_prefix);
+ // FIXME(ctiller): this should live in grpc_resolver_registry_init,
+ // however that would have the client_config plugin call this AFTER we start
+ // registering resolvers from third party plugins, and so they'd never show
+ // up.
+ // We likely need some kind of dependency system for plugins.... what form
+ // that takes is TBD.
+ g_number_of_resolvers = 0;
}
void grpc_register_resolver_type(grpc_resolver_factory *factory) {
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3f6051b892..dcdc0c6285 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -306,8 +306,10 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
- p->num_subchannels);
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
+ p->num_subchannels);
+ }
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8c8593748d..53633227ac 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -783,7 +783,8 @@ void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t goaway_error, gpr_slice goaway_text) {
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 555027c866..ebeee37f0d 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -63,6 +63,8 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
+extern int grpc_http_trace;
+
typedef struct {
int is_first_frame;
/* number of bytes in 'output' when we started the frame - used to calculate
@@ -532,7 +534,9 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
}
c->advertise_table_size_change = 1;
- gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ }
}
void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 4d64506de2..295f31c44f 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -253,7 +253,9 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
if (tbl->max_bytes == max_bytes) {
return;
}
- gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ }
while (tbl->mem_used > max_bytes) {
evict1(tbl);
}
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
new file mode 100644
index 0000000000..df1acddcc0
--- /dev/null
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+// Cronet transport object
+typedef struct cronet_transport {
+ grpc_transport base; // must be first element in this structure
+ void *engine;
+ char *host;
+} cronet_transport;
+
+extern grpc_transport_vtable grpc_cronet_vtable;
+
+GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
+ void *engine, const char *target, const grpc_channel_args *args,
+ void *reserved) {
+ cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
+ ct->base.vtable = &grpc_cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(GPR_DEBUG,
+ "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ ct->host);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
+}
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
new file mode 100644
index 0000000000..687026c9fd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file has empty implementation of all the functions exposed by the cronet
+library, so we can build it in all environments */
+
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#ifdef GRPC_COMPILE_WITH_CRONET
+/* link with the real CRONET library in the build system */
+#else
+/* Dummy implementation of cronet API just to test for build-ability */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine, void* annotation,
+ cronet_bidirectional_stream_callback* callback) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream, const char* url, int priority,
+ const char* method, const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer, int capacity) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer, int count,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..5bb085195c
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,640 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+static const char *recv_state_name[] = {
+ "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slice_buffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slice_buffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ int remaining_read_bytes;
+ int total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ // Hold the URL
+ char *url;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header *headers;
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+ // state tracking
+ enum recv_state cronet_recv_state;
+ enum send_state cronet_send_state;
+};
+
+typedef struct stream_obj stream_obj;
+
+static void next_send_step(stream_obj *s);
+static void next_recv_step(stream_obj *s, enum e_caller caller);
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+ }
+}
+
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+ }
+}
+
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+ }
+}
+
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ unsigned int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_write_completed");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ s->cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ }
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+static void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_headers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ s->cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+static cronet_bidirectional_stream_callback callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+static void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+static void set_recv_state(stream_obj *s, enum recv_state state) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+ }
+ s->cronet_recv_state = state;
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+static void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (s->cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ }
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ set_recv_state(s, CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ set_recv_state(s, CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header((const uint8_t *)s->read_buffer);
+ s->read_buffer =
+ gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(
+ s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slice_buffer);
+ uint8_t *p = (uint8_t *)s->read_buffer;
+ process_recv_message(s, p);
+ set_recv_state(s, CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+// This function takes the data from s->write_slice_buffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+static void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = (uint8_t *)s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+static void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slice_buffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size, false);
+ }
+}
+
+//
+static void next_send_step(stream_obj *s) {
+ switch (s->cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(
+ s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ s->cronet_send_state = CRONET_REQ_STARTED;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
+ }
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ // we no longer need the memory that was allocated earlier.
+ gpr_free(s->header_array.headers);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ s->cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ // Walk the linked list and get number of header fields
+ uint32_t num_headers_available = 0;
+ while (curr != NULL) {
+ curr = curr->next;
+ num_headers_available++;
+ }
+ // Allocate enough memory
+ s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+
+ // Walk the linked list again, this time copying the header fields.
+ // s->num_headers
+ // can be less than num_headers_available, as some headers are not used for
+ // cronet
+ curr = head;
+ s->num_headers = 0;
+ while (s->num_headers < num_headers_available) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ gpr_asprintf(&s->url, "https://%s%s", host, value);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
+ }
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_message = (grpc_byte_buffer **)op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ }
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ }
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ // Check that compression flag is not ON. We don't support compression yet.
+ // TODO (makdharma): add compression support
+ GPR_ASSERT(op->send_message->flags == 0);
+ gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
+ if (s->cbs == NULL) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ }
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ s->cronet_send_state = CRONET_SEND_IDLE;
+ s->cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slice_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
+ }
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy stream");
+ }
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_free(s->url);
+ gpr_mu_destroy(&s->recv_mu);
+ if (and_free_memory) {
+ gpr_free(and_free_memory);
+ }
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy transport");
+ }
+}
+
+const grpc_transport_vtable grpc_cronet_vtable = {
+ sizeof(stream_obj), "cronet_http", init_stream,
+ set_pollset_do_nothing, perform_stream_op, NULL,
+ destroy_stream, destroy_transport, NULL};
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index fe954cbefb..0f4b1111f4 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.14.0-dev"; }
+const char *grpc_version_string(void) { return "0.14.0-pre1"; }
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 058371521d..0e204761f6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -168,7 +168,7 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
- var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
+ var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
index 1bec258ca2..909112a47c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -165,7 +165,8 @@ namespace Grpc.Core.Internal.Tests
SendCompletionHandler = callback;
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
SendStatusFromServerHandler = callback;
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index eafe2ccab8..b1566b44a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -139,8 +139,11 @@ namespace Grpc.Core.Internal
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public Task SendStatusFromServerAsync(Status status, Metadata trailers)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
{
+ byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
+ var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+
lock (myLock)
{
GrpcPreconditions.CheckState(started);
@@ -149,11 +152,16 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
+ call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ payload, writeFlags);
}
halfcloseRequested = true;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ if (optionalWrite != null)
+ {
+ streamingWritesCounter++;
+ }
return sendStatusFromServerTcs.Task;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 500653ba5d..244b97d4a4 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -135,13 +135,16 @@ namespace Grpc.Core.Internal
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
+ var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+ optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index cbef599139..cd3719cb50 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
void StartSendCloseFromClient(SendCompletionHandler callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 9ee0ba3bc0..c277c73ef0 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -421,20 +421,21 @@ namespace Grpc.Core.Internal
public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
@@ -593,7 +594,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -601,7 +602,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
@@ -610,7 +611,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -618,7 +619,8 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 00d82d51e8..85b7a4b01e 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,27 +75,32 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- var result = await handler(request, context).ConfigureAwait(false);
+ var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- await responseStream.WriteAsync(result).ConfigureAwait(false);
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -139,17 +144,21 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -183,33 +192,31 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
- var result = await handler(requestStream, context).ConfigureAwait(false);
+ var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- try
- {
- await responseStream.WriteAsync(result).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- status = Status.DefaultCancelled;
- }
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -251,16 +258,20 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -278,7 +289,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -297,6 +308,11 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
+ public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
+ {
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index f7a9cb9c1c..70ce9a18df 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -53,6 +53,6 @@ namespace Grpc.Core
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "0.14.0-dev";
+ public const string CurrentVersion = "0.14.0-pre1";
}
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 9a60be26b6..9c7b877fea 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,7 +1,7 @@
@rem Builds gRPC NuGet packages
@rem Current package versions
-set VERSION=0.14.0-dev
+set VERSION=0.14.0-pre1
set PROTOBUF_VERSION=3.0.0-beta2
@rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well.
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index aeef8a79e9..5b8ff9b819 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata,
- int32_t send_empty_initial_metadata) {
+ int32_t send_empty_initial_metadata, const char* optional_send_buffer,
+ size_t optional_send_buffer_len, uint32_t write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- size_t nops = send_empty_initial_metadata ? 2 : 1;
+ grpc_op ops[3];
+ size_t nops = 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[1].data.send_initial_metadata.count = 0;
- ops[1].data.send_initial_metadata.metadata = NULL;
- ops[1].flags = 0;
- ops[1].reserved = NULL;
-
+ if (optional_send_buffer) {
+ ops[nops].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(optional_send_buffer,
+ optional_send_buffer_len);
+ ops[nops].data.send_message = ctx->send_message;
+ ops[nops].flags = write_flags;
+ ops[nops].reserved = NULL;
+ nops ++;
+ }
+ if (send_empty_initial_metadata) {
+ ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[nops].data.send_initial_metadata.count = 0;
+ ops[nops].data.send_initial_metadata.metadata = NULL;
+ ops[nops].flags = 0;
+ ops[nops].reserved = NULL;
+ nops++;
+ }
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index d98ed0b1fc..9bca5eab6f 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-tools",
- "version": "0.14.0-dev",
+ "version": "0.14.0-pre1",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/",
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 7086519106..b844a14c48 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -27,4 +27,5 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+__import__('pkg_resources').declare_namespace(__name__)
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index f0a40dbb35..09551472b5 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -395,6 +396,7 @@ void pygrpc_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index d5e810b7cf..54c8aaad13 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index dab62530aa..5314329c2c 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -222,6 +222,9 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_config/uri_parser.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
+ 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c',
+ 'src/core/ext/transport/cronet/transport/cronet_api_dummy.c',
+ 'src/core/ext/transport/cronet/transport/cronet_transport.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 873b4e2a91..87fadf9a97 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION='0.14.0.dev0'
+VERSION='0.14.0rc1'
diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py
deleted file mode 100644
index b6aa7fc90e..0000000000
--- a/src/python/grpcio/precompiled.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import os
-import platform
-import shutil
-import sys
-import sysconfig
-
-import setuptools
-
-import commands
-import grpc_version
-
-try:
- from urllib2 import urlopen
-except ImportError:
- from urllib.request import urlopen
-
-PYTHON_STEM = os.path.dirname(os.path.abspath(__file__))
-BINARIES_REPOSITORY = os.environ.get(
- 'GRPC_PYTHON_BINARIES_REPOSITORY',
- 'https://storage.googleapis.com/grpc-precompiled-binaries/python')
-USE_PRECOMPILED_BINARIES = bool(int(os.environ.get(
- 'GRPC_PYTHON_USE_PRECOMPILED_BINARIES', '1')))
-
-def _tagged_ext_name(base):
- uname = platform.uname()
- tags = (
- grpc_version.VERSION,
- 'py{}'.format(sysconfig.get_python_version()),
- uname[0],
- uname[4],
- )
- ucs = 'ucs{}'.format(sysconfig.get_config_var('Py_UNICODE_SIZE'))
- return '{base}-{tags}-{ucs}'.format(
- base=base, tags='-'.join(tags), ucs=ucs)
-
-
-class BuildTaggedExt(setuptools.Command):
-
- description = 'build the gRPC tagged extensions'
- user_options = []
-
- def initialize_options(self):
- # distutils requires this override.
- pass
-
- def finalize_options(self):
- # distutils requires this override.
- pass
-
- def run(self):
- if 'linux' in sys.platform:
- self.run_command('build_ext')
- try:
- os.makedirs('dist/')
- except OSError:
- pass
- shutil.copyfile(
- os.path.join(PYTHON_STEM, 'grpc/_cython/cygrpc.so'),
- 'dist/{}.so'.format(_tagged_ext_name('cygrpc')))
- else:
- sys.stderr.write('nothing to do for build_tagged_ext\n')
-
-
-def update_setup_arguments(setup_arguments):
- if not USE_PRECOMPILED_BINARIES:
- sys.stderr.write('not using precompiled extension')
- return
- url = '{}/{}.so'.format(BINARIES_REPOSITORY, _tagged_ext_name('cygrpc'))
- target_path = os.path.join(PYTHON_STEM, 'grpc/_cython/cygrpc.so')
- try:
- extension = urlopen(url).read()
- except:
- sys.stderr.write(
- 'could not download precompiled extension: {}\n'.format(url))
- return
- try:
- with open(target_path, 'w') as target:
- target.write(extension)
- setup_arguments['ext_modules'] = []
- except:
- sys.stderr.write(
- 'could not write precompiled extension to directory: {} -> {}\n'
- .format(url, target_path))
- return
- setup_arguments['package_data']['grpc._cython'].append('cygrpc.so')
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index 07f7bb93b8..6d65db8306 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -78,9 +78,11 @@ output_dir = File.expand_path(RbConfig::CONFIG['topdir'])
grpc_lib_dir = File.join(output_dir, 'libs', grpc_config)
ENV['BUILDDIR'] = output_dir
-puts 'Building internal gRPC into ' + grpc_lib_dir
-system("make -j -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config}")
-exit 1 unless $? == 0
+unless windows
+ puts 'Building internal gRPC into ' + grpc_lib_dir
+ system("make -j -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config}")
+ exit 1 unless $? == 0
+end
$CFLAGS << ' -I' + File.join(grpc_root, 'include')
$LDFLAGS << ' ' + File.join(grpc_lib_dir, 'libgrpc.a') unless windows
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index cba910d832..1172691116 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/slice.h>
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 48c49a21e9..1b06273af9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 38bf1f7710..79ca5b32ce 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call_credentials.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
@@ -86,11 +86,11 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
rb_funcall(exception_object, rb_intern("backtrace"), 0),
rb_intern("join"),
1, rb_str_new2("\n\tfrom "));
- VALUE exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
+ VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
const char *exception_classname = rb_obj_classname(exception_object);
(void)args;
gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s",
- exception_classname, StringValueCStr(exception_info),
+ exception_classname, StringValueCStr(rb_exception_info),
StringValueCStr(backtrace));
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
/* Currently only gives the exception class name. It should be possible get
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 984afad107..013321ffc8 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c
index 2ffb8f41da..87c0e0a705 100644
--- a/src/ruby/ext/grpc/rb_channel_args.c
+++ b/src/ruby/ext/grpc/rb_channel_args.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel_args.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include "rb_grpc.h"
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index 09bd3093a9..cbb23885aa 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -31,14 +31,13 @@
*
*/
+#include <ruby/ruby.h>
+
#include <string.h>
-#include <ruby/ruby.h>
#include "rb_grpc_imports.generated.h"
#include "rb_channel_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 2a2eee190c..4bb615f8be 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_completion_queue.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 2649a1087f..9e85bbcfbf 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -32,12 +32,12 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_event_thread.h"
#include <stdbool.h>
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 9145c52ef8..5277148fc9 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -32,11 +32,11 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_grpc.h"
#include <math.h>
-#include <ruby/ruby.h>
#include <ruby/vm.h>
#include <sys/time.h>
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index bc43f9d36b..cebbe8c40f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -391,6 +392,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b67361ca25..d7ea6c574c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 96e60c6776..2b3acaaf59 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include "rb_call.h"
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index b2d7280a30..3b0fb6c910 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 81082d13bf..238aaa9656 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -458,10 +458,8 @@ module GRPC
unless cls.include?(GenericService)
fail "#{cls} must 'include GenericService'"
end
- if cls.rpc_descs.size.zero?
- fail "#{cls} should specify some rpc descriptions"
- end
- cls.assert_rpc_descs_have_methods
+ fail "#{cls} should specify some rpc descriptions" if
+ cls.rpc_descs.size.zero?
end
# This should be called while holding @run_mutex
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index 8e940b5b13..0a166e823e 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -110,6 +110,9 @@ module GRPC
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
+ define_method(name) do
+ fail GRPC::BadStatus, GRPC::Core::StatusCodes::UNIMPLEMENTED
+ end
end
def inherited(subclass)
@@ -199,19 +202,6 @@ module GRPC
end
end
end
-
- # Asserts that the appropriate methods are defined for each added rpc
- # spec. Is intended to aid verifying that server classes are correctly
- # implemented.
- def assert_rpc_descs_have_methods
- rpc_descs.each_pair do |m, spec|
- mth_name = GenericService.underscore(m.to_s).to_sym
- unless instance_methods.include?(mth_name)
- fail "#{self} does not provide instance method '#{mth_name}'"
- end
- spec.assert_arity_matches(instance_method(mth_name))
- end
- end
end
def self.included(o)
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 67c6a5d5a1..5117c85cea 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.14.0.dev'
+ VERSION = '0.14.0.pre1'
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e688057cb1..2a42736237 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -308,10 +308,6 @@ describe GRPC::RpcServer do
expect { @srv.handle(EmptyService) }.to raise_error
end
- it 'raises if the service does not define its rpc methods' do
- expect { @srv.handle(NoRpcImplementation) }.to raise_error
- end
-
it 'raises if a handler method is already registered' do
@srv.handle(EchoService)
expect { r.handle(EchoService) }.to raise_error
@@ -349,6 +345,25 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
+ @srv.handle(NoRpcImplementation)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ blk = proc do
+ cq = GRPC::Core::CompletionQueue.new
+ stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+ **client_opts)
+ stub.request_response('/an_rpc', req, marshal, unmarshal)
+ end
+ expect(&blk).to raise_error do |error|
+ expect(error).to be_a(GRPC::BadStatus)
+ expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
+ end
+ @srv.stop
+ t.join
+ end
+
it 'should handle multiple sequential requests', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb
index 5e7b6c7aba..76034e4f74 100644
--- a/src/ruby/spec/generic/service_spec.rb
+++ b/src/ruby/spec/generic/service_spec.rb
@@ -273,73 +273,4 @@ describe GenericService do
end
end
end
-
- describe '#assert_rpc_descs_have_methods' do
- it 'fails if there is no instance method for an rpc descriptor' do
- c1 = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- end
- expect { c1.assert_rpc_descs_have_methods }.to raise_error
-
- c2 = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- rpc :AnotherRpc, GoodMsg, GoodMsg
-
- def an_rpc
- end
- end
- expect { c2.assert_rpc_descs_have_methods }.to raise_error
- end
-
- it 'passes if there are corresponding methods for each descriptor' do
- c = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- rpc :AServerStreamer, GoodMsg, stream(GoodMsg)
- rpc :AClientStreamer, stream(GoodMsg), GoodMsg
- rpc :ABidiStreamer, stream(GoodMsg), stream(GoodMsg)
-
- def an_rpc(_req, _call)
- end
-
- def a_server_streamer(_req, _call)
- end
-
- def a_client_streamer(_call)
- end
-
- def a_bidi_streamer(_call)
- end
- end
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- end
-
- it 'passes for subclasses of that include GenericService' do
- base = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
-
- def an_rpc(_req, _call)
- end
- end
- c = Class.new(base)
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- expect(c.include?(GenericService)).to be(true)
- end
-
- it 'passes if subclasses define the rpc methods' do
- base = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- end
- c = Class.new(base) do
- def an_rpc(_req, _call)
- end
- end
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- expect(c.include?(GenericService)).to be(true)
- end
- end
end
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
index 12ad21b80e..ec085d2655 100644
--- a/src/ruby/tools/version.rb
+++ b/src/ruby/tools/version.rb
@@ -29,6 +29,6 @@
module GRPC
module Tools
- VERSION = '0.14.0.dev'
+ VERSION = '0.14.0.pre1'
end
end