diff options
author | 2016-06-20 08:09:52 -0700 | |
---|---|---|
committer | 2016-06-20 08:09:52 -0700 | |
commit | a4d2d0c798a182d5cc57ce630d31614173f2d67b (patch) | |
tree | a1f0f4e324f8bc65ced4e7b49af3af266f6ad900 /src/core/ext/transport | |
parent | ccc8ec54b0cf75f52c0a19a2674d6a6bc8e93743 (diff) | |
parent | 4e29480e430b94392105934750adfd85cb7849ce (diff) |
Merge github.com:grpc/grpc into reuse_port
Diffstat (limited to 'src/core/ext/transport')
13 files changed, 748 insertions, 160 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 28d1be878f..85f9efb3b6 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -103,13 +103,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); GPR_ASSERT(c->result->transport); - c->result->channel_args = c->args.channel_args; + c->result->channel_args = grpc_channel_args_copy(c->args.channel_args); } else { memset(c->result, 0, sizeof(*c->result)); } notify = c->notify; c->notify = NULL; - notify->cb(exec_ctx, notify->cb_arg, error); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c new file mode 100644 index 0000000000..ca435c25ce --- /dev/null +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -0,0 +1,95 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include <grpc/grpc_posix.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#ifdef GPR_SUPPORT_CHANNELS_FROM_FD + +#include <fcntl.h> + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/transport.h" + +grpc_channel *grpc_insecure_channel_create_from_fd( + const char *target, int fd, const grpc_channel_args *args) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_API_TRACE("grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3, + (target, fd, args)); + + grpc_arg default_authority_arg; + default_authority_arg.type = GRPC_ARG_STRING; + default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; + default_authority_arg.value.string = "test.authority"; + grpc_channel_args *final_args = + grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); + + int flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); + + grpc_endpoint *client = + grpc_tcp_create(grpc_fd_create(fd, "client"), + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "fd-client"); + + grpc_transport *transport = + grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1); + GPR_ASSERT(transport); + grpc_channel *channel = grpc_channel_create( + &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_channel_args_destroy(final_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0); + + grpc_exec_ctx_finish(&exec_ctx); + + return channel != NULL ? channel : grpc_lame_client_channel_create( + target, GRPC_STATUS_INTERNAL, + "Failed to create client channel"); +} + +#else // !GPR_SUPPORT_CHANNELS_FROM_FD + +grpc_channel *grpc_insecure_channel_create_from_fd( + const char *target, int fd, const grpc_channel_args *args) { + GPR_ASSERT(0); + return NULL; +} + +#endif // GPR_SUPPORT_CHANNELS_FROM_FD diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index bceef152be..721ba82d8f 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -90,7 +90,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_auth_context *auth_context) { connector *c = arg; grpc_closure *notify; - grpc_channel_args *args_copy = NULL; gpr_mu_lock(&c->mu); if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); @@ -109,23 +108,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0); auth_context_arg = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add(c->args.channel_args, - &auth_context_arg, 1); - c->result->channel_args = args_copy; + c->result->channel_args = grpc_channel_args_copy_and_add( + c->args.channel_args, &auth_context_arg, 1); } notify = c->notify; c->notify = NULL; - /* look at c->args which are connector args. */ - notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE); - if (args_copy != NULL) grpc_channel_args_destroy(args_copy); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, - c->connecting_endpoint, - on_secure_handshake_done, c); + grpc_channel_security_connector_do_handshake( + exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline, + on_secure_handshake_done, c); } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -147,13 +143,14 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { &c->initial_string_sent); } else { grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, tcp, on_secure_handshake_done, c); + exec_ctx, c->security_connector, tcp, c->args.deadline, + on_secure_handshake_done, c); } } else { memset(c->result, 0, sizeof(*c->result)); notify = c->notify; c->notify = NULL; - notify->cb(exec_ctx, notify->cb_arg, GRPC_ERROR_NONE); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); } } @@ -175,7 +172,6 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, grpc_closure *notify) { connector *c = (connector *)con; GPR_ASSERT(c->notify == NULL); - GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 9efc922a8d..e5c987925c 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -75,14 +75,14 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; size_t i; - unsigned count = 0; + size_t count = 0; int port_num = -1; int port_temp; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -120,13 +120,15 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { } if (count == 0) { char *msg; - gpr_asprintf(&msg, "No address added out of total %d resolved", naddrs); + gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", + naddrs); err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); gpr_free(msg); goto error; } else if (count != naddrs) { char *msg; - gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved", + gpr_asprintf(&msg, "Only %" PRIuPTR + " addresses added out of total %" PRIuPTR " resolved", count, naddrs); err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); gpr_free(msg); @@ -153,6 +155,11 @@ error: } port_num = 0; + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + done: grpc_exec_ctx_finish(&exec_ctx); if (errors != NULL) { @@ -160,7 +167,6 @@ done: GRPC_ERROR_UNREF(errors[i]); } } - GRPC_ERROR_UNREF(err); gpr_free(errors); return port_num; } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c new file mode 100644 index 0000000000..96bf4d6f30 --- /dev/null +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c @@ -0,0 +1,82 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> +#include <grpc/grpc_posix.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#ifdef GPR_SUPPORT_CHANNELS_FROM_FD + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/surface/completion_queue.h" +#include "src/core/lib/surface/server.h" + +void grpc_server_add_insecure_channel_from_fd(grpc_server *server, + grpc_completion_queue *cq, + int fd) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + char *name; + gpr_asprintf(&name, "fd:%d", fd); + + grpc_endpoint *server_endpoint = grpc_tcp_create( + grpc_fd_create(fd, name), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, name); + + gpr_free(name); + + const grpc_channel_args *server_args = grpc_server_get_channel_args(server); + grpc_transport *transport = grpc_create_chttp2_transport( + &exec_ctx, server_args, server_endpoint, 0 /* is_client */); + grpc_endpoint_add_to_pollset(&exec_ctx, server_endpoint, grpc_cq_pollset(cq)); + grpc_server_setup_transport(&exec_ctx, server, transport, NULL, server_args); + grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL, 0); + grpc_exec_ctx_finish(&exec_ctx); +} + +#else // !GPR_SUPPORT_CHANNELS_FROM_FD + +void grpc_server_add_insecure_channel_from_fd(grpc_server *server, + grpc_completion_queue *cq, + int fd) { + GPR_ASSERT(0); +} + +#endif // GPR_SUPPORT_CHANNELS_FROM_FD diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 3286b220d4..c42810e913 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -129,9 +129,11 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, state->state = statep; state_ref(state->state); state->accepting_pollset = accepting_pollset; - grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc, - acceptor, tcp, - on_secure_handshake_done, state); + grpc_server_security_connector_do_handshake( + exec_ctx, state->state->sc, acceptor, tcp, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(120, GPR_TIMESPAN)), + on_secure_handshake_done, state); } /* Server callback: start listening on our ports */ @@ -173,7 +175,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_tcp_server *tcp = NULL; server_secure_state *state = NULL; size_t i; - unsigned count = 0; + size_t count = 0; int port_num = -1; int port_temp; grpc_security_status status = GRPC_SECURITY_ERROR; @@ -188,7 +190,11 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, 3, (server, addr, creds)); /* create security context */ - if (creds == NULL) goto error; + if (creds == NULL) { + err = GRPC_ERROR_CREATE( + "No credentials specified for secure server port (creds==NULL)"); + goto error; + } status = grpc_server_credentials_create_security_connector(creds, &sc); if (status != GRPC_SECURITY_OK) { char *msg; @@ -240,14 +246,15 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, } if (count == 0) { char *msg; - gpr_asprintf(&msg, "No address added out of total %d resolved", + gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", resolved->naddrs); err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); gpr_free(msg); goto error; } else if (count != resolved->naddrs) { char *msg; - gpr_asprintf(&msg, "Only %d addresses added out of total %d resolved", + gpr_asprintf(&msg, "Only %" PRIuPTR + " addresses added out of total %" PRIuPTR " resolved", count, resolved->naddrs); err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); gpr_free(msg); diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c new file mode 100644 index 0000000000..2d90b01cd8 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c @@ -0,0 +1,232 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/transport/bin_decoder.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "src/core/lib/support/string.h" + +static uint8_t decode_table[] = { + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 62, 0x40, 0x40, 0x40, 63, + 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0, 1, 2, 3, 4, 5, 6, + 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, + 19, 20, 21, 22, 23, 24, 25, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, + 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, + 49, 50, 51, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, 0x40, + 0x40, 0x40, 0x40, 0x40}; + +static const uint8_t tail_xtra[4] = {0, 0, 1, 2}; + +static bool input_is_valid(uint8_t *input_ptr, size_t length) { + size_t i; + + for (i = 0; i < length; ++i) { + if ((decode_table[input_ptr[i]] & 0xC0) != 0) { + gpr_log(GPR_ERROR, + "Base64 decoding failed, invalid character '%c' in base64 " + "input.\n", + (char)(*input_ptr)); + return false; + } + } + return true; +} + +#define COMPOSE_OUTPUT_BYTE_0(input_ptr) \ + (uint8_t)((decode_table[input_ptr[0]] << 2) | \ + (decode_table[input_ptr[1]] >> 4)) + +#define COMPOSE_OUTPUT_BYTE_1(input_ptr) \ + (uint8_t)((decode_table[input_ptr[1]] << 4) | \ + (decode_table[input_ptr[2]] >> 2)) + +#define COMPOSE_OUTPUT_BYTE_2(input_ptr) \ + (uint8_t)((decode_table[input_ptr[2]] << 6) | decode_table[input_ptr[3]]) + +bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx) { + size_t input_tail; + + if (ctx->input_cur > ctx->input_end || ctx->output_cur > ctx->output_end) { + return false; + } + + // Process a block of 4 input characters and 3 output bytes + while (ctx->input_end >= ctx->input_cur + 4 && + ctx->output_end >= ctx->output_cur + 3) { + if (!input_is_valid(ctx->input_cur, 4)) return false; + ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur); + ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur); + ctx->output_cur[2] = COMPOSE_OUTPUT_BYTE_2(ctx->input_cur); + ctx->output_cur += 3; + ctx->input_cur += 4; + } + + // Process the tail of input data + input_tail = (size_t)(ctx->input_end - ctx->input_cur); + if (input_tail == 4) { + // Process the input data with pad chars + if (ctx->input_cur[3] == '=') { + if (ctx->input_cur[2] == '=' && ctx->output_end >= ctx->output_cur + 1) { + if (!input_is_valid(ctx->input_cur, 2)) return false; + *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur); + ctx->input_cur += 4; + } else if (ctx->output_end >= ctx->output_cur + 2) { + if (!input_is_valid(ctx->input_cur, 3)) return false; + *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur); + *(ctx->output_cur++) = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur); + ; + ctx->input_cur += 4; + } + } + + } else if (ctx->contains_tail && input_tail > 1) { + // Process the input data without pad chars, but constains_tail is set + if (ctx->output_end >= ctx->output_cur + tail_xtra[input_tail]) { + if (!input_is_valid(ctx->input_cur, input_tail)) return false; + switch (input_tail) { + case 3: + ctx->output_cur[1] = COMPOSE_OUTPUT_BYTE_1(ctx->input_cur); + case 2: + ctx->output_cur[0] = COMPOSE_OUTPUT_BYTE_0(ctx->input_cur); + } + ctx->output_cur += tail_xtra[input_tail]; + ctx->input_cur += input_tail; + } + } + + return true; +} + +gpr_slice grpc_chttp2_base64_decode(gpr_slice input) { + size_t input_length = GPR_SLICE_LENGTH(input); + size_t output_length = input_length / 4 * 3; + struct grpc_base64_decode_context ctx; + gpr_slice output; + + if (input_length % 4 != 0) { + gpr_log(GPR_ERROR, + "Base64 decoding failed, input of " + "grpc_chttp2_base64_decode has a length of %d, which is not a " + "multiple of 4.\n", + (int)input_length); + return gpr_empty_slice(); + } + + if (input_length > 0) { + uint8_t *input_end = GPR_SLICE_END_PTR(input); + if (*(--input_end) == '=') { + output_length--; + if (*(--input_end) == '=') { + output_length--; + } + } + } + output = gpr_slice_malloc(output_length); + + ctx.input_cur = GPR_SLICE_START_PTR(input); + ctx.input_end = GPR_SLICE_END_PTR(input); + ctx.output_cur = GPR_SLICE_START_PTR(output); + ctx.output_end = GPR_SLICE_END_PTR(output); + ctx.contains_tail = false; + + if (!grpc_base64_decode_partial(&ctx)) { + char *s = gpr_dump_slice(input, GPR_DUMP_ASCII); + gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); + gpr_free(s); + gpr_slice_unref(output); + return gpr_empty_slice(); + } + GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output)); + GPR_ASSERT(ctx.input_cur == GPR_SLICE_END_PTR(input)); + return output; +} + +gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, + size_t output_length) { + size_t input_length = GPR_SLICE_LENGTH(input); + gpr_slice output = gpr_slice_malloc(output_length); + struct grpc_base64_decode_context ctx; + + // The length of a base64 string cannot be 4 * n + 1 + if (input_length % 4 == 1) { + gpr_log(GPR_ERROR, + "Base64 decoding failed, input of " + "grpc_chttp2_base64_decode_with_length has a length of %d, which " + "has a tail of 1 byte.\n", + (int)input_length); + gpr_slice_unref(output); + return gpr_empty_slice(); + } + + if (output_length > input_length / 4 * 3 + tail_xtra[input_length % 4]) { + gpr_log(GPR_ERROR, + "Base64 decoding failed, output_length %d is longer " + "than the max possible output length %d.\n", + (int)output_length, + (int)(input_length / 4 * 3 + tail_xtra[input_length % 4])); + gpr_slice_unref(output); + return gpr_empty_slice(); + } + + ctx.input_cur = GPR_SLICE_START_PTR(input); + ctx.input_end = GPR_SLICE_END_PTR(input); + ctx.output_cur = GPR_SLICE_START_PTR(output); + ctx.output_end = GPR_SLICE_END_PTR(output); + ctx.contains_tail = true; + + if (!grpc_base64_decode_partial(&ctx)) { + char *s = gpr_dump_slice(input, GPR_DUMP_ASCII); + gpr_log(GPR_ERROR, "Base64 decoding failed, input string:\n%s\n", s); + gpr_free(s); + gpr_slice_unref(output); + return gpr_empty_slice(); + } + GPR_ASSERT(ctx.output_cur == GPR_SLICE_END_PTR(output)); + GPR_ASSERT(ctx.input_cur <= GPR_SLICE_END_PTR(input)); + return output; +} diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h new file mode 100644 index 0000000000..b9d40c9b74 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H + +#include <grpc/support/slice.h> +#include <stdbool.h> + +struct grpc_base64_decode_context { + /* input/output: */ + uint8_t *input_cur; + uint8_t *input_end; + uint8_t *output_cur; + uint8_t *output_end; + /* Indicate if the decoder should handle the tail of input data*/ + bool contains_tail; +}; + +/* base64 decode a grpc_base64_decode_context util either input_end is reached + or output_end is reached. When input_end is reached, (input_end - input_cur) + is less than 4. When output_end is reached, (output_end - output_cur) is less + than 3. Returns false if decoding is failed. */ +bool grpc_base64_decode_partial(struct grpc_base64_decode_context *ctx); + +/* base64 decode a slice with pad chars. Returns a new slice, does not take + ownership of the input. Returns an empty slice if decoding is failed. */ +gpr_slice grpc_chttp2_base64_decode(gpr_slice input); + +/* base64 decode a slice without pad chars, data length is needed. Returns a new + slice, does not take ownership of the input. Returns an empty slice if + decoding is failed. */ +gpr_slice grpc_chttp2_base64_decode_with_length(gpr_slice input, + size_t output_length); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5264b9e1f8..9aa39ba26c 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -47,6 +47,7 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" +#include "src/core/lib/http/parser.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -105,7 +106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status); + grpc_status_code status, + gpr_slice *optional_message); static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, @@ -161,6 +163,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, GPR_ASSERT(t->ep == NULL); + gpr_slice_unref(t->optional_drop_message); + gpr_slice_buffer_destroy(&t->global.qbuf); gpr_slice_buffer_destroy(&t->writing.outbuf); @@ -190,8 +194,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_push(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed"), NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -261,6 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->parsing.deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->writing.is_client = is_client; + t->optional_drop_message = gpr_empty_slice(); grpc_connectivity_state_init( &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); @@ -444,7 +449,7 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error *error) { if (!t->closed) { t->closed = 1; - connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE, + connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); if (t->ep) { allow_endpoint_shutdown_locked(exec_ctx, t); @@ -643,7 +648,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_exec_ctx_push(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } check_read_ops(exec_ctx, &t->global); @@ -807,8 +812,10 @@ void grpc_chttp2_add_incoming_goaway( gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg)); gpr_slice_unref(goaway_text); transport_global->seen_goaway = 1; + /* lie: use transient failure from the transport to indicate goaway has been + * received */ connectivity_state_set( - exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE, + exec_ctx, transport_global, GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_set_str( grpc_error_set_int(GRPC_ERROR_CREATE("GOAWAY received"), GRPC_ERROR_INT_HTTP2_ERROR, @@ -869,7 +876,7 @@ static void maybe_start_some_streams( grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + GRPC_STATUS_UNAVAILABLE, NULL); } } @@ -907,7 +914,7 @@ void grpc_chttp2_complete_closure_step( stream_global->collecting_stats); stream_global->collecting_stats = NULL; } - grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); } *pclosure = NULL; } @@ -953,7 +960,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, if (op->cancel_with_status != GRPC_STATUS_OK) { cancel_from_api(exec_ctx, transport_global, stream_global, - op->cancel_with_status); + op->cancel_with_status, op->optional_close_message); } if (op->close_with_status != GRPC_STATUS_OK) { @@ -974,10 +981,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, if (metadata_size > metadata_peer_limit) { gpr_log(GPR_DEBUG, "to-be-sent initial metadata size exceeds peer limit " - "(%lu vs. %lu)", + "(%" PRIuPTR " vs. %" PRIuPTR ")", metadata_size, metadata_peer_limit); cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } else { if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { stream_global->seen_error = true; @@ -1033,10 +1040,10 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, if (metadata_size > metadata_peer_limit) { gpr_log(GPR_DEBUG, "to-be-sent trailing metadata size exceeds peer limit " - "(%lu vs. %lu)", + "(%" PRIuPTR " vs. %" PRIuPTR ")", metadata_size, metadata_peer_limit); cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } else { if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { @@ -1127,7 +1134,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_push(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1160,7 +1167,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1228,14 +1235,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } if (stream_global->exceeded_metadata_size) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } } grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { @@ -1248,13 +1255,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } } @@ -1267,7 +1274,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, } if (stream_global->exceeded_metadata_size) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED); + GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); } } if (stream_global->all_incoming_byte_streams_finished) { @@ -1334,7 +1341,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status) { + grpc_status_code status, + gpr_slice *optional_message) { if (!stream_global->read_closed || !stream_global->write_closed) { if (stream_global->id != 0) { gpr_slice_buffer_add( @@ -1344,8 +1352,12 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), &stream_global->stats.outgoing)); } + + if (optional_message) { + gpr_slice_ref(*optional_message); + } grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - NULL); + optional_message); } if (status != GRPC_STATUS_OK && !stream_global->seen_error) { stream_global->seen_error = true; @@ -1465,93 +1477,95 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, GPR_ASSERT(status >= 0 && (int)status < 100); - GPR_ASSERT(stream_global->id != 0); - - /* Hand roll a header block. - This is unnecessarily ugly - at some point we should find a more elegant - solution. - It's complicated by the fact that our send machinery would be dead by the - time we got around to sending this, so instead we ignore HPACK compression - and just write the uncompressed bytes onto the wire. */ - status_hdr = gpr_slice_malloc(15 + (status >= 10)); - p = GPR_SLICE_START_PTR(status_hdr); - *p++ = 0x40; /* literal header */ - *p++ = 11; /* len(grpc-status) */ - *p++ = 'g'; - *p++ = 'r'; - *p++ = 'p'; - *p++ = 'c'; - *p++ = '-'; - *p++ = 's'; - *p++ = 't'; - *p++ = 'a'; - *p++ = 't'; - *p++ = 'u'; - *p++ = 's'; - if (status < 10) { - *p++ = 1; - *p++ = (uint8_t)('0' + status); - } else { - *p++ = 2; - *p++ = (uint8_t)('0' + (status / 10)); - *p++ = (uint8_t)('0' + (status % 10)); - } - GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); - len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); - - if (optional_message) { - GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); - message_pfx = gpr_slice_malloc(15); - p = GPR_SLICE_START_PTR(message_pfx); - *p++ = 0x40; - *p++ = 12; /* len(grpc-message) */ + if (stream_global->id != 0 && !transport_global->is_client) { + /* Hand roll a header block. + This is unnecessarily ugly - at some point we should find a more elegant + solution. + It's complicated by the fact that our send machinery would be dead by the + time we got around to sending this, so instead we ignore HPACK + compression + and just write the uncompressed bytes onto the wire. */ + status_hdr = gpr_slice_malloc(15 + (status >= 10)); + p = GPR_SLICE_START_PTR(status_hdr); + *p++ = 0x40; /* literal header */ + *p++ = 11; /* len(grpc-status) */ *p++ = 'g'; *p++ = 'r'; *p++ = 'p'; *p++ = 'c'; *p++ = '-'; - *p++ = 'm'; - *p++ = 'e'; - *p++ = 's'; *p++ = 's'; + *p++ = 't'; *p++ = 'a'; - *p++ = 'g'; - *p++ = 'e'; - *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message); - GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); - len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); - len += (uint32_t)GPR_SLICE_LENGTH(*optional_message); - } - - hdr = gpr_slice_malloc(9); - p = GPR_SLICE_START_PTR(hdr); - *p++ = (uint8_t)(len >> 16); - *p++ = (uint8_t)(len >> 8); - *p++ = (uint8_t)(len); - *p++ = GRPC_CHTTP2_FRAME_HEADER; - *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; - *p++ = (uint8_t)(stream_global->id >> 24); - *p++ = (uint8_t)(stream_global->id >> 16); - *p++ = (uint8_t)(stream_global->id >> 8); - *p++ = (uint8_t)(stream_global->id); - GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); - - gpr_slice_buffer_add(&transport_global->qbuf, hdr); - gpr_slice_buffer_add(&transport_global->qbuf, status_hdr); - if (optional_message) { - gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); - gpr_slice_buffer_add(&transport_global->qbuf, - gpr_slice_ref(*optional_message)); - } + *p++ = 't'; + *p++ = 'u'; + *p++ = 's'; + if (status < 10) { + *p++ = 1; + *p++ = (uint8_t)('0' + status); + } else { + *p++ = 2; + *p++ = (uint8_t)('0' + (status / 10)); + *p++ = (uint8_t)('0' + (status % 10)); + } + GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); + len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); + + if (optional_message) { + GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); + message_pfx = gpr_slice_malloc(15); + p = GPR_SLICE_START_PTR(message_pfx); + *p++ = 0x40; + *p++ = 12; /* len(grpc-message) */ + *p++ = 'g'; + *p++ = 'r'; + *p++ = 'p'; + *p++ = 'c'; + *p++ = '-'; + *p++ = 'm'; + *p++ = 'e'; + *p++ = 's'; + *p++ = 's'; + *p++ = 'a'; + *p++ = 'g'; + *p++ = 'e'; + *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message); + GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); + len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); + len += (uint32_t)GPR_SLICE_LENGTH(*optional_message); + } - gpr_slice_buffer_add( - &transport_global->qbuf, - grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, - &stream_global->stats.outgoing)); + hdr = gpr_slice_malloc(9); + p = GPR_SLICE_START_PTR(hdr); + *p++ = (uint8_t)(len >> 16); + *p++ = (uint8_t)(len >> 8); + *p++ = (uint8_t)(len); + *p++ = GRPC_CHTTP2_FRAME_HEADER; + *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS; + *p++ = (uint8_t)(stream_global->id >> 24); + *p++ = (uint8_t)(stream_global->id >> 16); + *p++ = (uint8_t)(stream_global->id >> 8); + *p++ = (uint8_t)(stream_global->id); + GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr)); + + gpr_slice_buffer_add(&transport_global->qbuf, hdr); + gpr_slice_buffer_add(&transport_global->qbuf, status_hdr); + if (optional_message) { + gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); + gpr_slice_buffer_add(&transport_global->qbuf, + gpr_slice_ref(*optional_message)); + } - if (optional_message) { - gpr_slice_ref(*optional_message); + gpr_slice_buffer_add( + &transport_global->qbuf, + grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR, + &stream_global->stats.outgoing)); + + if (optional_message) { + gpr_slice_ref(*optional_message); + } } + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, optional_message); grpc_error *err = GRPC_ERROR_CREATE("Stream closed"); @@ -1569,8 +1583,12 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { + grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global); cancel_from_api(user_data, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE); + GRPC_STATUS_UNAVAILABLE, + GPR_SLICE_IS_EMPTY(transport->optional_drop_message) + ? NULL + : &transport->optional_drop_message); } static void end_all_the_calls(grpc_exec_ctx *exec_ctx, @@ -1643,24 +1661,64 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_push(exec_ctx, &t->parsing_action, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { post_reading_action_locked(exec_ctx, t, s_unused, arg); } } +static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t) { + grpc_http_parser parser; + size_t i = 0; + grpc_error *error = GRPC_ERROR_NONE; + grpc_http_response response; + memset(&response, 0, sizeof(response)); + + grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response); + + grpc_error *parse_error = GRPC_ERROR_NONE; + for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) { + parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]); + } + if (parse_error == GRPC_ERROR_NONE && + (parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) { + error = grpc_error_set_int( + GRPC_ERROR_CREATE("Trying to connect an http1.x server"), + GRPC_ERROR_INT_HTTP_STATUS, response.status); + } + GRPC_ERROR_UNREF(parse_error); + + grpc_http_parser_destroy(&parser); + grpc_http_response_destroy(&response); + return error; +} + static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_chttp2_transport *t = arg; GPR_TIMER_BEGIN("reading_action.parse", 0); size_t i = 0; - grpc_error *errors[2] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE}; + grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE, + GRPC_ERROR_NONE}; for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) { errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing, t->read_buffer.slices[i]); }; + if (i != t->read_buffer.count) { + gpr_slice_unref(t->optional_drop_message); + errors[2] = try_http_parsing(exec_ctx, t); + if (errors[2] != GRPC_ERROR_NONE) { + t->optional_drop_message = gpr_slice_from_copied_string( + "Connection dropped: received http1.x response"); + } else { + t->optional_drop_message = gpr_slice_from_copied_string( + "Connection dropped: received unparseable response"); + } + } grpc_error *err = - errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE + errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && + errors[2] == GRPC_ERROR_NONE ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors, GPR_ARRAY_SIZE(errors)); @@ -1794,6 +1852,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, add_to_pollset_locked, pollset, 0); } +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt, + (grpc_chttp2_stream *)gs, + add_to_pollset_set_locked, pollset_set, 0); +} + /******************************************************************************* * BYTE STREAM */ @@ -1870,10 +1935,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), - NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { bs->on_next = arg->on_complete; bs->next = arg->slice; @@ -1930,7 +1995,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { *bs->next = arg->slice; - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, arg->slice); @@ -1968,7 +2033,7 @@ static void incoming_byte_stream_finished_failed_locked( grpc_chttp2_incoming_byte_stream *bs = a->bs; grpc_error *error = a->error; gpr_free(a); - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; @@ -2045,10 +2110,13 @@ static char *format_flowctl_context_var(const char *context, const char *var, int64_t val, uint32_t id, char **scope) { char *underscore_pos; + char *buf; char *result; if (context == NULL) { *scope = NULL; - gpr_asprintf(&result, "%s(%lld)", var, val); + gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); + result = gpr_leftpad(buf, ' ', 40); + gpr_free(buf); return result; } underscore_pos = strchr(context, '_'); @@ -2059,7 +2127,9 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_asprintf(scope, "%s[%d]", tmp, id); gpr_free(tmp); } - gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val); + gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); + result = gpr_leftpad(buf, ' ', 40); + gpr_free(buf); return result; } @@ -2080,6 +2150,8 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, uint32_t stream_id, int64_t val1, int64_t val2) { char *scope1; char *scope2; + char *tmp_phase; + char *tmp_scope1; char *label1 = format_flowctl_context_var(context1, var1, val1, stream_id, &scope1); char *label2 = @@ -2087,14 +2159,18 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, char *clisvr = is_client ? "client" : "server"; char *prefix; - gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1); + tmp_phase = gpr_leftpad(phase, ' ', 8); + tmp_scope1 = gpr_leftpad(scope1, ' ', 11); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_free(tmp_phase); + gpr_free(tmp_scope1); switch (op) { case GRPC_CHTTP2_FLOWCTL_MOVE: GPR_ASSERT(samestr(scope1, scope2)); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2, + "%sMOVE %s <- %s giving %" PRId64, prefix, label1, label2, val1 + val2); } break; @@ -2102,7 +2178,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, GPR_ASSERT(val2 >= 0); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2, + "%sCREDIT %s by %s giving %" PRId64, prefix, label1, label2, val1 + val2); } break; @@ -2110,7 +2186,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, GPR_ASSERT(val2 >= 0); if (val2 != 0) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2, + "%sDEBIT %s by %s giving %" PRId64, prefix, label1, label2, val1 - val2); } break; @@ -2135,6 +2211,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), "chttp2", init_stream, set_pollset, + set_pollset_set, perform_stream_op, perform_transport_op, destroy_stream, diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 7e5d58380e..54e72ebd24 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -383,6 +383,9 @@ struct grpc_chttp2_transport { /** Transport op to be applied post-parsing */ grpc_transport_op *post_parsing_op; + + /** Message explaining the reason of dropping connection */ + gpr_slice optional_drop_message; }; typedef struct { diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 72b3131d7b..785134091f 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -559,7 +559,7 @@ static grpc_error *update_incoming_window( uint32_t incoming_frame_size = transport_parsing->incoming_frame_size; if (incoming_frame_size > transport_parsing->incoming_window) { char *msg; - gpr_asprintf(&msg, "frame of size %d overflows incoming window of %d", + gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, transport_parsing->incoming_frame_size, transport_parsing->incoming_window); grpc_error *err = GRPC_ERROR_CREATE(msg); @@ -569,7 +569,7 @@ static grpc_error *update_incoming_window( if (incoming_frame_size > stream_parsing->incoming_window) { char *msg; - gpr_asprintf(&msg, "frame of size %d overflows incoming window of %d", + gpr_asprintf(&msg, "frame of size %d overflows incoming window of %" PRId64, transport_parsing->incoming_frame_size, stream_parsing->incoming_window); grpc_error *err = GRPC_ERROR_CREATE(msg); @@ -678,7 +678,8 @@ static void on_initial_header(void *tp, grpc_mdelem *md) { if (new_size > metadata_size_limit) { if (!stream_parsing->exceeded_metadata_size) { gpr_log(GPR_DEBUG, - "received initial metadata size exceeds limit (%lu vs. %lu)", + "received initial metadata size exceeds limit (%" PRIuPTR + " vs. %" PRIuPTR ")", new_size, metadata_size_limit); stream_parsing->seen_error = true; stream_parsing->exceeded_metadata_size = true; @@ -724,7 +725,8 @@ static void on_trailing_header(void *tp, grpc_mdelem *md) { if (new_size > metadata_size_limit) { if (!stream_parsing->exceeded_metadata_size) { gpr_log(GPR_DEBUG, - "received trailing metadata size exceeds limit (%lu vs. %lu)", + "received trailing metadata size exceeds limit (%" PRIuPTR + " vs. %" PRIuPTR ")", new_size, metadata_size_limit); stream_parsing->seen_error = true; stream_parsing->exceeded_metadata_size = true; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ca56debc84..add7678182 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -187,8 +187,8 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_push(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, + NULL); } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index ea0f2bcba4..25d8aca250 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -152,14 +152,18 @@ static void next_recv_step(stream_obj *s, enum e_caller caller); static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_pollset *pollset) {} +static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, + grpc_transport *gt, grpc_stream *gs, + grpc_pollset_set *pollset_set) {} + static void enqueue_callbacks(grpc_closure *callback_list[]) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (callback_list[0]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); callback_list[0] = NULL; } if (callback_list[1]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); callback_list[1] = NULL; } grpc_exec_ctx_finish(&exec_ctx); @@ -218,8 +222,11 @@ static void on_write_completed(cronet_bidirectional_stream *stream, static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); - memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); - gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + if (s->total_read_bytes > 0) { + // Only copy if there is non-zero number of bytes + memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + } grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); *s->recv_message = (grpc_byte_buffer *)&s->sbs; } @@ -347,8 +354,17 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { if (grpc_cronet_trace) { gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); } - cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, - s->remaining_read_bytes); + if (s->remaining_read_bytes > 0) { + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, + s->remaining_read_bytes); + } else { + // Calling the closing callback directly since this is a 0 byte read + // for an empty message. + process_recv_message(s, NULL); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + invoke_closing_callback(s); + set_recv_state(s, CRONET_RECV_CLOSED); + } } } break; @@ -634,7 +650,13 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { } } -const grpc_transport_vtable grpc_cronet_vtable = { - sizeof(stream_obj), "cronet_http", init_stream, - set_pollset_do_nothing, perform_stream_op, NULL, - destroy_stream, destroy_transport, NULL}; +const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), + "cronet_http", + init_stream, + set_pollset_do_nothing, + set_pollset_set_do_nothing, + perform_stream_op, + NULL, + destroy_stream, + destroy_transport, + NULL}; |