diff options
author | 2016-05-18 22:12:43 -0700 | |
---|---|---|
committer | 2016-05-18 22:12:43 -0700 | |
commit | a4880374f9e49ce648cdf3b7512804613280075e (patch) | |
tree | 8ee37af5eab830def6f5f196e5f79b55169f12f5 /src/core | |
parent | fc1ed17fdea8ba574586cfc479f77de3b200f1c6 (diff) | |
parent | a71b0afdc2916e85c03d73eb9fbedd3d9070676a (diff) |
Merge branch 'master' of github.com:grpc/grpc into credentials_refactoring
Diffstat (limited to 'src/core')
21 files changed, 2259 insertions, 41 deletions
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 9918fbdcb4..91fa917661 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { + gpr_atm_no_barrier_store(&holder->subchannel_call, 1); fail_locked(exec_ctx, holder); } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { /* already cancelled before subchannel became ready */ diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c new file mode 100644 index 0000000000..df1acddcc0 --- /dev/null +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -0,0 +1,69 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/impl/codegen/port_platform.h> + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/transport_impl.h" + +// Cronet transport object +typedef struct cronet_transport { + grpc_transport base; // must be first element in this structure + void *engine; + char *host; +} cronet_transport; + +extern grpc_transport_vtable grpc_cronet_vtable; + +GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( + void *engine, const char *target, const grpc_channel_args *args, + void *reserved) { + cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + strcpy(ct->host, target); + gpr_log(GPR_DEBUG, + "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine, + ct->host); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + return grpc_channel_create(&exec_ctx, target, args, + GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); +} diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c new file mode 100644 index 0000000000..687026c9fd --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* This file has empty implementation of all the functions exposed by the cronet +library, so we can build it in all environments */ + +#include <stdbool.h> + +#include <grpc/support/log.h> + +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#ifdef GRPC_COMPILE_WITH_CRONET +/* link with the real CRONET library in the build system */ +#else +/* Dummy implementation of cronet API just to test for build-ability */ +cronet_bidirectional_stream* cronet_bidirectional_stream_create( + cronet_engine* engine, void* annotation, + cronet_bidirectional_stream_callback* callback) { + GPR_ASSERT(0); + return NULL; +} + +int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_start( + cronet_bidirectional_stream* stream, const char* url, int priority, + const char* method, const cronet_bidirectional_stream_header_array* headers, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, + char* buffer, int capacity) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, + const char* buffer, int count, + bool end_of_stream) { + GPR_ASSERT(0); + return 0; +} + +int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { + GPR_ASSERT(0); + return 0; +} + +#endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c new file mode 100644 index 0000000000..5bb085195c --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -0,0 +1,640 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <string.h> + +#include <grpc/impl/codegen/port_platform.h> +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/transport_impl.h" +#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h" + +#define GRPC_HEADER_SIZE_IN_BYTES 5 + +// Global flag that gets set with GRPC_TRACE env variable +int grpc_cronet_trace = 1; + +// Cronet transport object +struct grpc_cronet_transport { + grpc_transport base; /* must be first element in this structure */ + cronet_engine *engine; + char *host; +}; + +typedef struct grpc_cronet_transport grpc_cronet_transport; + +enum send_state { + CRONET_SEND_IDLE = 0, + CRONET_REQ_STARTED, + CRONET_SEND_HEADER, + CRONET_WRITE, + CRONET_WRITE_COMPLETED, +}; + +enum recv_state { + CRONET_RECV_IDLE = 0, + CRONET_RECV_READ_LENGTH, + CRONET_RECV_READ_DATA, + CRONET_RECV_CLOSED, +}; + +static const char *recv_state_name[] = { + "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,", + "CRONET_RECV_CLOSED"}; + +// Enum that identifies calling function. +enum e_caller { + PERFORM_STREAM_OP, + ON_READ_COMPLETE, + ON_RESPONSE_HEADERS_RECEIVED, + ON_RESPONSE_TRAILERS_RECEIVED +}; + +enum callback_id { + CB_SEND_INITIAL_METADATA = 0, + CB_SEND_MESSAGE, + CB_SEND_TRAILING_METADATA, + CB_RECV_MESSAGE, + CB_RECV_INITIAL_METADATA, + CB_RECV_TRAILING_METADATA, + CB_NUM_CALLBACKS +}; + +struct stream_obj { + // we store received bytes here as they trickle in. + gpr_slice_buffer write_slice_buffer; + cronet_bidirectional_stream *cbs; + gpr_slice slice; + gpr_slice_buffer read_slice_buffer; + struct grpc_slice_buffer_stream sbs; + char *read_buffer; + int remaining_read_bytes; + int total_read_bytes; + + char *write_buffer; + size_t write_buffer_size; + + // Hold the URL + char *url; + + bool response_headers_received; + bool read_requested; + bool response_trailers_received; + bool read_closed; + + // Recv message stuff + grpc_byte_buffer **recv_message; + // Initial metadata stuff + grpc_metadata_batch *recv_initial_metadata; + // Trailing metadata stuff + grpc_metadata_batch *recv_trailing_metadata; + grpc_chttp2_incoming_metadata_buffer imb; + + // This mutex protects receive state machine execution + gpr_mu recv_mu; + // we can queue up up to 2 callbacks for each OP + grpc_closure *callback_list[CB_NUM_CALLBACKS][2]; + + // storage for header + cronet_bidirectional_stream_header *headers; + uint32_t num_headers; + cronet_bidirectional_stream_header_array header_array; + // state tracking + enum recv_state cronet_recv_state; + enum send_state cronet_send_state; +}; + +typedef struct stream_obj stream_obj; + +static void next_send_step(stream_obj *s); +static void next_recv_step(stream_obj *s, enum e_caller caller); + +static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) {} + +static void enqueue_callbacks(grpc_closure *callback_list[]) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (callback_list[0]) { + grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL); + callback_list[0] = NULL; + } + if (callback_list[1]) { + grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL); + callback_list[1] = NULL; + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static void on_canceled(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_canceled %p", stream); + } +} + +static void on_failed(cronet_bidirectional_stream *stream, int net_error) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error); + } +} + +static void on_succeeded(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "on_succeeded %p", stream); + } +} + +static void on_response_trailers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *trailers) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_trailers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + + memset(&s->imb, 0, sizeof(s->imb)); + grpc_chttp2_incoming_metadata_buffer_init(&s->imb); + unsigned int i = 0; + for (i = 0; i < trailers->count; i++) { + grpc_chttp2_incoming_metadata_buffer_add( + &s->imb, grpc_mdelem_from_metadata_strings( + grpc_mdstr_from_string(trailers->headers[i].key), + grpc_mdstr_from_string(trailers->headers[i].value))); + } + s->response_trailers_received = true; + next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED); +} + +static void on_write_completed(cronet_bidirectional_stream *stream, + const char *data) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_write_completed"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]); + s->cronet_send_state = CRONET_WRITE_COMPLETED; + next_send_step(s); +} + +static void process_recv_message(stream_obj *s, const uint8_t *recv_data) { + gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes); + uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); + memcpy(dst_p, recv_data, (size_t)s->total_read_bytes); + gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice); + grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0); + *s->recv_message = (grpc_byte_buffer *)&s->sbs; +} + +static int parse_grpc_header(const uint8_t *data) { + const uint8_t *p = data + 1; + int length = 0; + length |= ((uint8_t)*p++) << 24; + length |= ((uint8_t)*p++) << 16; + length |= ((uint8_t)*p++) << 8; + length |= ((uint8_t)*p++); + return length; +} + +static void on_read_completed(cronet_bidirectional_stream *stream, char *data, + int count) { + stream_obj *s = (stream_obj *)stream->annotation; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d", + count, s->total_read_bytes, s->remaining_read_bytes); + } + if (count > 0) { + GPR_ASSERT(s->recv_message); + s->remaining_read_bytes -= count; + next_recv_step(s, ON_READ_COMPLETE); + } else { + s->read_closed = true; + next_recv_step(s, ON_READ_COMPLETE); + } +} + +static void on_response_headers_received( + cronet_bidirectional_stream *stream, + const cronet_bidirectional_stream_header_array *headers, + const char *negotiated_protocol) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: on_response_headers_received"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]); + s->response_headers_received = true; + next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED); +} + +static void on_request_headers_sent(cronet_bidirectional_stream *stream) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: on_request_headers_sent"); + } + stream_obj *s = (stream_obj *)stream->annotation; + enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]); + s->cronet_send_state = CRONET_SEND_HEADER; + next_send_step(s); +} + +// Callback function pointers (invoked by cronet in response to events) +static cronet_bidirectional_stream_callback callbacks = { + on_request_headers_sent, + on_response_headers_received, + on_read_completed, + on_write_completed, + on_response_trailers_received, + on_succeeded, + on_failed, + on_canceled}; + +static void invoke_closing_callback(stream_obj *s) { + grpc_chttp2_incoming_metadata_buffer_publish(&s->imb, + s->recv_trailing_metadata); + if (s->callback_list[CB_RECV_TRAILING_METADATA]) { + enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]); + } +} + +static void set_recv_state(stream_obj *s, enum recv_state state) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]); + } + s->cronet_recv_state = state; +} + +// This is invoked from perform_stream_op, and all on_xxxx callbacks. +static void next_recv_step(stream_obj *s, enum e_caller caller) { + gpr_mu_lock(&s->recv_mu); + switch (s->cronet_recv_state) { + case CRONET_RECV_IDLE: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE"); + } + if (caller == PERFORM_STREAM_OP || + caller == ON_RESPONSE_HEADERS_RECEIVED) { + if (s->read_closed && s->response_trailers_received) { + invoke_closing_callback(s); + set_recv_state(s, CRONET_RECV_CLOSED); + } else if (s->response_headers_received == true && + s->read_requested == true) { + set_recv_state(s, CRONET_RECV_READ_LENGTH); + s->total_read_bytes = s->remaining_read_bytes = + GRPC_HEADER_SIZE_IN_BYTES; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_LENGTH: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH"); + } + if (caller == ON_READ_COMPLETE) { + if (s->read_closed) { + invoke_closing_callback(s); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + set_recv_state(s, CRONET_RECV_CLOSED); + } else { + GPR_ASSERT(s->remaining_read_bytes == 0); + set_recv_state(s, CRONET_RECV_READ_DATA); + s->total_read_bytes = s->remaining_read_bytes = + parse_grpc_header((const uint8_t *)s->read_buffer); + s->read_buffer = + gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes); + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer, + s->remaining_read_bytes); + } + } + break; + case CRONET_RECV_READ_DATA: + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA"); + } + if (caller == ON_READ_COMPLETE) { + if (s->remaining_read_bytes > 0) { + int offset = s->total_read_bytes - s->remaining_read_bytes; + GPR_ASSERT(s->read_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()"); + } + cronet_bidirectional_stream_read( + s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes); + } else { + gpr_slice_buffer_init(&s->read_slice_buffer); + uint8_t *p = (uint8_t *)s->read_buffer; + process_recv_message(s, p); + set_recv_state(s, CRONET_RECV_IDLE); + enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]); + } + } + break; + case CRONET_RECV_CLOSED: + break; + default: + GPR_ASSERT(0); // Should not reach here + break; + } + gpr_mu_unlock(&s->recv_mu); +} + +// This function takes the data from s->write_slice_buffer and assembles into +// a contiguous byte stream with 5 byte gRPC header prepended. +static void create_grpc_frame(stream_obj *s) { + gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer); + uint8_t *raw_data = GPR_SLICE_START_PTR(slice); + size_t length = GPR_SLICE_LENGTH(slice); + s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES; + s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size); + uint8_t *p = (uint8_t *)s->write_buffer; + // Append 5 byte header + *p++ = 0; + *p++ = (uint8_t)(length >> 24); + *p++ = (uint8_t)(length >> 16); + *p++ = (uint8_t)(length >> 8); + *p++ = (uint8_t)(length); + // append actual data + memcpy(p, raw_data, length); +} + +static void do_write(stream_obj *s) { + gpr_slice_buffer *sb = &s->write_slice_buffer; + GPR_ASSERT(sb->count <= 1); + if (sb->count > 0) { + create_grpc_frame(s); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, s->write_buffer, + (int)s->write_buffer_size, false); + } +} + +// +static void next_send_step(stream_obj *s) { + switch (s->cronet_send_state) { + case CRONET_SEND_IDLE: + GPR_ASSERT( + s->cbs); // cronet_bidirectional_stream is not initialized yet. + s->cronet_send_state = CRONET_REQ_STARTED; + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url); + } + cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST", + &s->header_array, false); + // we no longer need the memory that was allocated earlier. + gpr_free(s->header_array.headers); + break; + case CRONET_SEND_HEADER: + do_write(s); + s->cronet_send_state = CRONET_WRITE; + break; + case CRONET_WRITE_COMPLETED: + do_write(s); + break; + default: + GPR_ASSERT(0); + break; + } +} + +static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head, + const char *host, + stream_obj *s) { + grpc_linked_mdelem *curr = head; + // Walk the linked list and get number of header fields + uint32_t num_headers_available = 0; + while (curr != NULL) { + curr = curr->next; + num_headers_available++; + } + // Allocate enough memory + s->headers = (cronet_bidirectional_stream_header *)gpr_malloc( + sizeof(cronet_bidirectional_stream_header) * num_headers_available); + + // Walk the linked list again, this time copying the header fields. + // s->num_headers + // can be less than num_headers_available, as some headers are not used for + // cronet + curr = head; + s->num_headers = 0; + while (s->num_headers < num_headers_available) { + grpc_mdelem *mdelem = curr->md; + curr = curr->next; + const char *key = grpc_mdstr_as_c_string(mdelem->key); + const char *value = grpc_mdstr_as_c_string(mdelem->value); + if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 || + strcmp(key, ":authority") == 0) { + // Cronet populates these fields on its own. + continue; + } + if (strcmp(key, ":path") == 0) { + // Create URL by appending :path value to the hostname + gpr_asprintf(&s->url, "https://%s%s", host, value); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "extracted URL = %s", s->url); + } + continue; + } + s->headers[s->num_headers].key = key; + s->headers[s->num_headers].value = value; + s->num_headers++; + if (curr == NULL) { + break; + } + } +} + +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_transport_stream_op *op) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + GPR_ASSERT(ct->engine); + stream_obj *s = (stream_obj *)gs; + if (op->recv_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - recv_trailing_metadata: on_complete=%p", + op->on_complete); + } + s->recv_trailing_metadata = op->recv_trailing_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]); + s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete; + } + if (op->recv_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p", + op->on_complete); + } + s->recv_message = (grpc_byte_buffer **)op->recv_message; + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]); + s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready; + s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete; + s->read_requested = true; + next_recv_step(s, PERFORM_STREAM_OP); + } + if (op->recv_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p", + op->on_complete); + } + s->recv_initial_metadata = op->recv_initial_metadata; + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]); + GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]); + s->callback_list[CB_RECV_INITIAL_METADATA][0] = + op->recv_initial_metadata_ready; + s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete; + } + if (op->send_initial_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_initial_metadata: on_complete=%p", + op->on_complete); + } + s->num_headers = 0; + convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head, + ct->host, s); + s->header_array.count = s->num_headers; + s->header_array.capacity = s->num_headers; + s->header_array.headers = s->headers; + GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]); + s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete; + } + if (op->send_message) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p", + op->on_complete); + } + grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice, + op->send_message->length, NULL); + // Check that compression flag is not ON. We don't support compression yet. + // TODO (makdharma): add compression support + GPR_ASSERT(op->send_message->flags == 0); + gpr_slice_buffer_add(&s->write_slice_buffer, s->slice); + if (s->cbs == NULL) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create"); + } + s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks); + GPR_ASSERT(s->cbs); + s->read_closed = false; + s->response_trailers_received = false; + s->response_headers_received = false; + s->cronet_send_state = CRONET_SEND_IDLE; + s->cronet_recv_state = CRONET_RECV_IDLE; + } + GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]); + s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete; + next_send_step(s); + } + if (op->send_trailing_metadata) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, + "perform_stream_op - send_trailing_metadata: on_complete=%p", + op->on_complete); + } + GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]); + s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete; + if (s->cbs) { + // Send an "empty" write to the far end to signal that we're done. + // This will induce the server to send down trailers. + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write"); + } + cronet_bidirectional_stream_write(s->cbs, "abc", 0, true); + } else { + // We never created a stream. This was probably an empty request. + invoke_closing_callback(s); + } + } +} + +static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data) { + stream_obj *s = (stream_obj *)gs; + memset(s->callback_list, 0, sizeof(s->callback_list)); + s->cbs = NULL; + gpr_mu_init(&s->recv_mu); + s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES); + gpr_slice_buffer_init(&s->write_slice_buffer); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "cronet_transport - init_stream"); + } + return 0; +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, void *and_free_memory) { + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy stream"); + } + stream_obj *s = (stream_obj *)gs; + s->cbs = NULL; + gpr_free(s->read_buffer); + gpr_free(s->write_buffer); + gpr_free(s->url); + gpr_mu_destroy(&s->recv_mu); + if (and_free_memory) { + gpr_free(and_free_memory); + } +} + +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + grpc_cronet_transport *ct = (grpc_cronet_transport *)gt; + gpr_free(ct->host); + if (grpc_cronet_trace) { + gpr_log(GPR_DEBUG, "Destroy transport"); + } +} + +const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), "cronet_http", init_stream, + set_pollset_do_nothing, perform_stream_op, NULL, + destroy_stream, destroy_transport, NULL}; diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 28d2d78d00..893cf0700e 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -170,7 +170,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( if (a == NULL) return 0; for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { + !strcmp(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, a->args[i].key)) { return (grpc_compression_algorithm)a->args[i].value.integer; break; } @@ -182,7 +182,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm( grpc_channel_args *a, grpc_compression_algorithm algorithm) { grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG; + tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM; tmp.value.integer = algorithm; return grpc_channel_args_copy_and_add(a, &tmp, 1); } @@ -196,7 +196,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, size_t i; for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { + !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET, + a->args[i].key)) { *states_arg = &a->args[i].value.integer; return 1; /* GPR_TRUE */ } @@ -222,7 +223,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( /* create a new arg */ grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + tmp.key = GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET; /* all enabled by default */ tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; if (state != 0) { diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 5510c79b18..0e548c61b8 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -47,7 +47,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compress_filter_trace = 0; +int grpc_compression_trace = 0; typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ @@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, did_compress = grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; @@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, gpr_slice_buffer_swap(&calld->slices, &tmp); calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - if (grpc_compress_filter_trace) { + if (grpc_compression_trace) { char *algo_name; GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm, &algo_name)); - gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.", - algo_name); + gpr_log( + GPR_DEBUG, + "Algorithm '%s' enabled but decided not to compress. Input size: %d", + algo_name, calld->slices.length); } } diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h index cf5879d82e..0ce5d08837 100644 --- a/src/core/lib/channel/compress_filter.h +++ b/src/core/lib/channel/compress_filter.h @@ -38,7 +38,7 @@ #define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" -extern int grpc_compress_filter_trace; +extern int grpc_compression_trace; /** Compression filter for outgoing data. * diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index a7efb5e73e..09b2ed40d1 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -161,8 +161,9 @@ static int add_header(grpc_http_parser *parser) { cur++; } if (cur == end) { - if (grpc_http1_trace) + if (grpc_http1_trace) { gpr_log(GPR_ERROR, "Didn't find ':' in header string"); + } goto error; } GPR_ASSERT(cur >= beg); diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 3c8127e1a8..aeb6e28665 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -790,7 +790,6 @@ static void pollset_kick(grpc_pollset *p, static void pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); - grpc_wakeup_fd_global_init(); grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } @@ -798,7 +797,6 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_worker); - grpc_wakeup_fd_global_destroy(); } static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c new file mode 100644 index 0000000000..e91ae40212 --- /dev/null +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -0,0 +1,1212 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/lib/iomgr/ev_poll_posix.h" + +#include <assert.h> +#include <errno.h> +#include <poll.h> +#include <string.h> +#include <sys/socket.h> +#include <unistd.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/tls.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/block_annotate.h" + +/******************************************************************************* + * FD declarations + */ + +typedef struct grpc_fd_watcher { + struct grpc_fd_watcher *next; + struct grpc_fd_watcher *prev; + grpc_pollset *pollset; + grpc_pollset_worker *worker; + grpc_fd *fd; +} grpc_fd_watcher; + +struct grpc_fd { + int fd; + /* refst format: + bit0: 1=active/0=orphaned + bit1-n: refcount + meaning that mostly we ref by two to avoid altering the orphaned bit, + and just unref by 1 when we're ready to flag the object as orphaned */ + gpr_atm refst; + + gpr_mu mu; + int shutdown; + int closed; + int released; + + /* The watcher list. + + The following watcher related fields are protected by watcher_mu. + + An fd_watcher is an ephemeral object created when an fd wants to + begin polling, and destroyed after the poll. + + It denotes the fd's interest in whether to read poll or write poll + or both or neither on this fd. + + If a watcher is asked to poll for reads or writes, the read_watcher + or write_watcher fields are set respectively. A watcher may be asked + to poll for both, in which case both fields will be set. + + read_watcher and write_watcher may be NULL if no watcher has been + asked to poll for reads or writes. + + If an fd_watcher is not asked to poll for reads or writes, it's added + to a linked list of inactive watchers, rooted at inactive_watcher_root. + If at a later time there becomes need of a poller to poll, one of + the inactive pollers may be kicked out of their poll loops to take + that responsibility. */ + grpc_fd_watcher inactive_watcher_root; + grpc_fd_watcher *read_watcher; + grpc_fd_watcher *write_watcher; + + grpc_closure *read_closure; + grpc_closure *write_closure; + + grpc_closure *on_done_closure; + + grpc_iomgr_object iomgr_object; +}; + +/* Begin polling on an fd. + Registers that the given pollset is interested in this fd - so that if read + or writability interest changes, the pollset can be kicked to pick up that + new interest. + Return value is: + (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) + i.e. a combination of read_mask and write_mask determined by the fd's current + interest in said events. + Polling strategies that do not need to alter their behavior depending on the + fd's current interest (such as epoll) do not need to call this function. + MUST NOT be called with a pollset lock taken */ +static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + grpc_pollset_worker *worker, uint32_t read_mask, + uint32_t write_mask, grpc_fd_watcher *rec); +/* Complete polling previously started with fd_begin_poll + MUST NOT be called with a pollset lock taken + if got_read or got_write are 1, also does the become_{readable,writable} as + appropriate. */ +static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, + int got_read, int got_write); + +/* Return 1 if this fd is orphaned, 0 otherwise */ +static bool fd_is_orphaned(grpc_fd *fd); + +/* Reference counting for fds */ +/*#define GRPC_FD_REF_COUNT_DEBUG*/ +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line); +static void fd_unref(grpc_fd *fd, const char *reason, const char *file, + int line); +#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__) +#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__) +#else +static void fd_ref(grpc_fd *fd); +static void fd_unref(grpc_fd *fd); +#define GRPC_FD_REF(fd, reason) fd_ref(fd) +#define GRPC_FD_UNREF(fd, reason) fd_unref(fd) +#endif + +#define CLOSURE_NOT_READY ((grpc_closure *)0) +#define CLOSURE_READY ((grpc_closure *)1) + +/******************************************************************************* + * pollset declarations + */ + +typedef struct grpc_cached_wakeup_fd { + grpc_wakeup_fd fd; + struct grpc_cached_wakeup_fd *next; +} grpc_cached_wakeup_fd; + +struct grpc_pollset_worker { + grpc_cached_wakeup_fd *wakeup_fd; + int reevaluate_polling_on_wakeup; + int kicked_specifically; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +}; + +struct grpc_pollset { + gpr_mu mu; + grpc_pollset_worker root_worker; + int in_flight_cbs; + int shutting_down; + int called_shutdown; + int kicked_without_pollers; + grpc_closure *shutdown_done; + grpc_closure_list idle_jobs; + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; + /* Local cache of eventfds for workers */ + grpc_cached_wakeup_fd *local_wakeup_cache; +}; + +/* Add an fd to a pollset */ +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd); + +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd); + +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +static int poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now); + +/* Allow kick to wakeup the currently polling worker */ +#define GRPC_POLLSET_CAN_KICK_SELF 1 +/* Force the wakee to repoll when awoken */ +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +/* As per pollset_kick, with an extended set of flags (defined above) + -- mostly for fd_posix's use. */ +static void pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags); + +/* Return 1 if the pollset has active threads in pollset_work (pollset must + * be locked) */ +static int pollset_has_workers(grpc_pollset *pollset); + +/******************************************************************************* + * pollset_set definitions + */ + +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +/******************************************************************************* + * fd_posix.c + */ + +#ifdef GRPC_FD_REF_COUNT_DEBUG +#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) +#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__) +static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { + gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); +#else +#define REF_BY(fd, n, reason) ref_by(fd, n) +#define UNREF_BY(fd, n, reason) unref_by(fd, n) +static void ref_by(grpc_fd *fd, int n) { +#endif + GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); +} + +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file, + int line) { + gpr_atm old; + gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n, + gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); +#else +static void unref_by(grpc_fd *fd, int n) { + gpr_atm old; +#endif + old = gpr_atm_full_fetch_add(&fd->refst, -n); + if (old == n) { + gpr_mu_destroy(&fd->mu); + grpc_iomgr_unregister_object(&fd->iomgr_object); + gpr_free(fd); + } else { + GPR_ASSERT(old > n); + } +} + +static grpc_fd *fd_create(int fd, const char *name) { + grpc_fd *r = gpr_malloc(sizeof(*r)); + gpr_mu_init(&r->mu); + gpr_atm_rel_store(&r->refst, 1); + r->shutdown = 0; + r->read_closure = CLOSURE_NOT_READY; + r->write_closure = CLOSURE_NOT_READY; + r->fd = fd; + r->inactive_watcher_root.next = r->inactive_watcher_root.prev = + &r->inactive_watcher_root; + r->read_watcher = r->write_watcher = NULL; + r->on_done_closure = NULL; + r->closed = 0; + r->released = 0; + + char *name2; + gpr_asprintf(&name2, "%s fd=%d", name, fd); + grpc_iomgr_register_object(&r->iomgr_object, name2); + gpr_free(name2); +#ifdef GRPC_FD_REF_COUNT_DEBUG + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name); +#endif + return r; +} + +static bool fd_is_orphaned(grpc_fd *fd) { + return (gpr_atm_acq_load(&fd->refst) & 1) == 0; +} + +static void pollset_kick_locked(grpc_fd_watcher *watcher) { + gpr_mu_lock(&watcher->pollset->mu); + GPR_ASSERT(watcher->worker); + pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + gpr_mu_unlock(&watcher->pollset->mu); +} + +static void maybe_wake_one_watcher_locked(grpc_fd *fd) { + if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { + pollset_kick_locked(fd->inactive_watcher_root.next); + } else if (fd->read_watcher) { + pollset_kick_locked(fd->read_watcher); + } else if (fd->write_watcher) { + pollset_kick_locked(fd->write_watcher); + } +} + +static void wake_all_watchers_locked(grpc_fd *fd) { + grpc_fd_watcher *watcher; + for (watcher = fd->inactive_watcher_root.next; + watcher != &fd->inactive_watcher_root; watcher = watcher->next) { + pollset_kick_locked(watcher); + } + if (fd->read_watcher) { + pollset_kick_locked(fd->read_watcher); + } + if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { + pollset_kick_locked(fd->write_watcher); + } +} + +static int has_watchers(grpc_fd *fd) { + return fd->read_watcher != NULL || fd->write_watcher != NULL || + fd->inactive_watcher_root.next != &fd->inactive_watcher_root; +} + +static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + fd->closed = 1; + if (!fd->released) { + close(fd->fd); + } + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL); +} + +static int fd_wrapped_fd(grpc_fd *fd) { + if (fd->released || fd->closed) { + return -1; + } else { + return fd->fd; + } +} + +static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *on_done, int *release_fd, + const char *reason) { + fd->on_done_closure = on_done; + fd->released = release_fd != NULL; + if (!fd->released) { + shutdown(fd->fd, SHUT_RDWR); + } else { + *release_fd = fd->fd; + } + gpr_mu_lock(&fd->mu); + REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ + if (!has_watchers(fd)) { + close_fd_locked(exec_ctx, fd); + } else { + wake_all_watchers_locked(fd); + } + gpr_mu_unlock(&fd->mu); + UNREF_BY(fd, 2, reason); /* drop the reference */ +} + +/* increment refcount by two to avoid changing the orphan bit */ +#ifdef GRPC_FD_REF_COUNT_DEBUG +static void fd_ref(grpc_fd *fd, const char *reason, const char *file, + int line) { + ref_by(fd, 2, reason, file, line); +} + +static void fd_unref(grpc_fd *fd, const char *reason, const char *file, + int line) { + unref_by(fd, 2, reason, file, line); +} +#else +static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); } + +static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } +#endif + +static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st, grpc_closure *closure) { + if (*st == CLOSURE_NOT_READY) { + /* not ready ==> switch to a waiting state by setting the closure */ + *st = closure; + } else if (*st == CLOSURE_READY) { + /* already ready ==> queue the closure to run immediately */ + *st = CLOSURE_NOT_READY; + grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL); + maybe_wake_one_watcher_locked(fd); + } else { + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } +} + +/* returns 1 if state becomes not ready */ +static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure **st) { + if (*st == CLOSURE_READY) { + /* duplicate ready ==> ignore */ + return 0; + } else if (*st == CLOSURE_NOT_READY) { + /* not ready, and not waiting ==> flag ready */ + *st = CLOSURE_READY; + return 0; + } else { + /* waiting ==> queue closure */ + grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL); + *st = CLOSURE_NOT_READY; + return 1; + } +} + +static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + GPR_ASSERT(!fd->shutdown); + fd->shutdown = 1; + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *closure) { + gpr_mu_lock(&fd->mu); + notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); + gpr_mu_unlock(&fd->mu); +} + +static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + grpc_closure *closure) { + gpr_mu_lock(&fd->mu); + notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); + gpr_mu_unlock(&fd->mu); +} + +static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + grpc_pollset_worker *worker, uint32_t read_mask, + uint32_t write_mask, grpc_fd_watcher *watcher) { + uint32_t mask = 0; + grpc_closure *cur; + int requested; + /* keep track of pollers that have requested our events, in case they change + */ + GRPC_FD_REF(fd, "poll"); + + gpr_mu_lock(&fd->mu); + + /* if we are shutdown, then don't add to the watcher set */ + if (fd->shutdown) { + watcher->fd = NULL; + watcher->pollset = NULL; + watcher->worker = NULL; + gpr_mu_unlock(&fd->mu); + GRPC_FD_UNREF(fd, "poll"); + return 0; + } + + /* if there is nobody polling for read, but we need to, then start doing so */ + cur = fd->read_closure; + requested = cur != CLOSURE_READY; + if (read_mask && fd->read_watcher == NULL && requested) { + fd->read_watcher = watcher; + mask |= read_mask; + } + /* if there is nobody polling for write, but we need to, then start doing so + */ + cur = fd->write_closure; + requested = cur != CLOSURE_READY; + if (write_mask && fd->write_watcher == NULL && requested) { + fd->write_watcher = watcher; + mask |= write_mask; + } + /* if not polling, remember this watcher in case we need someone to later */ + if (mask == 0 && worker != NULL) { + watcher->next = &fd->inactive_watcher_root; + watcher->prev = watcher->next->prev; + watcher->next->prev = watcher->prev->next = watcher; + } + watcher->pollset = pollset; + watcher->worker = worker; + watcher->fd = fd; + gpr_mu_unlock(&fd->mu); + + return mask; +} + +static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, + int got_read, int got_write) { + int was_polling = 0; + int kick = 0; + grpc_fd *fd = watcher->fd; + + if (fd == NULL) { + return; + } + + gpr_mu_lock(&fd->mu); + + if (watcher == fd->read_watcher) { + /* remove read watcher, kick if we still need a read */ + was_polling = 1; + if (!got_read) { + kick = 1; + } + fd->read_watcher = NULL; + } + if (watcher == fd->write_watcher) { + /* remove write watcher, kick if we still need a write */ + was_polling = 1; + if (!got_write) { + kick = 1; + } + fd->write_watcher = NULL; + } + if (!was_polling && watcher->worker != NULL) { + /* remove from inactive list */ + watcher->next->prev = watcher->prev; + watcher->prev->next = watcher->next; + } + if (got_read) { + if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { + kick = 1; + } + } + if (got_write) { + if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { + kick = 1; + } + } + if (kick) { + maybe_wake_one_watcher_locked(fd); + } + if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { + close_fd_locked(exec_ctx, fd); + } + gpr_mu_unlock(&fd->mu); + + GRPC_FD_UNREF(fd, "poll"); +} + +/******************************************************************************* + * pollset_posix.c + */ + +GPR_TLS_DECL(g_current_thread_poller); +GPR_TLS_DECL(g_current_thread_worker); + +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +static int pollset_has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; +} + +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (pollset_has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } else { + return NULL; + } +} + +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + +static void pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { + GPR_TIMER_BEGIN("pollset_kick_ext", 0); + + /* pollset->mu already held */ + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0); + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + p->kicked_without_pollers = 1; + GPR_TIMER_END("pollset_kick_ext.broadcast", 0); + } else if (gpr_tls_get(&g_current_thread_worker) != + (intptr_t)specific_worker) { + GPR_TIMER_MARK("different_thread_worker", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + GPR_TIMER_MARK("kick_yoself", 0); + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + specific_worker->kicked_specifically = 1; + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); + GPR_TIMER_MARK("kick_anonymous", 0); + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { + GPR_TIMER_MARK("kick_anonymous_not_self", 0); + push_back_worker(p, specific_worker); + specific_worker = pop_front_worker(p); + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == + (intptr_t)specific_worker) { + push_back_worker(p, specific_worker); + specific_worker = NULL; + } + } + if (specific_worker != NULL) { + GPR_TIMER_MARK("finally_kick", 0); + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + } + } else { + GPR_TIMER_MARK("kicked_no_pollers", 0); + p->kicked_without_pollers = 1; + } + } + + GPR_TIMER_END("pollset_kick_ext", 0); +} + +static void pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + pollset_kick_ext(p, specific_worker, 0); +} + +/* global state management */ + +static void pollset_global_init(void) { + gpr_tls_init(&g_current_thread_poller); + gpr_tls_init(&g_current_thread_worker); + grpc_wakeup_fd_init(&grpc_global_wakeup_fd); +} + +static void pollset_global_shutdown(void) { + grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); + gpr_tls_destroy(&g_current_thread_poller); + gpr_tls_destroy(&g_current_thread_worker); +} + +static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } + +/* main interface */ + +static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; + pollset->in_flight_cbs = 0; + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; + pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; + pollset->local_wakeup_cache = NULL; + pollset->kicked_without_pollers = 0; + pollset->fd_count = 0; + pollset->fd_capacity = 0; + pollset->del_count = 0; + pollset->del_capacity = 0; + pollset->fds = NULL; + pollset->dels = NULL; +} + +static void pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + while (pollset->local_wakeup_cache) { + grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; + grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); + gpr_free(pollset->local_wakeup_cache); + pollset->local_wakeup_cache = next; + } + gpr_free(pollset->fds); + gpr_free(pollset->dels); + gpr_mu_destroy(&pollset->mu); +} + +static void pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + GPR_ASSERT(pollset->fd_count == 0); + GPR_ASSERT(pollset->del_count == 0); + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; +} + +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + size_t i; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < pollset->fd_count; i++) { + if (pollset->fds[i] == fd) goto exit; + } + if (pollset->fd_count == pollset->fd_capacity) { + pollset->fd_capacity = + GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2); + pollset->fds = + gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity); + } + pollset->fds[pollset->fd_count++] = fd; + GRPC_FD_REF(fd, "multipoller"); + pollset_kick(pollset, NULL); +exit: + gpr_mu_unlock(&pollset->mu); +} + +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); + size_t i; + for (i = 0; i < pollset->fd_count; i++) { + GRPC_FD_UNREF(pollset->fds[i], "multipoller"); + } + for (i = 0; i < pollset->del_count; i++) { + GRPC_FD_UNREF(pollset->dels[i], "multipoller_del"); + } + pollset->fd_count = 0; + pollset->del_count = 0; + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL); +} + +static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, gpr_timespec now, + gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + + /* pollset->mu already held */ + int added_worker = 0; + int locked = 1; + int queued_work = 0; + int keep_polling = 0; + GPR_TIMER_BEGIN("pollset_work", 0); + /* this must happen before we (potentially) drop pollset->mu */ + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; + if (pollset->local_wakeup_cache != NULL) { + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; + } else { + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + } + worker.kicked_specifically = 0; + /* If there's work waiting for the pollset to be idle, and the + pollset is idle, then do that work */ + if (!pollset_has_workers(pollset) && + !grpc_closure_list_empty(pollset->idle_jobs)) { + GPR_TIMER_MARK("pollset_work.idle_jobs", 0); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + goto done; + } + /* If we're shutting down then we don't execute any extended work */ + if (pollset->shutting_down) { + GPR_TIMER_MARK("pollset_work.shutting_down", 0); + goto done; + } + /* Give do_promote priority so we don't starve it out */ + if (pollset->in_flight_cbs) { + GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0); + gpr_mu_unlock(&pollset->mu); + locked = 0; + goto done; + } + /* Start polling, and keep doing so while we're being asked to + re-evaluate our pollers (this allows poll() based pollers to + ensure they don't miss wakeups) */ + keep_polling = 1; + gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); + while (keep_polling) { + keep_polling = 0; + if (!pollset->kicked_without_pollers) { + if (!added_worker) { + push_front_worker(pollset, &worker); + added_worker = 1; + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); + } + GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); +#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) +#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) + + int timeout; + int r; + size_t i, j, fd_count; + nfds_t pfd_count; + /* TODO(ctiller): inline some elements to avoid an allocation */ + grpc_fd_watcher *watchers; + struct pollfd *pfds; + + timeout = poll_deadline_to_millis_timeout(deadline, now); + /* TODO(ctiller): perform just one malloc here if we exceed the inline + * case */ + pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2)); + watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2)); + fd_count = 0; + pfd_count = 2; + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd); + pfds[1].events = POLLIN; + pfds[1].revents = 0; + for (i = 0; i < pollset->fd_count; i++) { + int remove = fd_is_orphaned(pollset->fds[i]); + for (j = 0; !remove && j < pollset->del_count; j++) { + if (pollset->fds[i] == pollset->dels[j]) remove = 1; + } + if (remove) { + GRPC_FD_UNREF(pollset->fds[i], "multipoller"); + } else { + pollset->fds[fd_count++] = pollset->fds[i]; + watchers[pfd_count].fd = pollset->fds[i]; + GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start"); + pfds[pfd_count].fd = pollset->fds[i]->fd; + pfds[pfd_count].revents = 0; + pfd_count++; + } + } + for (j = 0; j < pollset->del_count; j++) { + GRPC_FD_UNREF(pollset->dels[j], "multipoller_del"); + } + pollset->del_count = 0; + pollset->fd_count = fd_count; + gpr_mu_unlock(&pollset->mu); + + for (i = 2; i < pfd_count; i++) { + grpc_fd *fd = watchers[i].fd; + pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN, + POLLOUT, &watchers[i]); + GRPC_FD_UNREF(fd, "multipoller_start"); + } + + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ + GRPC_SCHEDULING_START_BLOCKING_REGION; + r = grpc_poll_function(pfds, pfd_count, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; + + if (r < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + for (i = 2; i < pfd_count; i++) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else if (r == 0) { + for (i = 2; i < pfd_count; i++) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } + } else { + if (pfds[0].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfds[1].revents & POLLIN_CHECK) { + grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd); + } + for (i = 2; i < pfd_count; i++) { + if (watchers[i].fd == NULL) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0); + } else { + fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, + pfds[i].revents & POLLOUT_CHECK); + } + } + } + + gpr_free(pfds); + gpr_free(watchers); + GPR_TIMER_END("maybe_work_and_unlock", 0); + locked = 0; + } else { + GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); + pollset->kicked_without_pollers = 0; + } + /* Finished execution - start cleaning up. + Note that we may arrive here from outside the enclosing while() loop. + In that case we won't loop though as we haven't added worker to the + worker list, which means nobody could ask us to re-evaluate polling). */ + done: + if (!locked) { + queued_work |= grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + locked = 1; + } + /* If we're forced to re-evaluate polling (via pollset_kick with + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force + a loop */ + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; + pollset->kicked_without_pollers = 0; + if (queued_work || worker.kicked_specifically) { + /* If there's queued work on the list, then set the deadline to be + immediate so we get back out of the polling loop quickly */ + deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); + } + keep_polling = 1; + } + if (keep_polling) { + now = gpr_now(now.clock_type); + } + } + gpr_tls_set(&g_current_thread_poller, 0); + if (added_worker) { + remove_worker(pollset, &worker); + gpr_tls_set(&g_current_thread_worker, 0); + } + /* release wakeup fd to the local pool */ + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; + /* check shutdown conditions */ + if (pollset->shutting_down) { + if (pollset_has_workers(pollset)) { + pollset_kick(pollset, NULL); + } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + pollset->called_shutdown = 1; + gpr_mu_unlock(&pollset->mu); + finish_shutdown(exec_ctx, pollset); + grpc_exec_ctx_flush(exec_ctx); + /* Continuing to access pollset here is safe -- it is the caller's + * responsibility to not destroy when it has outstanding calls to + * pollset_work. + * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ + gpr_mu_lock(&pollset->mu); + } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + gpr_mu_unlock(&pollset->mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + } + } + *worker_hdl = NULL; + GPR_TIMER_END("pollset_work", 0); +} + +static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = 1; + pollset->shutdown_done = closure; + pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + if (!pollset_has_workers(pollset)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); + } + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + !pollset_has_workers(pollset)) { + pollset->called_shutdown = 1; + finish_shutdown(exec_ctx, pollset); + } +} + +static int poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int64_t max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return -1; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +} + +/******************************************************************************* + * pollset_set_posix.c + */ + +static grpc_pollset_set *pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); + memset(pollset_set, 0, sizeof(*pollset_set)); + gpr_mu_init(&pollset_set->mu); + return pollset_set; +} + +static void pollset_set_destroy(grpc_pollset_set *pollset_set) { + size_t i; + gpr_mu_destroy(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } + gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); + gpr_free(pollset_set->fds); + gpr_free(pollset_set); +} + +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i, j; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->pollset_count == pollset_set->pollset_capacity) { + pollset_set->pollset_capacity = + GPR_MAX(8, 2 * pollset_set->pollset_capacity); + pollset_set->pollsets = + gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * + sizeof(*pollset_set->pollsets)); + } + pollset_set->pollsets[pollset_set->pollset_count++] = pollset; + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } else { + pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } + } + pollset_set->fd_count = j; + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->pollset_count; i++) { + if (pollset_set->pollsets[i] == pollset) { + pollset_set->pollset_count--; + GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], + pollset_set->pollsets[pollset_set->pollset_count]); + break; + } + } + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); + } else { + pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->fd_count == pollset_set->fd_capacity) { + pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); + pollset_set->fds = gpr_realloc( + pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); + } + GRPC_FD_REF(fd, "pollset_set"); + pollset_set->fds[pollset_set->fd_count++] = fd; + for (i = 0; i < pollset_set->pollset_count; i++) { + pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + +static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + if (pollset_set->fds[i] == fd) { + pollset_set->fd_count--; + GPR_SWAP(grpc_fd *, pollset_set->fds[i], + pollset_set->fds[pollset_set->fd_count]); + GRPC_FD_UNREF(fd, "pollset_set"); + break; + } + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + +/******************************************************************************* + * event engine binding + */ + +static void shutdown_engine(void) { pollset_global_shutdown(); } + +static const grpc_event_engine_vtable vtable = { + .pollset_size = sizeof(grpc_pollset), + + .fd_create = fd_create, + .fd_wrapped_fd = fd_wrapped_fd, + .fd_orphan = fd_orphan, + .fd_shutdown = fd_shutdown, + .fd_notify_on_read = fd_notify_on_read, + .fd_notify_on_write = fd_notify_on_write, + + .pollset_init = pollset_init, + .pollset_shutdown = pollset_shutdown, + .pollset_reset = pollset_reset, + .pollset_destroy = pollset_destroy, + .pollset_work = pollset_work, + .pollset_kick = pollset_kick, + .pollset_add_fd = pollset_add_fd, + + .pollset_set_create = pollset_set_create, + .pollset_set_destroy = pollset_set_destroy, + .pollset_set_add_pollset = pollset_set_add_pollset, + .pollset_set_del_pollset = pollset_set_del_pollset, + .pollset_set_add_pollset_set = pollset_set_add_pollset_set, + .pollset_set_del_pollset_set = pollset_set_del_pollset_set, + .pollset_set_add_fd = pollset_set_add_fd, + .pollset_set_del_fd = pollset_set_del_fd, + + .kick_poller = kick_poller, + + .shutdown_engine = shutdown_engine, +}; + +const grpc_event_engine_vtable *grpc_init_poll_posix(void) { + pollset_global_init(); + return &vtable; +} + +#endif diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h new file mode 100644 index 0000000000..291736a2db --- /dev/null +++ b/src/core/lib/iomgr/ev_poll_posix.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H +#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H + +#include "src/core/lib/iomgr/ev_posix.h" + +const grpc_event_engine_vtable *grpc_init_poll_posix(void); + +#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 7df1751352..a7dfc9552d 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -37,23 +37,104 @@ #include "src/core/lib/iomgr/ev_posix.h" +#include <string.h> + +#include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" +#include "src/core/lib/iomgr/ev_poll_posix.h" +#include "src/core/lib/support/env.h" + +/** Default poll() function - a pointer so that it can be overridden by some + * tests */ +grpc_poll_function_type grpc_poll_function = poll; static const grpc_event_engine_vtable *g_event_engine; -grpc_poll_function_type grpc_poll_function = poll; +typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void); + +typedef struct { + const char *name; + event_engine_factory_fn factory; +} event_engine_factory; + +static const event_engine_factory g_factories[] = { + {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix}, +}; + +static void add(const char *beg, const char *end, char ***ss, size_t *ns) { + size_t n = *ns; + size_t np = n + 1; + char *s; + size_t len; + GPR_ASSERT(end >= beg); + len = (size_t)(end - beg); + s = gpr_malloc(len + 1); + memcpy(s, beg, len); + s[len] = 0; + *ss = gpr_realloc(*ss, sizeof(char **) * np); + (*ss)[n] = s; + *ns = np; +} + +static void split(const char *s, char ***ss, size_t *ns) { + const char *c = strchr(s, ','); + if (c == NULL) { + add(s, s + strlen(s), ss, ns); + } else { + add(s, c, ss, ns); + split(c + 1, ss, ns); + } +} + +static bool is(const char *want, const char *have) { + return 0 == strcmp(want, "all") || 0 == strcmp(want, have); +} + +static void try_engine(const char *engine) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { + if (is(engine, g_factories[i].name)) { + if ((g_event_engine = g_factories[i].factory())) { + gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name); + return; + } + } + } +} void grpc_event_engine_init(void) { - if ((g_event_engine = grpc_init_poll_and_epoll_posix())) { - return; + char *s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s == NULL) { + s = gpr_strdup("all"); + } + + char **strings = NULL; + size_t nstrings = 0; + split(s, &strings, &nstrings); + + for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) { + try_engine(strings[i]); + } + + for (size_t i = 0; i < nstrings; i++) { + gpr_free(strings[i]); + } + gpr_free(strings); + gpr_free(s); + + if (g_event_engine == NULL) { + gpr_log(GPR_ERROR, "No event engine could be initialized"); + abort(); } - gpr_log(GPR_ERROR, "No event engine could be initialized"); - abort(); } -void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); } +void grpc_event_engine_shutdown(void) { + g_event_engine->shutdown_engine(); + g_event_engine = NULL; +} grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 2146c7dd1f..e451479073 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -39,6 +39,22 @@ #include "src/core/lib/profiling/timers.h" +bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { + if (!exec_ctx->cached_ready_to_finish) { + exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish( + exec_ctx, exec_ctx->check_ready_to_finish_arg); + } + return exec_ctx->cached_ready_to_finish; +} + +bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { + return false; +} + +bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) { + return true; +} + #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; @@ -61,6 +77,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { + exec_ctx->cached_ready_to_finish = true; grpc_exec_ctx_flush(exec_ctx); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 976cc40347..9d47a262f8 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue; * - track a list of work that needs to be delayed until the top of the * call stack (this provides a convenient mechanism to run callbacks * without worrying about locking issues) + * - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that provides + * signal as to whether a borrowed thread should continue to do work or + * should actively try to finish up and get this thread back to its owner * * CONVENTIONS: * Instance of this must ALWAYS be constructed on the stack, never @@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue; */ struct grpc_exec_ctx { grpc_closure_list closure_list; + bool cached_ready_to_finish; + void *check_ready_to_finish_arg; + bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; -#define GRPC_EXEC_CTX_INIT \ - { GRPC_CLOSURE_LIST_INIT } +#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ + { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { - int unused; + bool cached_ready_to_finish; + void *check_ready_to_finish_arg; + bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; -#define GRPC_EXEC_CTX_INIT \ - { 0 } +#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ + { false, finish_check_arg, finish_check } #endif +#define GRPC_EXEC_CTX_INIT \ + GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL) + /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. * Returns true if work was performed, false otherwise. */ @@ -86,6 +97,14 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, bool success, grpc_workqueue *offload_target_or_null); +/** Returns true if we'd like to leave this execution context as soon as + possible: useful for deciding whether to do something more or not depending + on outside context */ +bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx); +/** A finish check that is never ready to finish */ +bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); +/** A finish check that is always ready to finish */ +bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored); /** Add a list of closures to be executed at the next flush/finish point. * Leaves \a list empty. */ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c index 016c501f75..cede97f4c6 100644 --- a/src/core/lib/iomgr/iomgr_posix.c +++ b/src/core/lib/iomgr/iomgr_posix.c @@ -41,12 +41,16 @@ #include "src/core/lib/iomgr/tcp_posix.h" void grpc_iomgr_platform_init(void) { + grpc_wakeup_fd_global_init(); grpc_event_engine_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} -void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); } +void grpc_iomgr_platform_shutdown(void) { + grpc_event_engine_shutdown(); + grpc_wakeup_fd_global_destroy(); +} #endif /* GRPC_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index df6cf956d9..98ffccd59b 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -81,6 +81,7 @@ typedef struct { grpc_closure read_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; + grpc_udp_server_orphan_cb orphan_cb; } server_port; /* the overall server */ @@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { server_port *sp = &s->ports[i]; sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; + + GPR_ASSERT(sp->orphan_cb); + sp->orphan_cb(sp->emfd); + grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); } @@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { static int add_socket_to_server(grpc_udp_server *s, int fd, const struct sockaddr *addr, size_t addr_len, - grpc_udp_server_read_cb read_cb) { + grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb) { server_port *sp; int port; char *addr_str; @@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->read_cb = read_cb; + sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); gpr_free(name); @@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, } int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb) { + size_t addr_len, grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb) { int allocated_port1 = -1; int allocated_port2 = -1; unsigned i; @@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); - allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr = (struct sockaddr *)&addr4_copy; addr_len = sizeof(addr4_copy); } - allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + allocated_port2 = + add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); done: gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index d8cf957a22..33c5ce11cd 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server; typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, struct grpc_server *server); +/* Called when the grpc_fd is about to be orphaned (and the FD closed). */ +typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd); + /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); @@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb); + size_t addr_len, grpc_udp_server_read_cb read_cb, + grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, grpc_closure *on_done); diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c index f3cb0c050f..0d7bcdb5aa 100644 --- a/src/core/lib/support/string_util_win32.c +++ b/src/core/lib/support/string_util_win32.c @@ -83,7 +83,7 @@ char *gpr_format_message(int messageid) { DWORD status = FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + NULL, (DWORD)messageid, MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT), (LPTSTR)(&tmessage), 0, NULL); if (status == 0) return gpr_strdup("Unable to retrieve error string"); message = gpr_tchar_to_char(tmessage); diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 809fd5f1fa..c97079f638 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, case GRPC_BB_RAW: gpr_slice_buffer_init(&decompressed_slices_buffer); if (is_compressed(reader->buffer_in)) { - grpc_msg_decompress(reader->buffer_in->data.raw.compression, - &reader->buffer_in->data.raw.slice_buffer, - &decompressed_slices_buffer); - reader->buffer_out = - grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, - decompressed_slices_buffer.count); + if (grpc_msg_decompress(reader->buffer_in->data.raw.compression, + &reader->buffer_in->data.raw.slice_buffer, + &decompressed_slices_buffer) == 0) { + gpr_log(GPR_ERROR, + "Unexpected error decompressing data for algorithm with enum " + "value '%d'. Reading data as if it were uncompressed.", + reader->buffer_in->data.raw.compression); + reader->buffer_out = reader->buffer_in; + } else { /* all fine */ + reader->buffer_out = + grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices, + decompressed_slices_buffer.count); + } gpr_slice_buffer_destroy(&decompressed_slices_buffer); } else { /* not compressed, use the input buffer as output */ reader->buffer_out = reader->buffer_in; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9b2b94eedf..c8728fa278 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -261,6 +261,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, call->channel = channel; call->cq = cq; call->parent = parent_call; + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = server_transport_data == NULL; if (call->is_client) { GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); @@ -408,6 +410,7 @@ static void set_status_code(grpc_call *call, status_source source, static void set_compression_algorithm(grpc_call *call, grpc_compression_algorithm algo) { + GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); call->compression_algorithm = algo; } @@ -828,12 +831,16 @@ static uint32_t decode_status(grpc_mdelem *md) { return status; } -static uint32_t decode_compression(grpc_mdelem *md) { +static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm = grpc_compression_algorithm_from_mdstr(md->value); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + gpr_log(GPR_ERROR, + "Invalid incoming compression algorithm: '%s'. Interpreting " + "incoming data as uncompressed.", + md_c_str); + return GRPC_COMPRESS_NONE; } return algorithm; } @@ -1087,6 +1094,24 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_initial_filter, call); + /* make sure the received grpc-encoding is amongst the ones listed in + * grpc-accept-encoding */ + + GPR_ASSERT(call->encodings_accepted_by_peer != 0); + if (!GPR_BITGET(call->encodings_accepted_by_peer, + call->compression_algorithm)) { + extern int grpc_compression_trace; + if (grpc_compression_trace) { + char *algo_name; + grpc_compression_algorithm_name(call->compression_algorithm, + &algo_name); + gpr_log(GPR_ERROR, + "Compression algorithm (grpc-encoding = '%s') not present in " + "the bitset of accepted encodings (grpc-accept-encodings: " + "'0x%x')", + algo_name, call->encodings_accepted_by_peer); + } + } if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { @@ -1474,7 +1499,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, grpc_call_error err; GRPC_API_TRACE( - "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)", + "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " + "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != NULL) { diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 57c6897626..1c8b709015 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -164,7 +164,7 @@ void grpc_init(void) { grpc_register_tracer("channel_stack_builder", &grpc_trace_channel_stack_builder); grpc_register_tracer("http1", &grpc_http1_trace); - grpc_register_tracer("compression", &grpc_compress_filter_trace); + grpc_register_tracer("compression", &grpc_compression_trace); grpc_security_pre_init(); grpc_iomgr_init(); grpc_executor_init(); |