aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-06-20 08:09:52 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-06-20 08:09:52 -0700
commita4d2d0c798a182d5cc57ce630d31614173f2d67b (patch)
treea1f0f4e324f8bc65ced4e7b49af3af266f6ad900 /src/core/ext/transport
parentccc8ec54b0cf75f52c0a19a2674d6a6bc8e93743 (diff)
parent4e29480e430b94392105934750adfd85cb7849ce (diff)
Merge github.com:grpc/grpc into reuse_port
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c95
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c22
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c16
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c82
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c21
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.c232
-rw-r--r--src/core/ext/transport/chttp2/transport/bin_decoder.h66
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c311
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h3
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c10
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c4
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c42
13 files changed, 748 insertions, 160 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 28d1be878f..85f9efb3b6 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -103,13 +103,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
GPR_ASSERT(c->result->transport);
- c->result->channel_args = c->args.channel_args;
+ c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
} else {
memset(c->result, 0, sizeof(*c->result));
}
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, error);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
new file mode 100644
index 0000000000..ca435c25ce
--- /dev/null
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/grpc_posix.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
+
+#include <fcntl.h>
+
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/surface/api_trace.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport.h"
+
+grpc_channel *grpc_insecure_channel_create_from_fd(
+ const char *target, int fd, const grpc_channel_args *args) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_API_TRACE("grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3,
+ (target, fd, args));
+
+ grpc_arg default_authority_arg;
+ default_authority_arg.type = GRPC_ARG_STRING;
+ default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
+ default_authority_arg.value.string = "test.authority";
+ grpc_channel_args *final_args =
+ grpc_channel_args_copy_and_add(args, &default_authority_arg, 1);
+
+ int flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
+
+ grpc_endpoint *client =
+ grpc_tcp_create(grpc_fd_create(fd, "client"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client");
+
+ grpc_transport *transport =
+ grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
+ GPR_ASSERT(transport);
+ grpc_channel *channel = grpc_channel_create(
+ &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
+ grpc_channel_args_destroy(final_args);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
+
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ return channel != NULL ? channel : grpc_lame_client_channel_create(
+ target, GRPC_STATUS_INTERNAL,
+ "Failed to create client channel");
+}
+
+#else // !GPR_SUPPORT_CHANNELS_FROM_FD
+
+grpc_channel *grpc_insecure_channel_create_from_fd(
+ const char *target, int fd, const grpc_channel_args *args) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+#endif // GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index bceef152be..721ba82d8f 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -90,7 +90,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_auth_context *auth_context) {
connector *c = arg;
grpc_closure *notify;
- grpc_channel_args *args_copy = NULL;
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
@@ -109,23 +108,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
- args_copy = grpc_channel_args_copy_and_add(c->args.channel_args,
- &auth_context_arg, 1);
- c->result->channel_args = args_copy;
+ c->result->channel_args = grpc_channel_args_copy_and_add(
+ c->args.channel_args, &auth_context_arg, 1);
}
notify = c->notify;
c->notify = NULL;
- /* look at c->args which are connector args. */
- notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE);
- if (args_copy != NULL) grpc_channel_args_destroy(args_copy);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
connector *c = arg;
- grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
- c->connecting_endpoint,
- on_secure_handshake_done, c);
+ grpc_channel_security_connector_do_handshake(
+ exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline,
+ on_secure_handshake_done, c);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@@ -147,13 +143,14 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
&c->initial_string_sent);
} else {
grpc_channel_security_connector_do_handshake(
- exec_ctx, c->security_connector, tcp, on_secure_handshake_done, c);
+ exec_ctx, c->security_connector, tcp, c->args.deadline,
+ on_secure_handshake_done, c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
- notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE);
+ grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
}
@@ -175,7 +172,6 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
- GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9efc922a8d..e5c987925c 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -75,14 +75,14 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
grpc_resolved_addresses *resolved = NULL;
grpc_tcp_server *tcp = NULL;
size_t i;
- unsigned count = 0;
+ size_t count = 0;
int port_num = -1;
int port_temp;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -120,13 +120,15 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
}
if (count == 0) {
char *msg;
- gpr_asprintf(&msg, "No address added out of total %d resolved", naddrs);
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
+ naddrs);
err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
gpr_free(msg);
goto error;
} else if (count != naddrs) {
char *msg;
- gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved",
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
count, naddrs);
err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs);
gpr_free(msg);
@@ -153,6 +155,11 @@ error:
}
port_num = 0;
+ const char *msg = grpc_error_string(err);
+ gpr_log(GPR_ERROR, "%s", msg);
+ grpc_error_free_string(msg);
+ GRPC_ERROR_UNREF(err);
+
done:
grpc_exec_ctx_finish(&exec_ctx);
if (errors != NULL) {
@@ -160,7 +167,6 @@ done:
GRPC_ERROR_UNREF(errors[i]);
}
}
- GRPC_ERROR_UNREF(err);
gpr_free(errors);
return port_num;
}
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
new file mode 100644
index 0000000000..96bf4d6f30
--- /dev/null
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+#include <grpc/grpc_posix.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_SUPPORT_CHANNELS_FROM_FD
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
+
+void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
+ grpc_completion_queue *cq,
+ int fd) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
+ char *name;
+ gpr_asprintf(&name, "fd:%d", fd);
+
+ grpc_endpoint *server_endpoint = grpc_tcp_create(
+ grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name);
+
+ gpr_free(name);
+
+ const grpc_channel_args *server_args = grpc_server_get_channel_args(server);
+ grpc_transport *transport = grpc_create_chttp2_transport(
+ &exec_ctx, server_args, server_endpoint, 0 /* is_client */);
+ grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq));
+ grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args);
+ grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+#else // !GPR_SUPPORT_CHANNELS_FROM_FD
+
+void grpc_server_add_insecure_channel_from_fd(grpc_server *server,
+ grpc_completion_queue *cq,
+ int fd) {
+ GPR_ASSERT(0);
+}
+
+#endif // GPR_SUPPORT_CHANNELS_FROM_FD
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index 3286b220d4..c42810e913 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -129,9 +129,11 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
state->state = statep;
state_ref(state->state);
state->accepting_pollset = accepting_pollset;
- grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc,
- acceptor, tcp,
- on_secure_handshake_done, state);
+ grpc_server_security_connector_do_handshake(
+ exec_ctx, state->state->sc, acceptor, tcp,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_seconds(120, GPR_TIMESPAN)),
+ on_secure_handshake_done, state);
}
/* Server callback: start listening on our ports */
@@ -173,7 +175,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
grpc_tcp_server *tcp = NULL;
server_secure_state *state = NULL;
size_t i;
- unsigned count = 0;
+ size_t count = 0;
int port_num = -1;
int port_temp;
grpc_security_status status = GRPC_SECURITY_ERROR;
@@ -188,7 +190,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
3, (server, addr, creds));
/* create security context */
- if (creds == NULL) goto error;
+ if (creds == NULL) {
+ err = GRPC_ERROR_CREATE(
+ "No credentials specified for secure server port (creds==NULL)");
+ goto error;
+ }
status = grpc_server_credentials_create_security_connector(creds, &sc);
if (status != GRPC_SECURITY_OK) {
char *msg;
@@ -240,14 +246,15 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
}
if (count == 0) {
char *msg;
- gpr_asprintf(&msg, "No address added out of total %d resolved",
+ gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved",
resolved->naddrs);
err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
gpr_free(msg);
goto error;
} else if (count != resolved->naddrs) {
char *msg;
- gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved",
+ gpr_asprintf(&msg, "Only %" PRIuPTR
+ " addresses added out of total %" PRIuPTR " resolved",
count, resolved->naddrs);
err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs);
gpr_free(msg);
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c
new file mode 100644
index 0000000000..2d90b01cd8
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c
@@ -0,0 +1,232 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/transport/chttp2/transport/bin_decoder.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include "src/core/lib/support/string.h"
+
+static uint8_t decode_table[] = {
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 62, 0x40, 0x40, 0x40, 63,
+ 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0, 1, 2, 3, 4, 5, 6,
+ 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
+ 19, 20, 21, 22, 23, 24, 25, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
+ 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48,
+ 49, 50, 51, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40,
+ 0x40, 0x40, 0x40, 0x40};
+
+static const uint8_t tail_xtra[4] = {0, 0, 1, 2};
+
+static bool input_is_valid(uint8_t *input_ptr, size_t length) {
+ size_t i;
+
+ for (i = 0; i < length; ++i) {
+ if ((decode_table[input_ptr[i]] & 0xC0) != 0) {
+ gpr_log(GPR_ERROR,
+ "Base64 decoding failed, invalid character '%c' in base64 "
+ "input.\n",
+ (char)(*input_ptr));
+ return false;
+ }
+ }
+ return true;
+}
+
+#define COMPOSE_OUTPUT_BYTE_0(input_ptr) \
+ (uint8_t)((decode_table[input_ptr[0]] << 2) | \
+ (decode_table[input_ptr[1]] >> 4))
+
+#define COMPOSE_OUTPUT_BYTE_1(input_ptr) \
+ (uint8_t)((decode_table[input_ptr[1]] << 4) | \
+ (decode_table[input_ptr[2]] >> 2))
+
+#define COMPOSE_OUTPUT_BYTE_2(input_ptr) \
+ (uint8_t)((decode_table[input_ptr[2]] << 6) | decode_table[input_ptr[3]])
+
+bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) {
+ size_t input_tail;
+
+ if (ctx->input_cur > ctx->input_end || ctx->output_cur > ctx->output_end) {
+ return false;
+ }
+
+ // Process a block of 4 input characters and 3 output bytes
+ while (ctx->input_end >= ctx->input_cur + 4 &&
+ ctx->output_end >= ctx->output_cur + 3) {
+ if (!input_is_valid(ctx->input_cur, 4)) return false;
+ ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur);
+ ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur);
+ ctx->output_cur[2] = COMPOSE_OUTPUT_BYTE_2(ctx->input_cur);
+ ctx->output_cur += 3;
+ ctx->input_cur += 4;
+ }
+
+ // Process the tail of input data
+ input_tail = (size_t)(ctx->input_end - ctx->input_cur);
+ if (input_tail == 4) {
+ // Process the input data with pad chars
+ if (ctx->input_cur[3] == '=') {
+ if (ctx->input_cur[2] == '=' && ctx->output_end >= ctx->output_cur + 1) {
+ if (!input_is_valid(ctx->input_cur, 2)) return false;
+ *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur);
+ ctx->input_cur += 4;
+ } else if (ctx->output_end >= ctx->output_cur + 2) {
+ if (!input_is_valid(ctx->input_cur, 3)) return false;
+ *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur);
+ *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur);
+ ;
+ ctx->input_cur += 4;
+ }
+ }
+
+ } else if (ctx->contains_tail && input_tail > 1) {
+ // Process the input data without pad chars, but constains_tail is set
+ if (ctx->output_end >= ctx->output_cur + tail_xtra[input_tail]) {
+ if (!input_is_valid(ctx->input_cur, input_tail)) return false;
+ switch (input_tail) {
+ case 3:
+ ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur);
+ case 2:
+ ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur);
+ }
+ ctx->output_cur += tail_xtra[input_tail];
+ ctx->input_cur += input_tail;
+ }
+ }
+
+ return true;
+}
+
+gpr_slice grpc_chttp2_base64_decode(gpr_slice input) {
+ size_t input_length = GPR_SLICE_LENGTH(input);
+ size_t output_length = input_length / 4 * 3;
+ struct grpc_base64_decode_context ctx;
+ gpr_slice output;
+
+ if (input_length % 4 != 0) {
+ gpr_log(GPR_ERROR,
+ "Base64 decoding failed, input of "
+ "grpc_chttp2_base64_decode has a length of %d, which is not a "
+ "multiple of 4.\n",
+ (int)input_length);
+ return gpr_empty_slice();
+ }
+
+ if (input_length > 0) {
+ uint8_t *input_end = GPR_SLICE_END_PTR(input);
+ if (*(--input_end) == '=') {
+ output_length--;
+ if (*(--input_end) == '=') {
+ output_length--;
+ }
+ }
+ }
+ output = gpr_slice_malloc(output_length);
+
+ ctx.input_cur = GPR_SLICE_START_PTR(input);
+ ctx.input_end = GPR_SLICE_END_PTR(input);
+ ctx.output_cur = GPR_SLICE_START_PTR(output);
+ ctx.output_end = GPR_SLICE_END_PTR(output);
+ ctx.contains_tail = false;
+
+ if (!grpc_base64_decode_partial(&ctx)) {
+ char *s = gpr_dump_slice(input, GPR_DUMP_ASCII);
+ gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
+ gpr_free(s);
+ gpr_slice_unref(output);
+ return gpr_empty_slice();
+ }
+ GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output));
+ GPR_ASSERT(ctx.input_cur == GPR_SLICE_END_PTR(input));
+ return output;
+}
+
+gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input,
+ size_t output_length) {
+ size_t input_length = GPR_SLICE_LENGTH(input);
+ gpr_slice output = gpr_slice_malloc(output_length);
+ struct grpc_base64_decode_context ctx;
+
+ // The length of a base64 string cannot be 4 * n + 1
+ if (input_length % 4 == 1) {
+ gpr_log(GPR_ERROR,
+ "Base64 decoding failed, input of "
+ "grpc_chttp2_base64_decode_with_length has a length of %d, which "
+ "has a tail of 1 byte.\n",
+ (int)input_length);
+ gpr_slice_unref(output);
+ return gpr_empty_slice();
+ }
+
+ if (output_length > input_length / 4 * 3 + tail_xtra[input_length % 4]) {
+ gpr_log(GPR_ERROR,
+ "Base64 decoding failed, output_length %d is longer "
+ "than the max possible output length %d.\n",
+ (int)output_length,
+ (int)(input_length / 4 * 3 + tail_xtra[input_length % 4]));
+ gpr_slice_unref(output);
+ return gpr_empty_slice();
+ }
+
+ ctx.input_cur = GPR_SLICE_START_PTR(input);
+ ctx.input_end = GPR_SLICE_END_PTR(input);
+ ctx.output_cur = GPR_SLICE_START_PTR(output);
+ ctx.output_end = GPR_SLICE_END_PTR(output);
+ ctx.contains_tail = true;
+
+ if (!grpc_base64_decode_partial(&ctx)) {
+ char *s = gpr_dump_slice(input, GPR_DUMP_ASCII);
+ gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s);
+ gpr_free(s);
+ gpr_slice_unref(output);
+ return gpr_empty_slice();
+ }
+ GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output));
+ GPR_ASSERT(ctx.input_cur <= GPR_SLICE_END_PTR(input));
+ return output;
+}
diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h
new file mode 100644
index 0000000000..b9d40c9b74
--- /dev/null
+++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H
+#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H
+
+#include <grpc/support/slice.h>
+#include <stdbool.h>
+
+struct grpc_base64_decode_context {
+ /* input/output: */
+ uint8_t *input_cur;
+ uint8_t *input_end;
+ uint8_t *output_cur;
+ uint8_t *output_end;
+ /* Indicate if the decoder should handle the tail of input data*/
+ bool contains_tail;
+};
+
+/* base64 decode a grpc_base64_decode_context util either input_end is reached
+ or output_end is reached. When input_end is reached, (input_end - input_cur)
+ is less than 4. When output_end is reached, (output_end - output_cur) is less
+ than 3. Returns false if decoding is failed. */
+bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx);
+
+/* base64 decode a slice with pad chars. Returns a new slice, does not take
+ ownership of the input. Returns an empty slice if decoding is failed. */
+gpr_slice grpc_chttp2_base64_decode(gpr_slice input);
+
+/* base64 decode a slice without pad chars, data length is needed. Returns a new
+ slice, does not take ownership of the input. Returns an empty slice if
+ decoding is failed. */
+gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input,
+ size_t output_length);
+
+#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 5264b9e1f8..9aa39ba26c 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -47,6 +47,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
+#include "src/core/lib/http/parser.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -105,7 +106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status);
+ grpc_status_code status,
+ gpr_slice *optional_message);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@@ -161,6 +163,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(t->ep == NULL);
+ gpr_slice_unref(t->optional_drop_message);
+
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@@ -190,8 +194,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- grpc_exec_ctx_push(exec_ctx, ping->on_recv,
- GRPC_ERROR_CREATE("Transport closed"), NULL);
+ grpc_exec_ctx_sched(exec_ctx, ping->on_recv,
+ GRPC_ERROR_CREATE("Transport closed"), NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -261,6 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
+ t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@@ -444,7 +449,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error) {
if (!t->closed) {
t->closed = 1;
- connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
+ connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
if (t->ep) {
allow_endpoint_shutdown_locked(exec_ctx, t);
@@ -643,7 +648,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
t->executor.writing_active = 1;
REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
- grpc_exec_ctx_push(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL);
}
check_read_ops(exec_ctx, &t->global);
@@ -807,8 +812,10 @@ void grpc_chttp2_add_incoming_goaway(
gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
+ /* lie: use transient failure from the transport to indicate goaway has been
+ * received */
connectivity_state_set(
- exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
+ exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE,
grpc_error_set_str(
grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"),
GRPC_ERROR_INT_HTTP2_ERROR,
@@ -869,7 +876,7 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE);
+ GRPC_STATUS_UNAVAILABLE, NULL);
}
}
@@ -907,7 +914,7 @@ void grpc_chttp2_complete_closure_step(
stream_global->collecting_stats);
stream_global->collecting_stats = NULL;
}
- grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL);
}
*pclosure = NULL;
}
@@ -953,7 +960,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- op->cancel_with_status);
+ op->cancel_with_status, op->optional_close_message);
}
if (op->close_with_status != GRPC_STATUS_OK) {
@@ -974,10 +981,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (metadata_size > metadata_peer_limit) {
gpr_log(GPR_DEBUG,
"to-be-sent initial metadata size exceeds peer limit "
- "(%lu vs. %lu)",
+ "(%" PRIuPTR " vs. %" PRIuPTR ")",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@@ -1033,10 +1040,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (metadata_size > metadata_peer_limit) {
gpr_log(GPR_DEBUG,
"to-be-sent trailing metadata size exceeds peer limit "
- "(%lu vs. %lu)",
+ "(%" PRIuPTR " vs. %" PRIuPTR ")",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@@ -1127,7 +1134,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_push(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -1160,7 +1167,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
return;
}
- grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
@@ -1228,14 +1235,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
stream_global->recv_initial_metadata);
- grpc_exec_ctx_push(exec_ctx, stream_global->recv_initial_metadata_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
@@ -1248,13 +1255,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);
GPR_ASSERT(*stream_global->recv_message != NULL);
- grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_message_ready = NULL;
} else if (stream_global->published_trailing_metadata) {
*stream_global->recv_message = NULL;
- grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready,
- GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_global->recv_message_ready = NULL;
}
}
@@ -1267,7 +1274,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@@ -1334,7 +1341,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status) {
+ grpc_status_code status,
+ gpr_slice *optional_message) {
if (!stream_global->read_closed || !stream_global->write_closed) {
if (stream_global->id != 0) {
gpr_slice_buffer_add(
@@ -1344,8 +1352,12 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
&stream_global->stats.outgoing));
}
+
+ if (optional_message) {
+ gpr_slice_ref(*optional_message);
+ }
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
- NULL);
+ optional_message);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
stream_global->seen_error = true;
@@ -1465,93 +1477,95 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(status >= 0 && (int)status < 100);
- GPR_ASSERT(stream_global->id != 0);
-
- /* Hand roll a header block.
- This is unnecessarily ugly - at some point we should find a more elegant
- solution.
- It's complicated by the fact that our send machinery would be dead by the
- time we got around to sending this, so instead we ignore HPACK compression
- and just write the uncompressed bytes onto the wire. */
- status_hdr = gpr_slice_malloc(15 + (status >= 10));
- p = GPR_SLICE_START_PTR(status_hdr);
- *p++ = 0x40; /* literal header */
- *p++ = 11; /* len(grpc-status) */
- *p++ = 'g';
- *p++ = 'r';
- *p++ = 'p';
- *p++ = 'c';
- *p++ = '-';
- *p++ = 's';
- *p++ = 't';
- *p++ = 'a';
- *p++ = 't';
- *p++ = 'u';
- *p++ = 's';
- if (status < 10) {
- *p++ = 1;
- *p++ = (uint8_t)('0' + status);
- } else {
- *p++ = 2;
- *p++ = (uint8_t)('0' + (status / 10));
- *p++ = (uint8_t)('0' + (status % 10));
- }
- GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
- len += (uint32_t)GPR_SLICE_LENGTH(status_hdr);
-
- if (optional_message) {
- GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
- message_pfx = gpr_slice_malloc(15);
- p = GPR_SLICE_START_PTR(message_pfx);
- *p++ = 0x40;
- *p++ = 12; /* len(grpc-message) */
+ if (stream_global->id != 0 && !transport_global->is_client) {
+ /* Hand roll a header block.
+ This is unnecessarily ugly - at some point we should find a more elegant
+ solution.
+ It's complicated by the fact that our send machinery would be dead by the
+ time we got around to sending this, so instead we ignore HPACK
+ compression
+ and just write the uncompressed bytes onto the wire. */
+ status_hdr = gpr_slice_malloc(15 + (status >= 10));
+ p = GPR_SLICE_START_PTR(status_hdr);
+ *p++ = 0x40; /* literal header */
+ *p++ = 11; /* len(grpc-status) */
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
- *p++ = 'm';
- *p++ = 'e';
- *p++ = 's';
*p++ = 's';
+ *p++ = 't';
*p++ = 'a';
- *p++ = 'g';
- *p++ = 'e';
- *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message);
- GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
- len += (uint32_t)GPR_SLICE_LENGTH(message_pfx);
- len += (uint32_t)GPR_SLICE_LENGTH(*optional_message);
- }
-
- hdr = gpr_slice_malloc(9);
- p = GPR_SLICE_START_PTR(hdr);
- *p++ = (uint8_t)(len >> 16);
- *p++ = (uint8_t)(len >> 8);
- *p++ = (uint8_t)(len);
- *p++ = GRPC_CHTTP2_FRAME_HEADER;
- *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
- *p++ = (uint8_t)(stream_global->id >> 24);
- *p++ = (uint8_t)(stream_global->id >> 16);
- *p++ = (uint8_t)(stream_global->id >> 8);
- *p++ = (uint8_t)(stream_global->id);
- GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
-
- gpr_slice_buffer_add(&transport_global->qbuf, hdr);
- gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
- if (optional_message) {
- gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
- gpr_slice_buffer_add(&transport_global->qbuf,
- gpr_slice_ref(*optional_message));
- }
+ *p++ = 't';
+ *p++ = 'u';
+ *p++ = 's';
+ if (status < 10) {
+ *p++ = 1;
+ *p++ = (uint8_t)('0' + status);
+ } else {
+ *p++ = 2;
+ *p++ = (uint8_t)('0' + (status / 10));
+ *p++ = (uint8_t)('0' + (status % 10));
+ }
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
+ len += (uint32_t)GPR_SLICE_LENGTH(status_hdr);
+
+ if (optional_message) {
+ GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
+ message_pfx = gpr_slice_malloc(15);
+ p = GPR_SLICE_START_PTR(message_pfx);
+ *p++ = 0x40;
+ *p++ = 12; /* len(grpc-message) */
+ *p++ = 'g';
+ *p++ = 'r';
+ *p++ = 'p';
+ *p++ = 'c';
+ *p++ = '-';
+ *p++ = 'm';
+ *p++ = 'e';
+ *p++ = 's';
+ *p++ = 's';
+ *p++ = 'a';
+ *p++ = 'g';
+ *p++ = 'e';
+ *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message);
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
+ len += (uint32_t)GPR_SLICE_LENGTH(message_pfx);
+ len += (uint32_t)GPR_SLICE_LENGTH(*optional_message);
+ }
- gpr_slice_buffer_add(
- &transport_global->qbuf,
- grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR,
- &stream_global->stats.outgoing));
+ hdr = gpr_slice_malloc(9);
+ p = GPR_SLICE_START_PTR(hdr);
+ *p++ = (uint8_t)(len >> 16);
+ *p++ = (uint8_t)(len >> 8);
+ *p++ = (uint8_t)(len);
+ *p++ = GRPC_CHTTP2_FRAME_HEADER;
+ *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
+ *p++ = (uint8_t)(stream_global->id >> 24);
+ *p++ = (uint8_t)(stream_global->id >> 16);
+ *p++ = (uint8_t)(stream_global->id >> 8);
+ *p++ = (uint8_t)(stream_global->id);
+ GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
+
+ gpr_slice_buffer_add(&transport_global->qbuf, hdr);
+ gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
+ if (optional_message) {
+ gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
+ gpr_slice_buffer_add(&transport_global->qbuf,
+ gpr_slice_ref(*optional_message));
+ }
- if (optional_message) {
- gpr_slice_ref(*optional_message);
+ gpr_slice_buffer_add(
+ &transport_global->qbuf,
+ grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR,
+ &stream_global->stats.outgoing));
+
+ if (optional_message) {
+ gpr_slice_ref(*optional_message);
+ }
}
+
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
optional_message);
grpc_error *err = GRPC_ERROR_CREATE("Stream closed");
@@ -1569,8 +1583,12 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
+ grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global);
cancel_from_api(user_data, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE);
+ GRPC_STATUS_UNAVAILABLE,
+ GPR_SLICE_IS_EMPTY(transport->optional_drop_message)
+ ? NULL
+ : &transport->optional_drop_message);
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
@@ -1643,24 +1661,64 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_map_move_into(&t->new_stream_map,
&t->parsing_stream_map);
grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
- grpc_exec_ctx_push(exec_ctx, &t->parsing_action, error, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL);
} else {
post_reading_action_locked(exec_ctx, t, s_unused, arg);
}
}
+static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport *t) {
+ grpc_http_parser parser;
+ size_t i = 0;
+ grpc_error *error = GRPC_ERROR_NONE;
+ grpc_http_response response;
+ memset(&response, 0, sizeof(response));
+
+ grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
+
+ grpc_error *parse_error = GRPC_ERROR_NONE;
+ for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
+ parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
+ }
+ if (parse_error == GRPC_ERROR_NONE &&
+ (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
+ GRPC_ERROR_INT_HTTP_STATUS, response.status);
+ }
+ GRPC_ERROR_UNREF(parse_error);
+
+ grpc_http_parser_destroy(&parser);
+ grpc_http_response_destroy(&response);
+ return error;
+}
+
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
- grpc_error *errors[2] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE};
+ grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
+ GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
};
+ if (i != t->read_buffer.count) {
+ gpr_slice_unref(t->optional_drop_message);
+ errors[2] = try_http_parsing(exec_ctx, t);
+ if (errors[2] != GRPC_ERROR_NONE) {
+ t->optional_drop_message = gpr_slice_from_copied_string(
+ "Connection dropped: received http1.x response");
+ } else {
+ t->optional_drop_message = gpr_slice_from_copied_string(
+ "Connection dropped: received unparseable response");
+ }
+ }
grpc_error *err =
- errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE
+ errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
+ errors[2] == GRPC_ERROR_NONE
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
GPR_ARRAY_SIZE(errors));
@@ -1794,6 +1852,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
add_to_pollset_locked, pollset, 0);
}
+static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset_set *pollset_set) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_set_locked, pollset_set, 0);
+}
+
/*******************************************************************************
* BYTE STREAM
*/
@@ -1870,10 +1935,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
}
if (bs->slices.count > 0) {
*arg->slice = gpr_slice_buffer_take_first(&bs->slices);
- grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL);
} else if (bs->error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error),
+ NULL);
} else {
bs->on_next = arg->on_complete;
bs->next = arg->slice;
@@ -1930,7 +1995,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream;
if (bs->on_next != NULL) {
*bs->next = arg->slice;
- grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL);
bs->on_next = NULL;
} else {
gpr_slice_buffer_add(&bs->slices, arg->slice);
@@ -1968,7 +2033,7 @@ static void incoming_byte_stream_finished_failed_locked(
grpc_chttp2_incoming_byte_stream *bs = a->bs;
grpc_error *error = a->error;
gpr_free(a);
- grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
+ grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL);
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
bs->error = error;
@@ -2045,10 +2110,13 @@ static char *format_flowctl_context_var(const char *context, const char *var,
int64_t val, uint32_t id,
char **scope) {
char *underscore_pos;
+ char *buf;
char *result;
if (context == NULL) {
*scope = NULL;
- gpr_asprintf(&result, "%s(%lld)", var, val);
+ gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val);
+ result = gpr_leftpad(buf, ' ', 40);
+ gpr_free(buf);
return result;
}
underscore_pos = strchr(context, '_');
@@ -2059,7 +2127,9 @@ static char *format_flowctl_context_var(const char *context, const char *var,
gpr_asprintf(scope, "%s[%d]", tmp, id);
gpr_free(tmp);
}
- gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val);
+ gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val);
+ result = gpr_leftpad(buf, ' ', 40);
+ gpr_free(buf);
return result;
}
@@ -2080,6 +2150,8 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
uint32_t stream_id, int64_t val1, int64_t val2) {
char *scope1;
char *scope2;
+ char *tmp_phase;
+ char *tmp_scope1;
char *label1 =
format_flowctl_context_var(context1, var1, val1, stream_id, &scope1);
char *label2 =
@@ -2087,14 +2159,18 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
char *clisvr = is_client ? "client" : "server";
char *prefix;
- gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1);
+ tmp_phase = gpr_leftpad(phase, ' ', 8);
+ tmp_scope1 = gpr_leftpad(scope1, ' ', 11);
+ gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1);
+ gpr_free(tmp_phase);
+ gpr_free(tmp_scope1);
switch (op) {
case GRPC_CHTTP2_FLOWCTL_MOVE:
GPR_ASSERT(samestr(scope1, scope2));
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2,
+ "%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2,
val1 + val2);
}
break;
@@ -2102,7 +2178,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
GPR_ASSERT(val2 >= 0);
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2,
+ "%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2,
val1 + val2);
}
break;
@@ -2110,7 +2186,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
GPR_ASSERT(val2 >= 0);
if (val2 != 0) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2,
+ "%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2,
val1 - val2);
}
break;
@@ -2135,6 +2211,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
set_pollset,
+ set_pollset_set,
perform_stream_op,
perform_transport_op,
destroy_stream,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 7e5d58380e..54e72ebd24 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -383,6 +383,9 @@ struct grpc_chttp2_transport {
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
+
+ /** Message explaining the reason of dropping connection */
+ gpr_slice optional_drop_message;
};
typedef struct {
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 72b3131d7b..785134091f 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -559,7 +559,7 @@ static grpc_error *update_incoming_window(
uint32_t incoming_frame_size = transport_parsing->incoming_frame_size;
if (incoming_frame_size > transport_parsing->incoming_window) {
char *msg;
- gpr_asprintf(&msg, "frame of size %d overflows incoming window of %d",
+ gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
transport_parsing->incoming_frame_size,
transport_parsing->incoming_window);
grpc_error *err = GRPC_ERROR_CREATE(msg);
@@ -569,7 +569,7 @@ static grpc_error *update_incoming_window(
if (incoming_frame_size > stream_parsing->incoming_window) {
char *msg;
- gpr_asprintf(&msg, "frame of size %d overflows incoming window of %d",
+ gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64,
transport_parsing->incoming_frame_size,
stream_parsing->incoming_window);
grpc_error *err = GRPC_ERROR_CREATE(msg);
@@ -678,7 +678,8 @@ static void on_initial_header(void *tp, grpc_mdelem *md) {
if (new_size > metadata_size_limit) {
if (!stream_parsing->exceeded_metadata_size) {
gpr_log(GPR_DEBUG,
- "received initial metadata size exceeds limit (%lu vs. %lu)",
+ "received initial metadata size exceeds limit (%" PRIuPTR
+ " vs. %" PRIuPTR ")",
new_size, metadata_size_limit);
stream_parsing->seen_error = true;
stream_parsing->exceeded_metadata_size = true;
@@ -724,7 +725,8 @@ static void on_trailing_header(void *tp, grpc_mdelem *md) {
if (new_size > metadata_size_limit) {
if (!stream_parsing->exceeded_metadata_size) {
gpr_log(GPR_DEBUG,
- "received trailing metadata size exceeds limit (%lu vs. %lu)",
+ "received trailing metadata size exceeds limit (%" PRIuPTR
+ " vs. %" PRIuPTR ")",
new_size, metadata_size_limit);
stream_parsing->seen_error = true;
stream_parsing->exceeded_metadata_size = true;
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index ca56debc84..add7678182 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -187,8 +187,8 @@ void grpc_chttp2_perform_writes(
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
- grpc_exec_ctx_push(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
+ NULL);
}
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index ea0f2bcba4..25d8aca250 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -152,14 +152,18 @@ static void next_recv_step(stream_obj *s, enum e_caller caller);
static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {}
+static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {}
+
static void enqueue_callbacks(grpc_closure *callback_list[]) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (callback_list[0]) {
- grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL);
callback_list[0] = NULL;
}
if (callback_list[1]) {
- grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
+ grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL);
callback_list[1] = NULL;
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -218,8 +222,11 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
- memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
- gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ if (s->total_read_bytes > 0) {
+ // Only copy if there is non-zero number of bytes
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ }
grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
*s->recv_message = (grpc_byte_buffer *)&s->sbs;
}
@@ -347,8 +354,17 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) {
if (grpc_cronet_trace) {
gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
}
- cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
- s->remaining_read_bytes);
+ if (s->remaining_read_bytes > 0) {
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ } else {
+ // Calling the closing callback directly since this is a 0 byte read
+ // for an empty message.
+ process_recv_message(s, NULL);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ }
}
}
break;
@@ -634,7 +650,13 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
}
}
-const grpc_transport_vtable grpc_cronet_vtable = {
- sizeof(stream_obj), "cronet_http", init_stream,
- set_pollset_do_nothing, perform_stream_op, NULL,
- destroy_stream, destroy_transport, NULL};
+const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
+ "cronet_http",
+ init_stream,
+ set_pollset_do_nothing,
+ set_pollset_set_do_nothing,
+ perform_stream_op,
+ NULL,
+ destroy_stream,
+ destroy_transport,
+ NULL};