diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c | 3 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/client/secure/cronet_channel_create.c | 69 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_api_dummy.c | 85 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 640 | ||||
-rw-r--r-- | src/core/lib/http/parser.c.orig | 357 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.c | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 6 |
7 files changed, 1172 insertions, 6 deletions
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index a74c1a27eb..629959b7c0 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -178,6 +178,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_security_connector *sc = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *err = GRPC_ERROR_NONE; + grpc_error **errors = NULL; GRPC_API_TRACE( "grpc_server_add_secure_http2_port(" @@ -220,7 +221,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, gpr_mu_init(&state->mu); gpr_ref_init(&state->refcount, 1); - grpc_error **errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); + errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { errors[i] = grpc_tcp_server_add_port( tcp, (struct sockaddr *)&resolved->addrs[i].addr, diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c new file mode 100644 index 0000000000..df1acddcc0 --- /dev/null +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -0,0 +1,69 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/transport_impl.h" + +// Cronet transport object +typedef struct cronet_transport { + grpc_transport base; // must be first element in this structure + void *engine; + char *host; +} cronet_transport; + +extern grpc_transport_vtable grpc_cronet_vtable; + +GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( + void *engine, const char *target, const grpc_channel_args *args, + void *reserved) { + cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + strcpy(ct->host, target); + gpr_log(GPR_DEBUG, + "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine, + ct->host); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + return grpc_channel_create(&exec_ctx, target, args, + GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); +} diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c new file mode 100644 index 0000000000..687026c9fd --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* This file has empty implementation of all the functions exposed by the cronet +library, so we can build it in all environments */ + +#include <stdbool.h> + +#include <grpc/support/log.h> + +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#ifdef GRPC_COMPILE_WITH_CRONET +/* link with the real CRONET library in the build system */ +#else +/* Dummy implementation of cronet API just to test for build-ability */ +cronet_bidirectional_stream* cronet_bidirectional_stream_create( + cronet_engine* engine, void* annotation, + cronet_bidirectional_stream_callback* callback) { + GPR_ASSERT(0); + return NULL; +} + +int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_start( + cronet_bidirectional_stream* stream, const char* url, int priority, + const char* method, const cronet_bidirectional_stream_header_array* headers, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, + char* buffer, int capacity) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, + const char* buffer, int count, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +#endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c new file mode 100644 index 0000000000..ea0f2bcba4 --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -0,0 +1,640 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/impl/codegen/port_platform.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport_impl.h" +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#define GRPC_HEADER_SIZE_IN_BYTES 5 + +// Global flag that gets set with GRPC_TRACE env variable +int grpc_cronet_trace = 1; + +// Cronet transport object +struct grpc_cronet_transport { + grpc_transport base; /* must be first element in this structure */ + cronet_engine *engine; + char *host; +}; + +typedef struct grpc_cronet_transport grpc_cronet_transport; + +enum send_state { + CRONET_SEND_IDLE = 0, + CRONET_REQ_STARTED, + CRONET_SEND_HEADER, + CRONET_WRITE, + CRONET_WRITE_COMPLETED, +}; + +enum recv_state { + CRONET_RECV_IDLE = 0, + CRONET_RECV_READ_LENGTH, + CRONET_RECV_READ_DATA, + CRONET_RECV_CLOSED, +}; + +static const char *recv_state_name[] = { + "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,", + "CRONET_RECV_CLOSED"}; + +// Enum that identifies calling function. +enum e_caller { + PERFORM_STREAM_OP, + ON_READ_COMPLETE, + ON_RESPONSE_HEADERS_RECEIVED, + ON_RESPONSE_TRAILERS_RECEIVED +}; + +enum callback_id { + CB_SEND_INITIAL_METADATA = 0, + CB_SEND_MESSAGE, + CB_SEND_TRAILING_METADATA, + CB_RECV_MESSAGE, + CB_RECV_INITIAL_METADATA, + CB_RECV_TRAILING_METADATA, + CB_NUM_CALLBACKS +}; + +struct stream_obj { + // we store received bytes here as they trickle in. + gpr_slice_buffer write_slice_buffer; + cronet_bidirectional_stream *cbs; + gpr_slice slice; + gpr_slice_buffer read_slice_buffer; + struct grpc_slice_buffer_stream sbs; + char *read_buffer; + int remaining_read_bytes; + int total_read_bytes; + + char *write_buffer; + size_t write_buffer_size; + + // Hold the URL + char *url; + + bool response_headers_received; + bool read_requested; + bool response_trailers_received; + bool read_closed; + + // Recv message stuff + grpc_byte_buffer **recv_message; + // Initial metadata stuff + grpc_metadata_batch *recv_initial_metadata; + // Trailing metadata stuff + grpc_metadata_batch *recv_trailing_metadata; + grpc_chttp2_incoming_metadata_buffer imb; + + // This mutex protects receive state machine execution + gpr_mu recv_mu; + // we can queue up up to 2 callbacks for each OP + grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; + + // storage for header + cronet_bidirectional_stream_header *headers; + uint32_t num_headers; + cronet_bidirectional_stream_header_array header_array; + // state tracking + enum recv_state cronet_recv_state; + enum send_state cronet_send_state; +}; + +typedef struct stream_obj stream_obj; + +static void next_send_step(stream_obj *s); +static void next_recv_step(stream_obj *s, enum e_caller caller); + +static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) {} + +static void enqueue_callbacks(grpc_closure *callback_list[]) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (callback_list[0]) { + grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); + callback_list[0] = NULL; + } + if (callback_list[1]) { + grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); + callback_list[1] = NULL; + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static void on_canceled(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_canceled %p", stream); + } +} + +static void on_failed(cronet_bidirectional_stream *stream, int net_error) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + } +} + +static void on_succeeded(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_succeeded %p", stream); + } +} + +static void on_response_trailers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *trailers) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + + memset(&s->imb, 0, sizeof(s->imb)); + grpc_chttp2_incoming_metadata_buffer_init(&s->imb); + unsigned int i = 0; + for (i = 0; i < trailers->count; i++) { + grpc_chttp2_incoming_metadata_buffer_add( + &s->imb, grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(trailers->headers[i].key), + grpc_mdstr_from_string(trailers->headers[i].value))); + } + s->response_trailers_received = true; + next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); +} + +static void on_write_completed(cronet_bidirectional_stream *stream, + const char *data) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_write_completed"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); + s->cronet_send_state = CRONET_WRITE_COMPLETED; + next_send_step(s); +} + +static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { + gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); + uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); + memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); + *s->recv_message = (grpc_byte_buffer *)&s->sbs; +} + +static int parse_grpc_header(const uint8_t *data) { + const uint8_t *p = data + 1; + int length = 0; + length |= ((uint8_t)*p++) << 24; + length |= ((uint8_t)*p++) << 16; + length |= ((uint8_t)*p++) << 8; + length |= ((uint8_t)*p++); + return length; +} + +static void on_read_completed(cronet_bidirectional_stream *stream, char *data, + int count) { + stream_obj *s = (stream_obj *)stream->annotation; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", + count, s->total_read_bytes, s->remaining_read_bytes); + } + if (count > 0) { + GPR_ASSERT(s->recv_message); + s->remaining_read_bytes -= count; + next_recv_step(s, ON_READ_COMPLETE); + } else { + s->read_closed = true; + next_recv_step(s, ON_READ_COMPLETE); + } +} + +static void on_response_headers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *headers, + const char *negotiated_protocol) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_headers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]); + s->response_headers_received = true; + next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); +} + +static void on_request_headers_sent(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); + s->cronet_send_state = CRONET_SEND_HEADER; + next_send_step(s); +} + +// Callback function pointers (invoked by cronet in response to events) +static cronet_bidirectional_stream_callback callbacks = { + on_request_headers_sent, + on_response_headers_received, + on_read_completed, + on_write_completed, + on_response_trailers_received, + on_succeeded, + on_failed, + on_canceled}; + +static void invoke_closing_callback(stream_obj *s) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, + s->recv_trailing_metadata); + if (s->callback_list[CB_RECV_TRAILING_METADATA]) { + enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]); + } +} + +static void set_recv_state(stream_obj *s, enum recv_state state) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]); + } + s->cronet_recv_state = state; +} + +// This is invoked from perform_stream_op, and all on_xxxx callbacks. +static void next_recv_step(stream_obj *s, enum e_caller caller) { + gpr_mu_lock(&s->recv_mu); + switch (s->cronet_recv_state) { + case CRONET_RECV_IDLE: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + } + if (caller == PERFORM_STREAM_OP || + caller == ON_RESPONSE_HEADERS_RECEIVED) { + if (s->read_closed && s->response_trailers_received) { + invoke_closing_callback(s); + set_recv_state(s, CRONET_RECV_CLOSED); + } else if (s->response_headers_received == true && + s->read_requested == true) { + set_recv_state(s, CRONET_RECV_READ_LENGTH); + s->total_read_bytes = s->remaining_read_bytes = + GRPC_HEADER_SIZE_IN_BYTES; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_LENGTH: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH"); + } + if (caller == ON_READ_COMPLETE) { + if (s->read_closed) { + invoke_closing_callback(s); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + set_recv_state(s, CRONET_RECV_CLOSED); + } else { + GPR_ASSERT(s->remaining_read_bytes == 0); + set_recv_state(s, CRONET_RECV_READ_DATA); + s->total_read_bytes = s->remaining_read_bytes = + parse_grpc_header((const uint8_t *)s->read_buffer); + s->read_buffer = + gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_DATA: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); + } + if (caller == ON_READ_COMPLETE) { + if (s->remaining_read_bytes > 0) { + int offset = s->total_read_bytes - s->remaining_read_bytes; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read( + s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes); + } else { + gpr_slice_buffer_init(&s->read_slice_buffer); + uint8_t *p = (uint8_t *)s->read_buffer; + process_recv_message(s, p); + set_recv_state(s, CRONET_RECV_IDLE); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + } + } + break; + case CRONET_RECV_CLOSED: + break; + default: + GPR_ASSERT(0); // Should not reach here + break; + } + gpr_mu_unlock(&s->recv_mu); +} + +// This function takes the data from s->write_slice_buffer and assembles into +// a contiguous byte stream with 5 byte gRPC header prepended. +static void create_grpc_frame(stream_obj *s) { + gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer); + uint8_t *raw_data = GPR_SLICE_START_PTR(slice); + size_t length = GPR_SLICE_LENGTH(slice); + s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; + s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size); + uint8_t *p = (uint8_t *)s->write_buffer; + // Append 5 byte header + *p++ = 0; + *p++ = (uint8_t)(length >> 24); + *p++ = (uint8_t)(length >> 16); + *p++ = (uint8_t)(length >> 8); + *p++ = (uint8_t)(length); + // append actual data + memcpy(p, raw_data, length); +} + +static void do_write(stream_obj *s) { + gpr_slice_buffer *sb = &s->write_slice_buffer; + GPR_ASSERT(sb->count <= 1); + if (sb->count > 0) { + create_grpc_frame(s); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, s->write_buffer, + (int)s->write_buffer_size, false); + } +} + +// +static void next_send_step(stream_obj *s) { + switch (s->cronet_send_state) { + case CRONET_SEND_IDLE: + GPR_ASSERT( + s->cbs); // cronet_bidirectional_stream is not initialized yet. + s->cronet_send_state = CRONET_REQ_STARTED; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); + } + cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST", + &s->header_array, false); + // we no longer need the memory that was allocated earlier. + gpr_free(s->header_array.headers); + break; + case CRONET_SEND_HEADER: + do_write(s); + s->cronet_send_state = CRONET_WRITE; + break; + case CRONET_WRITE_COMPLETED: + do_write(s); + break; + default: + GPR_ASSERT(0); + break; + } +} + +static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, + const char *host, + stream_obj *s) { + grpc_linked_mdelem *curr = head; + // Walk the linked list and get number of header fields + uint32_t num_headers_available = 0; + while (curr != NULL) { + curr = curr->next; + num_headers_available++; + } + // Allocate enough memory + s->headers = (cronet_bidirectional_stream_header *)gpr_malloc( + sizeof(cronet_bidirectional_stream_header) * num_headers_available); + + // Walk the linked list again, this time copying the header fields. + // s->num_headers + // can be less than num_headers_available, as some headers are not used for + // cronet + curr = head; + s->num_headers = 0; + while (s->num_headers < num_headers_available) { + grpc_mdelem *mdelem = curr->md; + curr = curr->next; + const char *key = grpc_mdstr_as_c_string(mdelem->key); + const char *value = grpc_mdstr_as_c_string(mdelem->value); + if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 || + strcmp(key, ":authority") == 0) { + // Cronet populates these fields on its own. + continue; + } + if (strcmp(key, ":path") == 0) { + // Create URL by appending :path value to the hostname + gpr_asprintf(&s->url, "https://%s%s", host, value); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); + } + continue; + } + s->headers[s->num_headers].key = key; + s->headers[s->num_headers].value = value; + s->num_headers++; + if (curr == NULL) { + break; + } + } +} + +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + GPR_ASSERT(ct->engine); + stream_obj *s = (stream_obj *)gs; + if (op->recv_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - recv_trailing_metadata: on_complete=%p", + op->on_complete); + } + s->recv_trailing_metadata = op->recv_trailing_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); + s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete; + } + if (op->recv_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p", + op->on_complete); + } + s->recv_message = (grpc_byte_buffer **)op->recv_message; + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); + s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready; + s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete; + s->read_requested = true; + next_recv_step(s, PERFORM_STREAM_OP); + } + if (op->recv_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", + op->on_complete); + } + s->recv_initial_metadata = op->recv_initial_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); + s->callback_list[CB_RECV_INITIAL_METADATA][0] = + op->recv_initial_metadata_ready; + s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete; + } + if (op->send_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_initial_metadata: on_complete=%p", + op->on_complete); + } + s->num_headers = 0; + convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head, + ct->host, s); + s->header_array.count = s->num_headers; + s->header_array.capacity = s->num_headers; + s->header_array.headers = s->headers; + GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]); + s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete; + } + if (op->send_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p", + op->on_complete); + } + grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice, + op->send_message->length, NULL); + // Check that compression flag is not ON. We don't support compression yet. + // TODO (makdharma): add compression support + GPR_ASSERT(op->send_message->flags == 0); + gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); + if (s->cbs == NULL) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + } + s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); + GPR_ASSERT(s->cbs); + s->read_closed = false; + s->response_trailers_received = false; + s->response_headers_received = false; + s->cronet_send_state = CRONET_SEND_IDLE; + s->cronet_recv_state = CRONET_RECV_IDLE; + } + GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]); + s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete; + next_send_step(s); + } + if (op->send_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_trailing_metadata: on_complete=%p", + op->on_complete); + } + GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); + s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete; + if (s->cbs) { + // Send an "empty" write to the far end to signal that we're done. + // This will induce the server to send down trailers. + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); + } else { + // We never created a stream. This was probably an empty request. + invoke_closing_callback(s); + } + } +} + +static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data) { + stream_obj *s = (stream_obj *)gs; + memset(s->callback_list, 0, sizeof(s->callback_list)); + s->cbs = NULL; + gpr_mu_init(&s->recv_mu); + s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + gpr_slice_buffer_init(&s->write_slice_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_transport - init_stream"); + } + return 0; +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy stream"); + } + stream_obj *s = (stream_obj *)gs; + s->cbs = NULL; + gpr_free(s->read_buffer); + gpr_free(s->write_buffer); + gpr_free(s->url); + gpr_mu_destroy(&s->recv_mu); + if (and_free_memory) { + gpr_free(and_free_memory); + } +} + +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + gpr_free(ct->host); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy transport"); + } +} + +const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), "cronet_http", init_stream, + set_pollset_do_nothing, perform_stream_op, NULL, + destroy_stream, destroy_transport, NULL}; diff --git a/src/core/lib/http/parser.c.orig b/src/core/lib/http/parser.c.orig new file mode 100644 index 0000000000..74d90fd8bf --- /dev/null +++ b/src/core/lib/http/parser.c.orig @@ -0,0 +1,357 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/http/parser.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +int grpc_http1_trace = 0; + +static char *buf2str(void *buffer, size_t length) { + char *out = gpr_malloc(length + 1); + memcpy(out, buffer, length); + out[length] = 0; + return out; +} + +static grpc_error *handle_response_line(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + + if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); + if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); + if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); + if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); + if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); + if (cur == end || *cur++ != '1') return GRPC_ERROR_CREATE("Expected '1'"); + if (cur == end || *cur++ != '.') return GRPC_ERROR_CREATE("Expected '.'"); + if (cur == end || *cur < '0' || *cur++ > '1') { + return GRPC_ERROR_CREATE("Expected HTTP/1.0 or HTTP/1.1"); + } + if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + if (cur == end || *cur < '1' || *cur++ > '9') + return GRPC_ERROR_CREATE("Expected status code"); + if (cur == end || *cur < '0' || *cur++ > '9') + return GRPC_ERROR_CREATE("Expected status code"); + if (cur == end || *cur < '0' || *cur++ > '9') + return GRPC_ERROR_CREATE("Expected status code"); + parser->http.response->status = + (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0'); + if (cur == end || *cur++ != ' ') return GRPC_ERROR_CREATE("Expected ' '"); + + /* we don't really care about the status code message */ + + return GRPC_ERROR_NONE; +} + +static grpc_error *handle_request_line(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + uint8_t vers_major = 0; + uint8_t vers_minor = 0; + + while (cur != end && *cur++ != ' ') + ; + if (cur == end) return GRPC_ERROR_CREATE("No method on HTTP request line"); + parser->http.request->method = buf2str(beg, (size_t)(cur - beg - 1)); + + beg = cur; + while (cur != end && *cur++ != ' ') + ; + if (cur == end) return GRPC_ERROR_CREATE("No path on HTTP request line"); + parser->http.request->path = buf2str(beg, (size_t)(cur - beg - 1)); + + if (cur == end || *cur++ != 'H') return GRPC_ERROR_CREATE("Expected 'H'"); + if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); + if (cur == end || *cur++ != 'T') return GRPC_ERROR_CREATE("Expected 'T'"); + if (cur == end || *cur++ != 'P') return GRPC_ERROR_CREATE("Expected 'P'"); + if (cur == end || *cur++ != '/') return GRPC_ERROR_CREATE("Expected '/'"); + vers_major = (uint8_t)(*cur++ - '1' + 1); + ++cur; + if (cur == end) + return GRPC_ERROR_CREATE("End of line in HTTP version string"); + vers_minor = (uint8_t)(*cur++ - '1' + 1); + + if (vers_major == 1) { + if (vers_minor == 0) { + parser->http.request->version = GRPC_HTTP_HTTP10; + } else if (vers_minor == 1) { + parser->http.request->version = GRPC_HTTP_HTTP11; + } else { + return GRPC_ERROR_CREATE( + "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); + } + } else if (vers_major == 2) { + if (vers_minor == 0) { + parser->http.request->version = GRPC_HTTP_HTTP20; + } else { + return GRPC_ERROR_CREATE( + "Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); + } + } else { + return GRPC_ERROR_CREATE("Expected one of HTTP/1.0, HTTP/1.1, or HTTP/2.0"); + } + + return GRPC_ERROR_NONE; +} + +static grpc_error *handle_first_line(grpc_http_parser *parser) { + switch (parser->type) { + case GRPC_HTTP_REQUEST: + return handle_request_line(parser); + case GRPC_HTTP_RESPONSE: + return handle_response_line(parser); + } + GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); +} + +static grpc_error *add_header(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + size_t *hdr_count = NULL; + grpc_http_header **hdrs = NULL; + grpc_http_header hdr = {NULL, NULL}; + grpc_error *error = GRPC_ERROR_NONE; + + GPR_ASSERT(cur != end); + + if (*cur == ' ' || *cur == '\t') { + error = GRPC_ERROR_CREATE("Continued header lines not supported yet"); + goto done; + } + + while (cur != end && *cur != ':') { + cur++; + } + if (cur == end) { +<<<<<<< HEAD + error = GRPC_ERROR_CREATE("Didn't find ':' in header string"); + goto done; +======= + if (grpc_http1_trace) { + gpr_log(GPR_ERROR, "Didn't find ':' in header string"); + } + goto error; +>>>>>>> a709afe241d8b264a1c326315f757b4a8d330207 + } + GPR_ASSERT(cur >= beg); + hdr.key = buf2str(beg, (size_t)(cur - beg)); + cur++; /* skip : */ + + while (cur != end && (*cur == ' ' || *cur == '\t')) { + cur++; + } + GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length); + hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length); + + switch (parser->type) { + case GRPC_HTTP_RESPONSE: + hdr_count = &parser->http.response->hdr_count; + hdrs = &parser->http.response->hdrs; + break; + case GRPC_HTTP_REQUEST: + hdr_count = &parser->http.request->hdr_count; + hdrs = &parser->http.request->hdrs; + break; + } + + if (*hdr_count == parser->hdr_capacity) { + parser->hdr_capacity = + GPR_MAX(parser->hdr_capacity + 1, parser->hdr_capacity * 3 / 2); + *hdrs = gpr_realloc(*hdrs, parser->hdr_capacity * sizeof(**hdrs)); + } + (*hdrs)[(*hdr_count)++] = hdr; + +done: + if (error != GRPC_ERROR_NONE) { + gpr_free(hdr.key); + gpr_free(hdr.value); + } + return error; +} + +static grpc_error *finish_line(grpc_http_parser *parser) { + grpc_error *err; + switch (parser->state) { + case GRPC_HTTP_FIRST_LINE: + err = handle_first_line(parser); + if (err != GRPC_ERROR_NONE) return err; + parser->state = GRPC_HTTP_HEADERS; + break; + case GRPC_HTTP_HEADERS: + if (parser->cur_line_length == parser->cur_line_end_length) { + parser->state = GRPC_HTTP_BODY; + break; + } + err = add_header(parser); + if (err != GRPC_ERROR_NONE) { + return err; + } + break; + case GRPC_HTTP_BODY: + GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + } + + parser->cur_line_length = 0; + return GRPC_ERROR_NONE; +} + +static grpc_error *addbyte_body(grpc_http_parser *parser, uint8_t byte) { + size_t *body_length = NULL; + char **body = NULL; + + if (parser->type == GRPC_HTTP_RESPONSE) { + body_length = &parser->http.response->body_length; + body = &parser->http.response->body; + } else if (parser->type == GRPC_HTTP_REQUEST) { + body_length = &parser->http.request->body_length; + body = &parser->http.request->body; + } else { + GPR_UNREACHABLE_CODE(return GRPC_ERROR_CREATE("Should never reach here")); + } + + if (*body_length == parser->body_capacity) { + parser->body_capacity = GPR_MAX(8, parser->body_capacity * 3 / 2); + *body = gpr_realloc((void *)*body, parser->body_capacity); + } + (*body)[*body_length] = (char)byte; + (*body_length)++; + + return GRPC_ERROR_NONE; +} + +static bool check_line(grpc_http_parser *parser) { + if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\r' && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + return true; + } + + // HTTP request with \n\r line termiantors. + else if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\n' && + parser->cur_line[parser->cur_line_length - 1] == '\r') { + return true; + } + + // HTTP request with only \n line terminators. + else if (parser->cur_line_length >= 1 && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + parser->cur_line_end_length = 1; + return true; + } + + return false; +} + +static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte) { + switch (parser->state) { + case GRPC_HTTP_FIRST_LINE: + case GRPC_HTTP_HEADERS: + if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) { + if (grpc_http1_trace) + gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", + GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); + return 0; + } + parser->cur_line[parser->cur_line_length] = byte; + parser->cur_line_length++; + if (check_line(parser)) { + return finish_line(parser); + } else { + return GRPC_ERROR_NONE; + } + GPR_UNREACHABLE_CODE(return 0); + case GRPC_HTTP_BODY: + return addbyte_body(parser, byte); + } + GPR_UNREACHABLE_CODE(return 0); +} + +void grpc_http_parser_init(grpc_http_parser *parser, grpc_http_type type, + void *request_or_response) { + memset(parser, 0, sizeof(*parser)); + parser->state = GRPC_HTTP_FIRST_LINE; + parser->type = type; + parser->http.request_or_response = request_or_response; + parser->cur_line_end_length = 2; +} + +void grpc_http_parser_destroy(grpc_http_parser *parser) {} + +void grpc_http_request_destroy(grpc_http_request *request) { + size_t i; + gpr_free(request->body); + for (i = 0; i < request->hdr_count; i++) { + gpr_free(request->hdrs[i].key); + gpr_free(request->hdrs[i].value); + } + gpr_free(request->hdrs); + gpr_free(request->method); + gpr_free(request->path); +} + +void grpc_http_response_destroy(grpc_http_response *response) { + size_t i; + gpr_free(response->body); + for (i = 0; i < response->hdr_count; i++) { + gpr_free(response->hdrs[i].key); + gpr_free(response->hdrs[i].value); + } + gpr_free(response->hdrs); +} + +grpc_error *grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice) { + size_t i; + + for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) { + grpc_error *err = addbyte(parser, GPR_SLICE_START_PTR(slice)[i]); + if (err != GRPC_ERROR_NONE) return err; + } + + return GRPC_ERROR_NONE; +} + +grpc_error *grpc_http_parser_eof(grpc_http_parser *parser) { + if (parser->state != GRPC_HTTP_BODY) { + return GRPC_ERROR_CREATE("Did not finish headers"); + } + return GRPC_ERROR_NONE; +} diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index df6cf956d9..98ffccd59b 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -81,6 +81,7 @@ typedef struct { grpc_closure read_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; + grpc_udp_server_orphan_cb orphan_cb; } server_port; /* the overall server */ @@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { server_port *sp = &s->ports[i]; sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; + + GPR_ASSERT(sp->orphan_cb); + sp->orphan_cb(sp->emfd); + grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); } @@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { static int add_socket_to_server(grpc_udp_server *s, int fd, const struct sockaddr *addr, size_t addr_len, - grpc_udp_server_read_cb read_cb) { + grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb) { server_port *sp; int port; char *addr_str; @@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->read_cb = read_cb; + sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, } int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb) { + size_t addr_len, grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb) { int allocated_port1 = -1; int allocated_port2 = -1; unsigned i; @@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); - allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + allocated_port2 = + add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); done: gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index d8cf957a22..33c5ce11cd 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server; typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, struct grpc_server *server); +/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ +typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd); + /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); @@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb); + size_t addr_len, grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, grpc_closure *on_done); |