diff options
author | David Garcia Quintas <dgq@google.com> | 2016-05-31 13:59:37 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-05-31 13:59:37 -0700 |
commit | 8bec6f6a55afaa1e321f29bf816d2a010f510b3c (patch) | |
tree | 525e06b564b9d23d8340e25e7ddb9915fec0b1e5 /src/core/ext/transport | |
parent | 582f4350ed755aac0b07f12b499ad18f86f2a1b7 (diff) | |
parent | 0d6196025e62aea5aabc6341459f2c370e264230 (diff) |
Merge branch 'master' of github.com:grpc/grpc into lb_pollset_propagation
Diffstat (limited to 'src/core/ext/transport')
6 files changed, 72 insertions, 56 deletions
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 58af6f995a..a262306085 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -45,9 +45,9 @@ #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/auth_filters.h" -#include "src/core/lib/security/credentials.h" -#include "src/core/lib/security/security_context.h" +#include "src/core/lib/security/context/security_context.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/tsi/transport_security_interface.h" diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e21fa2a072..0428bb1e3d 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -43,14 +43,8 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_transport *transport) { - grpc_server_setup_transport(exec_ctx, server, transport, - grpc_server_get_channel_args(server)); -} - static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, + grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before @@ -61,7 +55,8 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, */ grpc_transport *transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(server), tcp, 0); - setup_transport(exec_ctx, server, transport); + grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset, + grpc_server_get_channel_args(server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 698b2bef61..ebbefbcd89 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -45,14 +45,14 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/tcp_server.h" -#include "src/core/lib/security/auth_filters.h" -#include "src/core/lib/security/credentials.h" -#include "src/core/lib/security/security_connector.h" -#include "src/core/lib/security/security_context.h" +#include "src/core/lib/security/context/security_context.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct grpc_server_secure_state { +typedef struct server_secure_state { grpc_server *server; grpc_tcp_server *tcp; grpc_server_security_connector *sc; @@ -62,13 +62,16 @@ typedef struct grpc_server_secure_state { gpr_refcount refcount; grpc_closure destroy_closure; grpc_closure *destroy_callback; -} grpc_server_secure_state; +} server_secure_state; -static void state_ref(grpc_server_secure_state *state) { - gpr_ref(&state->refcount); -} +typedef struct server_secure_connect { + server_secure_state *state; + grpc_pollset *accepting_pollset; +} server_secure_connect; + +static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } -static void state_unref(grpc_server_secure_state *state) { +static void state_unref(server_secure_state *state) { if (gpr_unref(&state->refcount)) { /* ensure all threads have unlocked */ gpr_mu_lock(&state->mu); @@ -80,67 +83,66 @@ static void state_unref(grpc_server_secure_state *state) { } } -static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep, - grpc_transport *transport, - grpc_auth_context *auth_context) { - grpc_server_secure_state *state = statep; - grpc_channel_args *args_copy; - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg(state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(state->server), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport(exec_ctx, state->server, transport, args_copy); - grpc_channel_args_destroy(args_copy); -} - static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_security_status status, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { - grpc_server_secure_state *state = statep; + server_secure_connect *state = statep; grpc_transport *transport; if (status == GRPC_SECURITY_OK) { if (secure_endpoint) { - gpr_mu_lock(&state->mu); - if (!state->is_shutdown) { + gpr_mu_lock(&state->state->mu); + if (!state->state->is_shutdown) { transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(state->server), + exec_ctx, grpc_server_get_channel_args(state->state->server), secure_endpoint, 0); - setup_transport(exec_ctx, state, transport, auth_context); + grpc_channel_args *args_copy; + grpc_arg args_to_add[2]; + args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); + args_to_add[1] = grpc_auth_context_to_arg(auth_context); + args_copy = grpc_channel_args_copy_and_add( + grpc_server_get_channel_args(state->state->server), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); + grpc_server_setup_transport(exec_ctx, state->state->server, transport, + state->accepting_pollset, args_copy); + grpc_channel_args_destroy(args_copy); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } else { /* We need to consume this here, because the server may already have * gone away. */ grpc_endpoint_destroy(exec_ctx, secure_endpoint); } - gpr_mu_unlock(&state->mu); + gpr_mu_unlock(&state->state->mu); } } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } - state_unref(state); + state_unref(state->state); + gpr_free(state); } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - grpc_server_secure_state *state = statep; - state_ref(state); - grpc_server_security_connector_do_handshake( - exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state); + server_secure_connect *state = gpr_malloc(sizeof(*state)); + state->state = statep; + state_ref(state->state); + state->accepting_pollset = accepting_pollset; + grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc, + acceptor, tcp, + on_secure_handshake_done, state); } /* Server callback: start listening on our ports */ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, grpc_pollset **pollsets, size_t pollset_count) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; grpc_tcp_server_start(exec_ctx, state->tcp, pollsets, pollset_count, on_accept, state); } static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; if (state->destroy_callback != NULL) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, success); @@ -153,7 +155,7 @@ static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { callbacks) */ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, grpc_closure *callback) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); state->is_shutdown = 1; @@ -167,7 +169,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; - grpc_server_secure_state *state = NULL; + server_secure_state *state = NULL; size_t i; unsigned count = 0; int port_num = -1; diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c index 69accb7696..827e7a6977 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.c +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c @@ -137,7 +137,8 @@ grpc_chttp2_parse_error grpc_chttp2_goaway_parser_parse( ++cur; /* fallthrough */ case GRPC_CHTTP2_GOAWAY_DEBUG: - memcpy(p->debug_data + p->debug_pos, cur, (size_t)(end - cur)); + if (end != cur) + memcpy(p->debug_data + p->debug_pos, cur, (size_t)(end - cur)); GPR_ASSERT((size_t)(end - cur) < UINT32_MAX - p->debug_pos); p->debug_pos += (uint32_t)(end - cur); p->state = GRPC_CHTTP2_GOAWAY_DEBUG; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 687936bfd3..ed45bc9cb3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1138,6 +1138,7 @@ static int parse_string_prefix(grpc_chttp2_hpack_parser *p, const uint8_t *cur, /* append some bytes to a string */ static void append_bytes(grpc_chttp2_hpack_parser_string *str, const uint8_t *data, size_t length) { + if (length == 0) return; if (length + str->length > str->capacity) { GPR_ASSERT(str->length + length <= UINT32_MAX); str->capacity = (uint32_t)(str->length + length); @@ -1445,6 +1446,11 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( stream id on a header */ if (stream_parsing != NULL) { if (parser->is_boundary) { + if (stream_parsing->header_frames_received == + GPR_ARRAY_SIZE(stream_parsing->got_metadata_on_parse)) { + gpr_log(GPR_ERROR, "too many trailer frames"); + return GRPC_CHTTP2_CONNECTION_ERROR; + } stream_parsing ->got_metadata_on_parse[stream_parsing->header_frames_received] = 1; stream_parsing->header_frames_received++; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index bebc6448e5..1bb53b756c 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -222,8 +222,11 @@ static void on_write_completed(cronet_bidirectional_stream *stream, static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); - memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); - gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + if (s->total_read_bytes > 0) { + // Only copy if there is non-zero number of bytes + memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + } grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); *s->recv_message = (grpc_byte_buffer *)&s->sbs; } @@ -351,8 +354,17 @@ static void next_recv_step(stream_obj *s, enum e_caller caller) { if (grpc_cronet_trace) { gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); } - cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, - s->remaining_read_bytes); + if (s->remaining_read_bytes > 0) { + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, + s->remaining_read_bytes); + } else { + // Calling the closing callback directly since this is a 0 byte read + // for an empty message. + process_recv_message(s, NULL); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + invoke_closing_callback(s); + set_recv_state(s, CRONET_RECV_CLOSED); + } } } break; |