diff options
author | Craig Tiller <ctiller@google.com> | 2016-02-25 07:16:24 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-02-25 07:16:24 -0800 |
commit | a3f344201051082f806bd31c02a6d0754a2754ea (patch) | |
tree | 200ff21cbb3c9f988be5fd25810369b5f6de707c /src/core | |
parent | 1a969c86f4ccc048bedb26c0ac24e2788f88343f (diff) | |
parent | c46beaaa29af84f676bde9d013a85dffed1d58c9 (diff) |
Merge branch 'hide-the-pollset-set' into cleaner-posix2
Diffstat (limited to 'src/core')
29 files changed, 1000 insertions, 262 deletions
diff --git a/src/core/census/log.c b/src/core/census/mlog.c index 91b26941b8..a2cc46d3f2 100644 --- a/src/core/census/log.c +++ b/src/core/census/mlog.c @@ -88,7 +88,7 @@ // include the name of the structure, which will be passed as the first // argument. E.g. cl_block_initialize() will initialize a cl_block. -#include "src/core/census/log.h" +#include "src/core/census/mlog.h" #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/cpu.h> diff --git a/src/core/census/log.h b/src/core/census/mlog.h index 05daea066f..aaba9e1535 100644 --- a/src/core/census/log.h +++ b/src/core/census/mlog.h @@ -31,8 +31,10 @@ * */ -#ifndef GRPC_INTERNAL_CORE_CENSUS_LOG_H -#define GRPC_INTERNAL_CORE_CENSUS_LOG_H +/* A very fast in-memory log, optimized for multiple writers. */ + +#ifndef GRPC_INTERNAL_CORE_CENSUS_MLOG_H +#define GRPC_INTERNAL_CORE_CENSUS_MLOG_H #include <grpc/support/port_platform.h> #include <stddef.h> diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 7176c01b05..a96b49ac12 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,8 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; - /** interested parties */ - grpc_pollset_set interested_parties; + /** interested parties (owned) */ + grpc_pollset_set *interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -183,8 +183,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, + chand->interested_parties); } gpr_mu_lock(&chand->mu_config); @@ -231,9 +231,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, - &old_lb_policy->interested_parties, - &chand->interested_parties); + grpc_pollset_set_del_pollset_set( + exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -254,7 +253,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(op->set_accept_stream == NULL); if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, op->bind_pollset); } @@ -284,8 +283,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; } @@ -411,7 +410,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); - grpc_pollset_set_init(&chand->interested_parties); + chand->interested_parties = grpc_pollset_set_create(); } /* Destructor for channel_data */ @@ -425,12 +424,12 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, - &chand->lb_policy->interested_parties, - &chand->interested_parties); + chand->lb_policy->interested_parties, + chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(&chand->interested_parties); + grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -441,9 +440,17 @@ static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } const grpc_channel_filter grpc_client_channel_filter = { - cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data), - init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel", + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + cc_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", }; void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, @@ -501,7 +508,7 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, w->pollset); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); @@ -517,7 +524,7 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; - grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); grpc_closure_init(&w->my_closure, on_external_watch_complete, w); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); diff --git a/src/core/client_config/lb_policies/load_balancer_api.c b/src/core/client_config/lb_policies/load_balancer_api.c new file mode 100644 index 0000000000..a6b5785fe4 --- /dev/null +++ b/src/core/client_config/lb_policies/load_balancer_api.c @@ -0,0 +1,163 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policies/load_balancer_api.h" +#include "third_party/nanopb/pb_decode.h" +#include "third_party/nanopb/pb_encode.h" + +#include <grpc/support/alloc.h> + +typedef struct decode_serverlist_arg { + int first_pass; + int i; + size_t num_servers; + grpc_grpclb_server **servers; +} decode_serverlist_arg; + +/* invoked once for every Server in ServerList */ +static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, + void **arg) { + decode_serverlist_arg *dec_arg = *arg; + if (dec_arg->first_pass != 0) { /* first pass */ + grpc_grpclb_server server; + if (!pb_decode(stream, grpc_lb_v0_Server_fields, &server)) { + return false; + } + dec_arg->num_servers++; + } else { /* second pass */ + grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); + GPR_ASSERT(dec_arg->num_servers > 0); + if (dec_arg->i == 0) { /* first iteration of second pass */ + dec_arg->servers = + gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); + } + if (!pb_decode(stream, grpc_lb_v0_Server_fields, server)) { + return false; + } + dec_arg->servers[dec_arg->i++] = server; + } + + return true; +} + +grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { + grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); + + req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ + req->has_initial_request = 1; + req->initial_request.has_name = 1; + strncpy(req->initial_request.name, lb_service_name, + GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); + return req; +} + +gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { + size_t encoded_length; + pb_ostream_t sizestream; + pb_ostream_t outputstream; + gpr_slice slice; + memset(&sizestream, 0, sizeof(pb_ostream_t)); + pb_encode(&sizestream, grpc_lb_v0_LoadBalanceRequest_fields, request); + encoded_length = sizestream.bytes_written; + + slice = gpr_slice_malloc(encoded_length); + outputstream = + pb_ostream_from_buffer(GPR_SLICE_START_PTR(slice), encoded_length); + GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v0_LoadBalanceRequest_fields, + request) != 0); + return slice; +} + +void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { + gpr_free(request); +} + +grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) { + bool status; + pb_istream_t stream = + pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), + GPR_SLICE_LENGTH(encoded_response)); + grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response)); + memset(res, 0, sizeof(*res)); + status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res); + GPR_ASSERT(status == true); + return res; +} + +grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( + gpr_slice encoded_response) { + grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); + bool status; + decode_serverlist_arg arg; + pb_istream_t stream = + pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), + GPR_SLICE_LENGTH(encoded_response)); + pb_istream_t stream_at_start = stream; + grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response)); + memset(res, 0, sizeof(*res)); + memset(&arg, 0, sizeof(decode_serverlist_arg)); + + res->server_list.servers.funcs.decode = decode_serverlist; + res->server_list.servers.arg = &arg; + arg.first_pass = 1; + status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res); + GPR_ASSERT(status == true); + GPR_ASSERT(arg.num_servers > 0); + + arg.first_pass = 0; + status = + pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res); + GPR_ASSERT(status == true); + GPR_ASSERT(arg.servers != NULL); + + sl->num_servers = arg.num_servers; + sl->servers = arg.servers; + if (res->server_list.has_expiration_interval) { + sl->expiration_interval = res->server_list.expiration_interval; + } + grpc_grpclb_response_destroy(res); + return sl; +} + +void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { + size_t i; + for (i = 0; i < serverlist->num_servers; i++) { + gpr_free(serverlist->servers[i]); + } + gpr_free(serverlist->servers); + gpr_free(serverlist); +} + +void grpc_grpclb_response_destroy(grpc_grpclb_response *response) { + gpr_free(response); +} diff --git a/src/core/client_config/lb_policies/load_balancer_api.h b/src/core/client_config/lb_policies/load_balancer_api.h new file mode 100644 index 0000000000..4dbe1d6c22 --- /dev/null +++ b/src/core/client_config/lb_policies/load_balancer_api.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H + +#include <grpc/support/slice_buffer.h> + +#include "src/core/client_config/lb_policy_factory.h" +#include "src/core/proto/grpc/lb/v0/load_balancer.pb.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 + +typedef grpc_lb_v0_LoadBalanceRequest grpc_grpclb_request; +typedef grpc_lb_v0_LoadBalanceResponse grpc_grpclb_response; +typedef grpc_lb_v0_Server grpc_grpclb_server; +typedef grpc_lb_v0_Duration grpc_grpclb_duration; +typedef struct grpc_grpclb_serverlist { + grpc_grpclb_server **servers; + size_t num_servers; + grpc_grpclb_duration expiration_interval; +} grpc_grpclb_serverlist; + +/** Create a request for a gRPC LB service under \a lb_service_name */ +grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); + +/** Protocol Buffers v3-encode \a request */ +gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); + +/** Destroy \a request */ +void grpc_grpclb_request_destroy(grpc_grpclb_request *request); + +/** Parse (ie, decode) the bytes in \a encoded_response as a \a + * grpc_grpclb_response */ +grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response); + +/** Destroy \a serverlist */ +void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); + +/** Parse the list of servers from an encoded \a grpc_grpclb_response */ +grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( + gpr_slice encoded_response); + +/** Destroy \a response */ +void grpc_grpclb_response_destroy(grpc_grpclb_response *response); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 459bbebb68..9f38f398d8 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,8 +31,8 @@ * */ -#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_factory.h" #include <string.h> @@ -119,7 +119,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); @@ -137,7 +137,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -158,7 +158,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -195,8 +195,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -253,7 +252,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_connectivity, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -278,13 +277,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, &p->base.interested_parties, + exec_ctx, selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -298,7 +297,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -311,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], - &p->base.interested_parties, &p->checking_connectivity, + p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: @@ -379,8 +378,14 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle, - pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown, + pf_pick, + pf_cancel_pick, + pf_ping_one, + pf_exit_idle, + pf_check_connectivity, + pf_notify_on_state_change}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b1171c45b0..114ece6e4d 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -260,7 +260,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); @@ -285,7 +285,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { subchannel_data *sd = p->subchannels[i]; sd->connectivity_state = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); } @@ -322,8 +322,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, - pollset); + grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -374,13 +373,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, + grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_CONNECTING: @@ -389,13 +388,13 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, sd->connectivity_state, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, &p->base.interested_parties, + exec_ctx, sd->subchannel, p->base.interested_parties, &sd->connectivity_state, &sd->connectivity_changed_closure); /* remove from ready list if still present */ @@ -484,8 +483,14 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, - rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, + rr_shutdown, + rr_pick, + rr_cancel_pick, + rr_ping_one, + rr_exit_idle, + rr_check_connectivity, + rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d4672f6b25..5ff623e006 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -39,7 +39,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); - grpc_pollset_set_init(&policy->interested_parties); + policy->interested_parties = grpc_pollset_set_create(); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -93,7 +93,7 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(&policy->interested_parties); + grpc_pollset_set_destroy(policy->interested_parties); policy->vtable->destroy(exec_ctx, policy); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index db5238c8ca..4fbb12da39 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,7 +48,8 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; - grpc_pollset_set interested_parties; + /* owned pointer to interested parties in load balancing decisions */ + grpc_pollset_set *interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6599c75dba..291ad3472c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -108,7 +108,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -184,8 +184,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } @@ -209,7 +209,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_slice_unref(c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(&c->pollset_set); + grpc_pollset_set_destroy(c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -226,8 +226,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -235,8 +235,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c return c; } -grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -326,7 +326,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); - grpc_pollset_set_init(&c->pollset_set); + c->pollset_set = grpc_pollset_set_create(); c->addr_len = args->addr_len; grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); @@ -345,7 +345,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connect_in_args args; - args.interested_parties = &c->pollset_set; + args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; args.deadline = compute_connect_deadline(c); @@ -379,7 +379,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, external_state_watcher *w = arg; grpc_closure *follow_up = w->notify; if (w->pollset_set != NULL) { - grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, + grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, w->pollset_set); } gpr_mu_lock(&w->subchannel->mu); @@ -415,7 +415,7 @@ void grpc_subchannel_notify_on_state_change( w->notify = notify; grpc_closure_init(&w->closure, on_external_state_watcher_done, w); if (interested_parties != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, + grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); } GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); @@ -573,7 +573,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( - exec_ctx, con, &c->pollset_set, &sw_subchannel->connectivity_state, + exec_ctx, con, c->pollset_set, &sw_subchannel->connectivity_state, &sw_subchannel->closure); /* signal completion */ @@ -690,8 +690,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_ref(grpc_subchannel_call *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 9751cb03f2..63f3725219 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -31,21 +31,21 @@ * */ -#include "src/core/iomgr/sockaddr.h" #include "src/core/httpcli/httpcli.h" +#include "src/core/iomgr/sockaddr.h" #include <string.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include "src/core/httpcli/format_request.h" +#include "src/core/httpcli/parser.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/tcp_client.h" -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> typedef struct { gpr_slice request_text; @@ -85,18 +85,18 @@ const grpc_httpcli_handshaker grpc_httpcli_plaintext = {"http", plaintext_handshake}; void grpc_httpcli_context_init(grpc_httpcli_context *context) { - grpc_pollset_set_init(&context->pollset_set); + context->pollset_set = grpc_pollset_set_create(); } void grpc_httpcli_context_destroy(grpc_httpcli_context *context) { - grpc_pollset_set_destroy(&context->pollset_set); + grpc_pollset_set_destroy(context->pollset_set); } static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req); static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { - grpc_pollset_set_del_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); @@ -198,7 +198,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req) { addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, &req->context->pollset_set, + exec_ctx, &req->connected, &req->ep, req->context->pollset_set, (struct sockaddr *)&addr->addr, addr->len, req->deadline); } @@ -238,7 +238,7 @@ static void internal_request_begin( req->host = gpr_strdup(request->host); req->ssl_host_override = gpr_strdup(request->ssl_host_override); - grpc_pollset_set_add_pollset(exec_ctx, &req->context->pollset_set, + grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); grpc_resolve_address(request->host, req->handshaker->default_port, on_resolved, req); diff --git a/src/core/httpcli/httpcli.h b/src/core/httpcli/httpcli.h index 30875d71f1..86e17c1d69 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/httpcli/httpcli.h @@ -39,6 +39,7 @@ #include <grpc/support/time.h> #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ @@ -56,7 +57,7 @@ typedef struct grpc_httpcli_header { TODO(ctiller): allow caching and capturing multiple requests for the same content and combining them */ typedef struct grpc_httpcli_context { - grpc_pollset_set pollset_set; + grpc_pollset_set *pollset_set; } grpc_httpcli_context; typedef struct { diff --git a/src/core/iomgr/ev_poll_and_epoll_posix.c b/src/core/iomgr/ev_poll_and_epoll_posix.c index e9ed7144c4..55f5ec2758 100644 --- a/src/core/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/iomgr/ev_poll_and_epoll_posix.c @@ -1732,4 +1732,147 @@ void grpc_remove_fd_from_all_epoll_sets(int fd) {} #endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ +/******************************************************************************* + * pollset_set_posix.c + */ + +grpc_pollset_set *grpc_pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); + memset(pollset_set, 0, sizeof(*pollset_set)); + gpr_mu_init(&pollset_set->mu); + return pollset_set; +} + +void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { + size_t i; + gpr_mu_destroy(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } + gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); + gpr_free(pollset_set->fds); + gpr_free(pollset_set); +} + +void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i, j; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->pollset_count == pollset_set->pollset_capacity) { + pollset_set->pollset_capacity = + GPR_MAX(8, 2 * pollset_set->pollset_capacity); + pollset_set->pollsets = + gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * + sizeof(*pollset_set->pollsets)); + } + pollset_set->pollsets[pollset_set->pollset_count++] = pollset; + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (grpc_fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); + } else { + grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } + } + pollset_set->fd_count = j; + gpr_mu_unlock(&pollset_set->mu); +} + +void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->pollset_count; i++) { + if (pollset_set->pollsets[i] == pollset) { + pollset_set->pollset_count--; + GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], + pollset_set->pollsets[pollset_set->pollset_count]); + break; + } + } + gpr_mu_unlock(&pollset_set->mu); +} + +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = + gpr_realloc(bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (grpc_fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset_set"); + } else { + grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], + bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + if (pollset_set->fd_count == pollset_set->fd_capacity) { + pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); + pollset_set->fds = gpr_realloc( + pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); + } + GRPC_FD_REF(fd, "pollset_set"); + pollset_set->fds[pollset_set->fd_count++] = fd; + for (i = 0; i < pollset_set->pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + +void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, grpc_fd *fd) { + size_t i; + gpr_mu_lock(&pollset_set->mu); + for (i = 0; i < pollset_set->fd_count; i++) { + if (pollset_set->fds[i] == fd) { + pollset_set->fd_count--; + GPR_SWAP(grpc_fd *, pollset_set->fds[i], + pollset_set->fds[pollset_set->fd_count]); + GRPC_FD_UNREF(fd, "pollset_set"); + break; + } + } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } + gpr_mu_unlock(&pollset_set->mu); +} + #endif diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h new file mode 100644 index 0000000000..ed7eb5cb7d --- /dev/null +++ b/src/core/iomgr/pollset_posix.h @@ -0,0 +1,153 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H +#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H + +#include <poll.h> + +#include <grpc/support/sync.h> + +#include "src/core/iomgr/exec_ctx.h" +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/wakeup_fd_posix.h" + +typedef struct grpc_pollset_vtable grpc_pollset_vtable; + +/* forward declare only in this file to avoid leaking impl details via + pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not + use the struct tag */ +struct grpc_fd; + +typedef struct grpc_cached_wakeup_fd { + grpc_wakeup_fd fd; + struct grpc_cached_wakeup_fd *next; +} grpc_cached_wakeup_fd; + +struct grpc_pollset_worker { + grpc_cached_wakeup_fd *wakeup_fd; + int reevaluate_polling_on_wakeup; + int kicked_specifically; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +}; + +struct grpc_pollset { + /* pollsets under posix can mutate representation as fds are added and + removed. + For example, we may choose a poll() based implementation on linux for + few fds, and an epoll() based implementation for many fds */ + const grpc_pollset_vtable *vtable; + gpr_mu *mu; + grpc_pollset_worker root_worker; + int in_flight_cbs; + int shutting_down; + int called_shutdown; + int kicked_without_pollers; + grpc_closure *shutdown_done; + grpc_closure_list idle_jobs; + union { + int fd; + void *ptr; + } data; + /* Local cache of eventfds for workers */ + grpc_cached_wakeup_fd *local_wakeup_cache; +}; + +struct grpc_pollset_vtable { + void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd, int and_unlock_pollset); + void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); + void (*finish_shutdown)(grpc_pollset *pollset); + void (*destroy)(grpc_pollset *pollset); +}; + +/* Add an fd to a pollset */ +void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + struct grpc_fd *fd); + +/* Returns the fd to listen on for kicks */ +int grpc_kick_read_fd(grpc_pollset *p); +/* Call after polling has been kicked to leave the kicked state */ +void grpc_kick_drain(grpc_pollset *p); + +/* Convert a timespec to milliseconds: + - very small or negative poll times are clamped to zero to do a + non-blocking poll (which becomes spin polling) + - other small values are rounded up to one millisecond + - longer than a millisecond polls are rounded up to the next nearest + millisecond to avoid spinning + - infinite timeouts are converted to -1 */ +int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now); + +/* Allow kick to wakeup the currently polling worker */ +#define GRPC_POLLSET_CAN_KICK_SELF 1 +/* Force the wakee to repoll when awoken */ +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +/* As per grpc_pollset_kick, with an extended set of flags (defined above) + -- mostly for fd_posix's use. */ +void grpc_pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags); + +/* turn a pollset into a multipoller: platform specific */ +typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + struct grpc_fd **fds, + size_t fd_count); +extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; + +void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, struct grpc_fd **fds, + size_t fd_count); + +/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must + * be locked) */ +int grpc_pollset_has_workers(grpc_pollset *pollset); + +void grpc_remove_fd_from_all_epoll_sets(int fd); + +/* override to allow tests to hook poll() usage */ +/* NOTE: Any changes to grpc_poll_function must take place when the gRPC + is certainly not doing any polling anywhere. + Otherwise, there might be a race between changing the variable and actually + doing a polling operation */ +typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); +extern grpc_poll_function_type grpc_poll_function; +extern grpc_wakeup_fd grpc_global_wakeup_fd; + +#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 09c04438f7..9591bf0d32 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -41,15 +41,9 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_set_posix.h" -#endif +typedef struct grpc_pollset_set grpc_pollset_set; -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_set_windows.h" -#endif - -void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +grpc_pollset_set *grpc_pollset_set_create(void); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 0eb4a427b1..9d6f3bdb3d 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -42,142 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/ev_posix.h" -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_posix.h" -void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { - memset(pollset_set, 0, sizeof(*pollset_set)); - gpr_mu_init(&pollset_set->mu); -} - -void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { - size_t i; - gpr_mu_destroy(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); - } - gpr_free(pollset_set->pollsets); - gpr_free(pollset_set->pollset_sets); - gpr_free(pollset_set->fds); -} - -void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i, j; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->pollset_count == pollset_set->pollset_capacity) { - pollset_set->pollset_capacity = - GPR_MAX(8, 2 * pollset_set->pollset_capacity); - pollset_set->pollsets = - gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * - sizeof(*pollset_set->pollsets)); - } - pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0, j = 0; i < pollset_set->fd_count; i++) { - if (grpc_fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); - } else { - grpc_pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); - pollset_set->fds[j++] = pollset_set->fds[i]; - } - } - pollset_set->fd_count = j; - gpr_mu_unlock(&pollset_set->mu); -} - -void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->pollset_count; i++) { - if (pollset_set->pollsets[i] == pollset) { - pollset_set->pollset_count--; - GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], - pollset_set->pollsets[pollset_set->pollset_count]); - break; - } - } - gpr_mu_unlock(&pollset_set->mu); -} - -void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - size_t i, j; - gpr_mu_lock(&bag->mu); - if (bag->pollset_set_count == bag->pollset_set_capacity) { - bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = - gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); - } - bag->pollset_sets[bag->pollset_set_count++] = item; - for (i = 0, j = 0; i < bag->fd_count; i++) { - if (grpc_fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset_set"); - } else { - grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); - bag->fds[j++] = bag->fds[i]; - } - } - bag->fd_count = j; - gpr_mu_unlock(&bag->mu); -} - -void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - size_t i; - gpr_mu_lock(&bag->mu); - for (i = 0; i < bag->pollset_set_count; i++) { - if (bag->pollset_sets[i] == item) { - bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); - break; - } - } - gpr_mu_unlock(&bag->mu); -} - -void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->fd_count == pollset_set->fd_capacity) { - pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); - pollset_set->fds = gpr_realloc( - pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); - } - GRPC_FD_REF(fd, "pollset_set"); - pollset_set->fds[pollset_set->fd_count++] = fd; - for (i = 0; i < pollset_set->pollset_count; i++) { - grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); -} - -void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - if (pollset_set->fds[i] == fd) { - pollset_set->fd_count--; - GPR_SWAP(grpc_fd *, pollset_set->fds[i], - pollset_set->fds[pollset_set->fd_count]); - GRPC_FD_UNREF(fd, "pollset_set"); - break; - } - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); -} #endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index a8f78314ed..e881908386 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -37,22 +37,6 @@ #include <grpc/support/sync.h> #include "src/core/iomgr/ev_posix.h" -typedef struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -} grpc_pollset_set; - void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 157b46ec32..9cf8fd4472 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_windows.h" -void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} +grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index cada0d2b61..aa5abe9133 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 8f35a46509..bbce23b46a 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -192,7 +192,7 @@ done: remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } - gpr_cv_destroy(&worker->cv); + gpr_cv_destroy(&worker.cv); *worker_hdl = NULL; } diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c76c2e3b0f..15727856ab 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,17 +42,19 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index fba3563427..e8f73811ce 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -53,6 +53,7 @@ #include "src/core/debug/trace.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/string.h" @@ -296,7 +297,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { unwind_slice_idx = tcp->outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && - iov_size != MAX_WRITE_IOVEC; + iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GPR_SLICE_START_PTR( @@ -445,7 +446,7 @@ static char *tcp_get_peer(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, + tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, tcp_shutdown, tcp_destroy, tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, diff --git a/src/core/proto/grpc/lb/v0/load_balancer.pb.c b/src/core/proto/grpc/lb/v0/load_balancer.pb.c new file mode 100644 index 0000000000..59aae30cff --- /dev/null +++ b/src/core/proto/grpc/lb/v0/load_balancer.pb.c @@ -0,0 +1,119 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +/* Automatically generated nanopb constant definitions */ +/* Generated by nanopb-0.3.5-dev */ + +#include "src/core/proto/grpc/lb/v0/load_balancer.pb.h" + +#if PB_PROTO_HEADER_VERSION != 30 +#error Regenerate this file with the current version of nanopb generator. +#endif + + + +const pb_field_t grpc_lb_v0_Duration_fields[3] = { + PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Duration, seconds, seconds, 0), + PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Duration, nanos, seconds, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3] = { + PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v0_InitialLoadBalanceRequest_fields), + PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v0_ClientStats_fields), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2] = { + PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceRequest, name, name, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_ClientStats_fields[4] = { + PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_ClientStats, total_requests, total_requests, 0), + PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, client_rpc_errors, total_requests, 0), + PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, dropped_requests, client_rpc_errors, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3] = { + PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v0_InitialLoadBalanceResponse_fields), + PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v0_ServerList_fields), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4] = { + PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceResponse, client_config, client_config, 0), + PB_FIELD( 2, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, load_balancer_delegate, client_config, 0), + PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v0_Duration_fields), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_ServerList_fields[3] = { + PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v0_ServerList, servers, servers, &grpc_lb_v0_Server_fields), + PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ServerList, expiration_interval, servers, &grpc_lb_v0_Duration_fields), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v0_Server_fields[5] = { + PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Server, ip_address, ip_address, 0), + PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, port, ip_address, 0), + PB_FIELD( 3, BYTES , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, load_balance_token, port, 0), + PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, drop_request, load_balance_token, 0), + PB_LAST_FIELD +}; + + +/* Check that field information fits in pb_field_t */ +#if !defined(PB_FIELD_32BIT) +/* If you get an error here, it means that you need to define PB_FIELD_32BIT + * compile-time option. You can do that in pb.h or on compiler command line. + * + * The reason you need to do this is that some of your messages contain tag + * numbers or field sizes that are larger than what can fit in 8 or 16 bit + * field descriptors. + */ +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v0_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server) +#endif + +#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT) +/* If you get an error here, it means that you need to define PB_FIELD_16BIT + * compile-time option. You can do that in pb.h or on compiler command line. + * + * The reason you need to do this is that some of your messages contain tag + * numbers or field sizes that are larger than what can fit in the default + * 8 bit descriptors. + */ +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v0_ServerList, servers) < 256 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server) +#endif + + diff --git a/src/core/proto/grpc/lb/v0/load_balancer.pb.h b/src/core/proto/grpc/lb/v0/load_balancer.pb.h new file mode 100644 index 0000000000..3599f881bb --- /dev/null +++ b/src/core/proto/grpc/lb/v0/load_balancer.pb.h @@ -0,0 +1,182 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +/* Automatically generated nanopb header */ +/* Generated by nanopb-0.3.5-dev */ + +#ifndef PB_LOAD_BALANCER_PB_H_INCLUDED +#define PB_LOAD_BALANCER_PB_H_INCLUDED +#include "third_party/nanopb/pb.h" +#if PB_PROTO_HEADER_VERSION != 30 +#error Regenerate this file with the current version of nanopb generator. +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/* Struct definitions */ +typedef struct _grpc_lb_v0_ClientStats { + bool has_total_requests; + int64_t total_requests; + bool has_client_rpc_errors; + int64_t client_rpc_errors; + bool has_dropped_requests; + int64_t dropped_requests; +} grpc_lb_v0_ClientStats; + +typedef struct _grpc_lb_v0_Duration { + bool has_seconds; + int64_t seconds; + bool has_nanos; + int32_t nanos; +} grpc_lb_v0_Duration; + +typedef struct _grpc_lb_v0_InitialLoadBalanceRequest { + bool has_name; + char name[128]; +} grpc_lb_v0_InitialLoadBalanceRequest; + +typedef PB_BYTES_ARRAY_T(64) grpc_lb_v0_Server_load_balance_token_t; +typedef struct _grpc_lb_v0_Server { + bool has_ip_address; + char ip_address[46]; + bool has_port; + int32_t port; + bool has_load_balance_token; + grpc_lb_v0_Server_load_balance_token_t load_balance_token; + bool has_drop_request; + bool drop_request; +} grpc_lb_v0_Server; + +typedef struct _grpc_lb_v0_InitialLoadBalanceResponse { + bool has_client_config; + char client_config[64]; + bool has_load_balancer_delegate; + char load_balancer_delegate[64]; + bool has_client_stats_report_interval; + grpc_lb_v0_Duration client_stats_report_interval; +} grpc_lb_v0_InitialLoadBalanceResponse; + +typedef struct _grpc_lb_v0_LoadBalanceRequest { + bool has_initial_request; + grpc_lb_v0_InitialLoadBalanceRequest initial_request; + bool has_client_stats; + grpc_lb_v0_ClientStats client_stats; +} grpc_lb_v0_LoadBalanceRequest; + +typedef struct _grpc_lb_v0_ServerList { + pb_callback_t servers; + bool has_expiration_interval; + grpc_lb_v0_Duration expiration_interval; +} grpc_lb_v0_ServerList; + +typedef struct _grpc_lb_v0_LoadBalanceResponse { + bool has_initial_response; + grpc_lb_v0_InitialLoadBalanceResponse initial_response; + bool has_server_list; + grpc_lb_v0_ServerList server_list; +} grpc_lb_v0_LoadBalanceResponse; + +/* Default values for struct fields */ + +/* Initializer values for message structs */ +#define grpc_lb_v0_Duration_init_default {false, 0, false, 0} +#define grpc_lb_v0_LoadBalanceRequest_init_default {false, grpc_lb_v0_InitialLoadBalanceRequest_init_default, false, grpc_lb_v0_ClientStats_init_default} +#define grpc_lb_v0_InitialLoadBalanceRequest_init_default {false, ""} +#define grpc_lb_v0_ClientStats_init_default {false, 0, false, 0, false, 0} +#define grpc_lb_v0_LoadBalanceResponse_init_default {false, grpc_lb_v0_InitialLoadBalanceResponse_init_default, false, grpc_lb_v0_ServerList_init_default} +#define grpc_lb_v0_InitialLoadBalanceResponse_init_default {false, "", false, "", false, grpc_lb_v0_Duration_init_default} +#define grpc_lb_v0_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_default} +#define grpc_lb_v0_Server_init_default {false, "", false, 0, false, {0, {0}}, false, 0} +#define grpc_lb_v0_Duration_init_zero {false, 0, false, 0} +#define grpc_lb_v0_LoadBalanceRequest_init_zero {false, grpc_lb_v0_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v0_ClientStats_init_zero} +#define grpc_lb_v0_InitialLoadBalanceRequest_init_zero {false, ""} +#define grpc_lb_v0_ClientStats_init_zero {false, 0, false, 0, false, 0} +#define grpc_lb_v0_LoadBalanceResponse_init_zero {false, grpc_lb_v0_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v0_ServerList_init_zero} +#define grpc_lb_v0_InitialLoadBalanceResponse_init_zero {false, "", false, "", false, grpc_lb_v0_Duration_init_zero} +#define grpc_lb_v0_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_zero} +#define grpc_lb_v0_Server_init_zero {false, "", false, 0, false, {0, {0}}, false, 0} + +/* Field tags (for use in manual encoding/decoding) */ +#define grpc_lb_v0_ClientStats_total_requests_tag 1 +#define grpc_lb_v0_ClientStats_client_rpc_errors_tag 2 +#define grpc_lb_v0_ClientStats_dropped_requests_tag 3 +#define grpc_lb_v0_Duration_seconds_tag 1 +#define grpc_lb_v0_Duration_nanos_tag 2 +#define grpc_lb_v0_InitialLoadBalanceRequest_name_tag 1 +#define grpc_lb_v0_Server_ip_address_tag 1 +#define grpc_lb_v0_Server_port_tag 2 +#define grpc_lb_v0_Server_load_balance_token_tag 3 +#define grpc_lb_v0_Server_drop_request_tag 4 +#define grpc_lb_v0_InitialLoadBalanceResponse_client_config_tag 1 +#define grpc_lb_v0_InitialLoadBalanceResponse_load_balancer_delegate_tag 2 +#define grpc_lb_v0_InitialLoadBalanceResponse_client_stats_report_interval_tag 3 +#define grpc_lb_v0_LoadBalanceRequest_initial_request_tag 1 +#define grpc_lb_v0_LoadBalanceRequest_client_stats_tag 2 +#define grpc_lb_v0_ServerList_servers_tag 1 +#define grpc_lb_v0_ServerList_expiration_interval_tag 3 +#define grpc_lb_v0_LoadBalanceResponse_initial_response_tag 1 +#define grpc_lb_v0_LoadBalanceResponse_server_list_tag 2 + +/* Struct field encoding specification for nanopb */ +extern const pb_field_t grpc_lb_v0_Duration_fields[3]; +extern const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3]; +extern const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2]; +extern const pb_field_t grpc_lb_v0_ClientStats_fields[4]; +extern const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3]; +extern const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4]; +extern const pb_field_t grpc_lb_v0_ServerList_fields[3]; +extern const pb_field_t grpc_lb_v0_Server_fields[5]; + +/* Maximum encoded size of messages (where known) */ +#define grpc_lb_v0_Duration_size 22 +#define grpc_lb_v0_LoadBalanceRequest_size 169 +#define grpc_lb_v0_InitialLoadBalanceRequest_size 131 +#define grpc_lb_v0_ClientStats_size 33 +#define grpc_lb_v0_LoadBalanceResponse_size (165 + grpc_lb_v0_ServerList_size) +#define grpc_lb_v0_InitialLoadBalanceResponse_size 156 +#define grpc_lb_v0_Server_size 127 + +/* Message IDs (where set with "msgid" option) */ +#ifdef PB_MSGID + +#define LOAD_BALANCER_MESSAGES \ + + +#endif + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index d0659c7e52..41449fb4a6 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -339,9 +339,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(&cc->mu); continue; + } else { + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, + iteration_deadline); } - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -456,9 +457,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(&cc->mu); continue; + } else { + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, + iteration_deadline); } - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); del_plucker(cc, tag, &worker); } done: diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 0e1e2c4265..d76d31be23 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -485,7 +485,8 @@ struct grpc_chttp2_stream { /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, +int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing, int is_parsing); void grpc_chttp2_perform_writes( @@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, bool is_window_available); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, + bool is_window_available); +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 2f31a47cb3..b284c78818 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), + grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { + GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); + } + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, bool is_window_available) { grpc_chttp2_stream *stream; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); @@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( if (is_window_available) { grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); } else { - stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, + "chttp2_writing_stalled"); } } +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing) { + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); +} + int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index cafecf1046..356fd8174a 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_unlocking_check_writes( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, int is_parsing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; @@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing, - is_window_available); + grpc_chttp2_list_flush_writing_stalled_by_transport( + exec_ctx, transport_writing, is_window_available); /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } else { - grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); } } if (stream_global->send_trailing_metadata) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 617d98875c..b9f511e946 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_BEGIN("unlock", 0); if (!t->writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(&t->global, &t->writing, + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, t->parsing_active)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); @@ -1019,6 +1019,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { + while (stream_global->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + grpc_byte_stream_destroy(exec_ctx, bs); + } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); |