diff options
Diffstat (limited to 'src/core/lib')
24 files changed, 195 insertions, 1737 deletions
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 7dbac38414..b4e58b7f29 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -112,7 +112,9 @@ static void hc_mutate_op(grpc_call_element *elem, /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->method, - GRPC_MDELEM_METHOD_POST); + op->send_idempotent_request + ? GRPC_MDELEM_METHOD_PUT + : GRPC_MDELEM_METHOD_POST); grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, channeld->static_scheme); grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index df99b77ab3..db1a3d5010 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -41,7 +41,7 @@ typedef struct call_data { uint8_t seen_path; - uint8_t seen_post; + uint8_t seen_method; uint8_t sent_status; uint8_t seen_scheme; uint8_t seen_te_trailers; @@ -50,6 +50,7 @@ typedef struct call_data { grpc_linked_mdelem content_type; grpc_metadata_batch *recv_initial_metadata; + bool *recv_idempotent_request; /** Closure to call when finished with the hs_on_recv hook */ grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv @@ -72,11 +73,16 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* Check if it is one of the headers we care about. */ if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST || - md == GRPC_MDELEM_SCHEME_HTTP || md == GRPC_MDELEM_SCHEME_HTTPS || + md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP || + md == GRPC_MDELEM_SCHEME_HTTPS || md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { /* swallow it */ if (md == GRPC_MDELEM_METHOD_POST) { - calld->seen_post = 1; + calld->seen_method = 1; + *calld->recv_idempotent_request = false; + } else if (md == GRPC_MDELEM_METHOD_PUT) { + calld->seen_method = 1; + *calld->recv_idempotent_request = true; } else if (md->key == GRPC_MDSTR_SCHEME) { calld->seen_scheme = 1; } else if (md == GRPC_MDELEM_TE_TRAILERS) { @@ -142,7 +148,7 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { /* Have we seen the required http2 transport headers? (:method, :scheme, content-type, with :path and :authority covered at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + if (calld->seen_method && calld->seen_scheme && calld->seen_te_trailers && calld->seen_path && calld->seen_authority) { /* do nothing */ } else { @@ -152,7 +158,7 @@ static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { if (!calld->seen_authority) { gpr_log(GPR_ERROR, "Missing :authority header"); } - if (!calld->seen_post) { + if (!calld->seen_method) { gpr_log(GPR_ERROR, "Missing :method header"); } if (!calld->seen_scheme) { @@ -185,7 +191,9 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ + GPR_ASSERT(op->recv_idempotent_request != NULL); calld->recv_initial_metadata = op->recv_initial_metadata; + calld->recv_idempotent_request = op->recv_idempotent_request; calld->on_done_recv = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->hs_on_recv; } diff --git a/src/core/lib/client_config/lb_policies/load_balancer_api.c b/src/core/lib/client_config/lb_policies/load_balancer_api.c deleted file mode 100644 index 4cbed200df..0000000000 --- a/src/core/lib/client_config/lb_policies/load_balancer_api.c +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/client_config/lb_policies/load_balancer_api.h" -#include "third_party/nanopb/pb_decode.h" -#include "third_party/nanopb/pb_encode.h" - -#include <grpc/support/alloc.h> - -typedef struct decode_serverlist_arg { - int first_pass; - int i; - size_t num_servers; - grpc_grpclb_server **servers; -} decode_serverlist_arg; - -/* invoked once for every Server in ServerList */ -static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, - void **arg) { - decode_serverlist_arg *dec_arg = *arg; - if (dec_arg->first_pass != 0) { /* first pass */ - grpc_grpclb_server server; - if (!pb_decode(stream, grpc_lb_v0_Server_fields, &server)) { - return false; - } - dec_arg->num_servers++; - } else { /* second pass */ - grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); - GPR_ASSERT(dec_arg->num_servers > 0); - if (dec_arg->i == 0) { /* first iteration of second pass */ - dec_arg->servers = - gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); - } - if (!pb_decode(stream, grpc_lb_v0_Server_fields, server)) { - return false; - } - dec_arg->servers[dec_arg->i++] = server; - } - - return true; -} - -grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { - grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); - - req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ - req->has_initial_request = 1; - req->initial_request.has_name = 1; - strncpy(req->initial_request.name, lb_service_name, - GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); - return req; -} - -gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { - size_t encoded_length; - pb_ostream_t sizestream; - pb_ostream_t outputstream; - gpr_slice slice; - memset(&sizestream, 0, sizeof(pb_ostream_t)); - pb_encode(&sizestream, grpc_lb_v0_LoadBalanceRequest_fields, request); - encoded_length = sizestream.bytes_written; - - slice = gpr_slice_malloc(encoded_length); - outputstream = - pb_ostream_from_buffer(GPR_SLICE_START_PTR(slice), encoded_length); - GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v0_LoadBalanceRequest_fields, - request) != 0); - return slice; -} - -void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { - gpr_free(request); -} - -grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) { - bool status; - pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), - GPR_SLICE_LENGTH(encoded_response)); - grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response)); - memset(res, 0, sizeof(*res)); - status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res); - GPR_ASSERT(status == true); - return res; -} - -grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_response) { - grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); - bool status; - decode_serverlist_arg arg; - pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), - GPR_SLICE_LENGTH(encoded_response)); - pb_istream_t stream_at_start = stream; - grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response)); - memset(res, 0, sizeof(*res)); - memset(&arg, 0, sizeof(decode_serverlist_arg)); - - res->server_list.servers.funcs.decode = decode_serverlist; - res->server_list.servers.arg = &arg; - arg.first_pass = 1; - status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res); - GPR_ASSERT(status == true); - GPR_ASSERT(arg.num_servers > 0); - - arg.first_pass = 0; - status = - pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res); - GPR_ASSERT(status == true); - GPR_ASSERT(arg.servers != NULL); - - sl->num_servers = arg.num_servers; - sl->servers = arg.servers; - if (res->server_list.has_expiration_interval) { - sl->expiration_interval = res->server_list.expiration_interval; - } - grpc_grpclb_response_destroy(res); - return sl; -} - -void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { - size_t i; - for (i = 0; i < serverlist->num_servers; i++) { - gpr_free(serverlist->servers[i]); - } - gpr_free(serverlist->servers); - gpr_free(serverlist); -} - -void grpc_grpclb_response_destroy(grpc_grpclb_response *response) { - gpr_free(response); -} diff --git a/src/core/lib/client_config/lb_policies/load_balancer_api.h b/src/core/lib/client_config/lb_policies/load_balancer_api.h deleted file mode 100644 index 83299adfa9..0000000000 --- a/src/core/lib/client_config/lb_policies/load_balancer_api.h +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H - -#include <grpc/support/slice_buffer.h> - -#include "src/core/lib/client_config/lb_policy_factory.h" -#include "src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 - -typedef grpc_lb_v0_LoadBalanceRequest grpc_grpclb_request; -typedef grpc_lb_v0_LoadBalanceResponse grpc_grpclb_response; -typedef grpc_lb_v0_Server grpc_grpclb_server; -typedef grpc_lb_v0_Duration grpc_grpclb_duration; -typedef struct grpc_grpclb_serverlist { - grpc_grpclb_server **servers; - size_t num_servers; - grpc_grpclb_duration expiration_interval; -} grpc_grpclb_serverlist; - -/** Create a request for a gRPC LB service under \a lb_service_name */ -grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); - -/** Protocol Buffers v3-encode \a request */ -gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); - -/** Destroy \a request */ -void grpc_grpclb_request_destroy(grpc_grpclb_request *request); - -/** Parse (ie, decode) the bytes in \a encoded_response as a \a - * grpc_grpclb_response */ -grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response); - -/** Destroy \a serverlist */ -void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); - -/** Parse the list of servers from an encoded \a grpc_grpclb_response */ -grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_response); - -/** Destroy \a response */ -void grpc_grpclb_response_destroy(grpc_grpclb_response *response); - -#ifdef __cplusplus -} -#endif - -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_LOAD_BALANCER_API_H */ diff --git a/src/core/lib/client_config/lb_policies/pick_first.c b/src/core/lib/client_config/lb_policies/pick_first.c deleted file mode 100644 index 9ac550d9e7..0000000000 --- a/src/core/lib/client_config/lb_policies/pick_first.c +++ /dev/null @@ -1,444 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/client_config/lb_policies/pick_first.h" - -#include <string.h> - -#include <grpc/support/alloc.h> -#include "src/core/lib/client_config/lb_policy_factory.h" -#include "src/core/lib/transport/connectivity_state.h" - -typedef struct pending_pick { - struct pending_pick *next; - grpc_pollset *pollset; - grpc_connected_subchannel **target; - grpc_closure *on_complete; -} pending_pick; - -typedef struct { - /** base policy: must be first */ - grpc_lb_policy base; - /** all our subchannels */ - grpc_subchannel **subchannels; - size_t num_subchannels; - - grpc_closure connectivity_changed; - - /** the selected channel (a grpc_connected_subchannel) */ - gpr_atm selected; - - /** mutex protecting remaining members */ - gpr_mu mu; - /** have we started picking? */ - int started_picking; - /** are we shut down? */ - int shutdown; - /** which subchannel are we watching? */ - size_t checking_subchannel; - /** what is the connectivity of that channel? */ - grpc_connectivity_state checking_connectivity; - /** list of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; -} pick_first_lb_policy; - -#define GET_SELECTED(p) \ - ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) - -void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); - size_t i; - GPR_ASSERT(p->pending_picks == NULL); - for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); - } - if (selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first"); - } - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); - gpr_free(p); -} - -void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - grpc_connected_subchannel *selected; - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); - p->shutdown = 1; - pp = p->pending_picks; - p->pending_picks = NULL; - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); - /* cancel subscription */ - if (selected != NULL) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, NULL, NULL, &p->connectivity_changed); - } else { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, - &p->connectivity_changed); - } - gpr_mu_unlock(&p->mu); - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, - pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); - gpr_free(pp); - pp = next; - } -} - -static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - gpr_mu_lock(&p->mu); - pp = p->pending_picks; - p->pending_picks = NULL; - while (pp != NULL) { - pending_pick *next = pp->next; - if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, - pp->pollset); - *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); - gpr_free(pp); - } else { - pp->next = p->pending_picks; - p->pending_picks = pp; - } - pp = next; - } - gpr_mu_unlock(&p->mu); -} - -static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { - p->started_picking = 1; - p->checking_subchannel = 0; - p->checking_connectivity = GRPC_CHANNEL_IDLE; - GRPC_LB_POLICY_WEAK_REF(&p->base, "pick_first_connectivity"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); -} - -void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - gpr_mu_unlock(&p->mu); -} - -int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; - - /* Check atomically for a selected channel */ - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected != NULL) { - *target = selected; - return 1; - } - - /* No subchannel selected yet, so acquire lock and then attempt again */ - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); - if (selected) { - gpr_mu_unlock(&p->mu); - *target = selected; - return 1; - } else { - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - return 0; - } -} - -static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { - pick_first_lb_policy *p = arg; - size_t i; - size_t num_subchannels = p->num_subchannels; - grpc_subchannel **subchannels; - - gpr_mu_lock(&p->mu); - subchannels = p->subchannels; - p->num_subchannels = 0; - p->subchannels = NULL; - gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); - - for (i = 0; i < num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); - } - - gpr_free(subchannels); -} - -static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { - pick_first_lb_policy *p = arg; - grpc_subchannel *selected_subchannel; - pending_pick *pp; - grpc_connected_subchannel *selected; - - gpr_mu_lock(&p->mu); - - selected = GET_SELECTED(p); - - if (p->shutdown) { - gpr_mu_unlock(&p->mu); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); - return; - } else if (selected != NULL) { - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - /* if the selected channel goes bad, we're done */ - p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE; - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - p->checking_connectivity, "selected_changed"); - if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); - } else { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); - } - } else { - loop: - switch (p->checking_connectivity) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, "connecting_ready"); - selected_subchannel = p->subchannels[p->checking_subchannel]; - selected = - grpc_subchannel_get_connected_subchannel(selected_subchannel); - GPR_ASSERT(selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); - /* drop the pick list: we are connected now */ - GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_enqueue( - exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL); - /* update any calls that were waiting for a pick */ - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = selected; - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, - pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); - gpr_free(pp); - } - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, - &p->checking_connectivity, &p->connectivity_changed); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); - p->checking_subchannel = - (p->checking_subchannel + 1) % p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - } else { - goto loop; - } - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_CONNECTING, - "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[p->checking_subchannel], - p->base.interested_parties, &p->checking_connectivity, - &p->connectivity_changed); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - p->num_subchannels--; - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], - "pick_first"); - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); - gpr_free(pp); - } - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, - "pick_first_connectivity"); - } else { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed"); - p->checking_subchannel %= p->num_subchannels; - p->checking_connectivity = grpc_subchannel_check_connectivity( - p->subchannels[p->checking_subchannel]); - goto loop; - } - } - } - - gpr_mu_unlock(&p->mu); -} - -static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); - return st; -} - -void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); - gpr_mu_unlock(&p->mu); -} - -void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { - pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected) { - grpc_connected_subchannel_ping(exec_ctx, selected, closure); - } else { - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); - } -} - -static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, - pf_shutdown, - pf_pick, - pf_cancel_pick, - pf_ping_one, - pf_exit_idle, - pf_check_connectivity, - pf_notify_on_state_change}; - -static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} - -static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); - GPR_ASSERT(args->subchannel_factory != NULL); - - if (args->addresses->naddrs == 0) return NULL; - - pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); - - p->subchannels = - gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); - grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; - - grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( - exec_ctx, args->subchannel_factory, &sc_args); - - if (subchannel != NULL) { - p->subchannels[subchannel_idx++] = subchannel; - } - } - if (subchannel_idx == 0) { - gpr_free(p->subchannels); - gpr_free(p); - return NULL; - } - p->num_subchannels = subchannel_idx; - - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); - gpr_mu_init(&p->mu); - return &p->base; -} - -static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { - pick_first_factory_ref, pick_first_factory_unref, create_pick_first, - "pick_first"}; - -static grpc_lb_policy_factory pick_first_lb_policy_factory = { - &pick_first_factory_vtable}; - -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { - return &pick_first_lb_policy_factory; -} diff --git a/src/core/lib/client_config/lb_policies/pick_first.h b/src/core/lib/client_config/lb_policies/pick_first.h deleted file mode 100644 index dba86ea7ad..0000000000 --- a/src/core/lib/client_config/lb_policies/pick_first.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H - -#include "src/core/lib/client_config/lb_policy_factory.h" - -/** Returns a load balancing factory for the pick first policy, which picks up - * the first subchannel from \a subchannels to succesfully connect */ -grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); - -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_PICK_FIRST_H */ diff --git a/src/core/lib/client_config/lb_policies/round_robin.c b/src/core/lib/client_config/lb_policies/round_robin.c deleted file mode 100644 index a4bc2c4786..0000000000 --- a/src/core/lib/client_config/lb_policies/round_robin.c +++ /dev/null @@ -1,563 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/lib/client_config/lb_policies/round_robin.h" - -#include <string.h> - -#include <grpc/support/alloc.h> -#include "src/core/lib/transport/connectivity_state.h" - -typedef struct round_robin_lb_policy round_robin_lb_policy; - -int grpc_lb_round_robin_trace = 0; - -/** List of entities waiting for a pick. - * - * Once a pick is available, \a target is updated and \a on_complete called. */ -typedef struct pending_pick { - struct pending_pick *next; - grpc_pollset *pollset; - grpc_connected_subchannel **target; - grpc_closure *on_complete; -} pending_pick; - -/** List of subchannels in a connectivity READY state */ -typedef struct ready_list { - grpc_subchannel *subchannel; - struct ready_list *next; - struct ready_list *prev; -} ready_list; - -typedef struct { - /** index within policy->subchannels */ - size_t index; - /** backpointer to owning policy */ - round_robin_lb_policy *policy; - /** subchannel itself */ - grpc_subchannel *subchannel; - /** notification that connectivity has changed on subchannel */ - grpc_closure connectivity_changed_closure; - /** this subchannels current position in subchannel->ready_list */ - ready_list *ready_list_node; - /** last observed connectivity */ - grpc_connectivity_state connectivity_state; -} subchannel_data; - -struct round_robin_lb_policy { - /** base policy: must be first */ - grpc_lb_policy base; - - /** all our subchannels */ - size_t num_subchannels; - subchannel_data **subchannels; - - /** mutex protecting remaining members */ - gpr_mu mu; - /** have we started picking? */ - int started_picking; - /** are we shutting down? */ - int shutdown; - /** List of picks that are waiting on connectivity */ - pending_pick *pending_picks; - - /** our connectivity state tracker */ - grpc_connectivity_state_tracker state_tracker; - - /** (Dummy) root of the doubly linked list containing READY subchannels */ - ready_list ready_list; - /** Last pick from the ready list. */ - ready_list *ready_list_last_pick; -}; - -/** Returns the next subchannel from the connected list or NULL if the list is - * empty. - * - * Note that this function does *not* advance p->ready_list_last_pick. Use \a - * advance_last_picked_locked() for that. */ -static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) { - ready_list *selected; - selected = p->ready_list_last_pick->next; - - while (selected != NULL) { - if (selected == &p->ready_list) { - GPR_ASSERT(selected->subchannel == NULL); - /* skip dummy root */ - selected = selected->next; - } else { - GPR_ASSERT(selected->subchannel != NULL); - return selected; - } - } - return NULL; -} - -/** Advance the \a ready_list picking head. */ -static void advance_last_picked_locked(round_robin_lb_policy *p) { - if (p->ready_list_last_pick->next != NULL) { /* non-empty list */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - if (p->ready_list_last_pick == &p->ready_list) { - /* skip dummy root */ - p->ready_list_last_pick = p->ready_list_last_pick->next; - } - } else { /* should be an empty list */ - GPR_ASSERT(p->ready_list_last_pick == &p->ready_list); - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - p->ready_list_last_pick, p->ready_list_last_pick->subchannel); - } -} - -/** Prepends (relative to the root at p->ready_list) the connected subchannel \a - * csc to the list of ready subchannels. */ -static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, - grpc_subchannel *sc) { - ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - new_elem->subchannel = sc; - if (p->ready_list.prev == NULL) { - /* first element */ - new_elem->next = &p->ready_list; - new_elem->prev = &p->ready_list; - p->ready_list.next = new_elem; - p->ready_list.prev = new_elem; - } else { - new_elem->next = &p->ready_list; - new_elem->prev = p->ready_list.prev; - p->ready_list.prev->next = new_elem; - p->ready_list.prev = new_elem; - } - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc); - } - return new_elem; -} - -/** Removes \a node from the list of connected subchannels */ -static void remove_disconnected_sc_locked(round_robin_lb_policy *p, - ready_list *node) { - if (node == NULL) { - return; - } - if (node == p->ready_list_last_pick) { - /* If removing the lastly picked node, reset the last pick pointer to the - * dummy root of the list */ - p->ready_list_last_pick = &p->ready_list; - } - - /* removing last item */ - if (node->next == &p->ready_list && node->prev == &p->ready_list) { - GPR_ASSERT(p->ready_list.next == node); - GPR_ASSERT(p->ready_list.prev == node); - p->ready_list.next = NULL; - p->ready_list.prev = NULL; - } else { - node->prev->next = node->next; - node->next->prev = node->prev; - } - - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node, - node->subchannel); - } - - node->next = NULL; - node->prev = NULL; - node->subchannel = NULL; - - gpr_free(node); -} - -void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - size_t i; - ready_list *elem; - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); - gpr_free(sd); - } - - grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); - gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); - - elem = p->ready_list.next; - while (elem != NULL && elem != &p->ready_list) { - ready_list *tmp; - tmp = elem->next; - elem->next = NULL; - elem->prev = NULL; - elem->subchannel = NULL; - gpr_free(elem); - elem = tmp; - } - gpr_free(p); -} - -void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - size_t i; - - gpr_mu_lock(&p->mu); - - p->shutdown = 1; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); - gpr_free(pp); - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "shutdown"); - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, - &sd->connectivity_changed_closure); - } - gpr_mu_unlock(&p->mu); -} - -static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - gpr_mu_lock(&p->mu); - pp = p->pending_picks; - p->pending_picks = NULL; - while (pp != NULL) { - pending_pick *next = pp->next; - if (pp->target == target) { - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, - pp->pollset); - *target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); - gpr_free(pp); - } else { - pp->next = p->pending_picks; - p->pending_picks = pp; - } - pp = next; - } - gpr_mu_unlock(&p->mu); -} - -static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { - size_t i; - p->started_picking = 1; - - gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p, - p->num_subchannels); - - for (i = 0; i < p->num_subchannels; i++) { - subchannel_data *sd = p->subchannels[i]; - sd->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - GRPC_LB_POLICY_WEAK_REF(&p->base, "round_robin_connectivity"); - } -} - -void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - gpr_mu_unlock(&p->mu); -} - -int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, - grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **target, grpc_closure *on_complete) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - pending_pick *pp; - ready_list *selected; - gpr_mu_lock(&p->mu); - if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); - *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - /* only advance the last picked pointer if the selection was used */ - advance_last_picked_locked(p); - return 1; - } else { - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset); - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->pollset = pollset; - pp->target = target; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - return 0; - } -} - -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { - subchannel_data *sd = arg; - round_robin_lb_policy *p = sd->policy; - pending_pick *pp; - ready_list *selected; - - int unref = 0; - - gpr_mu_lock(&p->mu); - - if (p->shutdown) { - unref = 1; - } else { - switch (sd->connectivity_state) { - case GRPC_CHANNEL_READY: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_READY, "connecting_ready"); - /* add the newly connected subchannel to the list of connected ones. - * Note that it goes to the "end of the line". */ - sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel); - /* at this point we know there's at least one suitable subchannel. Go - * ahead and pick one and notify the pending suitors in - * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); - if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ - advance_last_picked_locked(p); - } - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = - grpc_subchannel_get_connected_subchannel(selected->subchannel); - if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", - selected->subchannel, selected); - } - grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, - pp->pollset); - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); - gpr_free(pp); - } - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_CONNECTING: - case GRPC_CHANNEL_IDLE: - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - sd->connectivity_state, - "connecting_changed"); - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - break; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - /* renew state notification */ - grpc_subchannel_notify_on_state_change( - exec_ctx, sd->subchannel, p->base.interested_parties, - &sd->connectivity_state, &sd->connectivity_changed_closure); - - /* remove from ready list if still present */ - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure"); - break; - case GRPC_CHANNEL_FATAL_FAILURE: - if (sd->ready_list_node != NULL) { - remove_disconnected_sc_locked(p, sd->ready_list_node); - sd->ready_list_node = NULL; - } - - p->num_subchannels--; - GPR_SWAP(subchannel_data *, p->subchannels[sd->index], - p->subchannels[p->num_subchannels]); - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); - p->subchannels[sd->index]->index = sd->index; - gpr_free(sd); - - unref = 1; - if (p->num_subchannels == 0) { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); - gpr_free(pp); - } - } else { - grpc_connectivity_state_set(exec_ctx, &p->state_tracker, - GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed"); - } - } /* switch */ - } /* !unref */ - - gpr_mu_unlock(&p->mu); - - if (unref) { - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity"); - } -} - -static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker); - gpr_mu_unlock(&p->mu); - return st; -} - -static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); - grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, - current, notify); - gpr_mu_unlock(&p->mu); -} - -static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - ready_list *selected; - grpc_connected_subchannel *target; - gpr_mu_lock(&p->mu); - if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); - target = grpc_subchannel_get_connected_subchannel(selected->subchannel); - grpc_connected_subchannel_ping(exec_ctx, target, closure); - } else { - gpr_mu_unlock(&p->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); - } -} - -static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, - rr_shutdown, - rr_pick, - rr_cancel_pick, - rr_ping_one, - rr_exit_idle, - rr_check_connectivity, - rr_notify_on_state_change}; - -static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} - -static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} - -static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - GPR_ASSERT(args->addresses != NULL); - GPR_ASSERT(args->subchannel_factory != NULL); - - round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); - - p->subchannels = - gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs); - - grpc_subchannel_args sc_args; - size_t subchannel_idx = 0; - for (size_t i = 0; i < args->addresses->naddrs; i++) { - memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr); - sc_args.addr_len = (size_t)args->addresses->addrs[i].len; - - grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( - exec_ctx, args->subchannel_factory, &sc_args); - - if (subchannel != NULL) { - subchannel_data *sd = gpr_malloc(sizeof(*sd)); - memset(sd, 0, sizeof(*sd)); - p->subchannels[subchannel_idx] = sd; - sd->policy = p; - sd->index = subchannel_idx; - sd->subchannel = subchannel; - ++subchannel_idx; - grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd); - } - } - if (subchannel_idx == 0) { - gpr_free(p->subchannels); - gpr_free(p); - return NULL; - } - p->num_subchannels = subchannel_idx; - - /* The (dummy node) root of the ready list */ - p->ready_list.subchannel = NULL; - p->ready_list.prev = NULL; - p->ready_list.next = NULL; - p->ready_list_last_pick = &p->ready_list; - - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); - grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, - "round_robin"); - gpr_mu_init(&p->mu); - return &p->base; -} - -static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = { - round_robin_factory_ref, round_robin_factory_unref, create_round_robin, - "round_robin"}; - -static grpc_lb_policy_factory round_robin_lb_policy_factory = { - &round_robin_factory_vtable}; - -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() { - return &round_robin_lb_policy_factory; -} diff --git a/src/core/lib/client_config/lb_policies/round_robin.h b/src/core/lib/client_config/lb_policies/round_robin.h deleted file mode 100644 index 52db1caa0c..0000000000 --- a/src/core/lib/client_config/lb_policies/round_robin.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H -#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H - -#include "src/core/lib/client_config/lb_policy.h" - -extern int grpc_lb_round_robin_trace; - -#include "src/core/lib/client_config/lb_policy_factory.h" - -/** Returns a load balancing factory for the round robin policy */ -grpc_lb_policy_factory *grpc_round_robin_lb_factory_create(); - -#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICIES_ROUND_ROBIN_H */ diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/lib/client_config/lb_policy_registry.c index f703e630a0..d1dc502b9a 100644 --- a/src/core/lib/client_config/lb_policy_registry.c +++ b/src/core/lib/client_config/lb_policy_registry.c @@ -40,12 +40,7 @@ static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; static int g_number_of_lb_policies = 0; -static grpc_lb_policy_factory *g_default_lb_policy_factory; - -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { - g_number_of_lb_policies = 0; - g_default_lb_policy_factory = default_factory; -} +void grpc_lb_policy_registry_init(void) { g_number_of_lb_policies = 0; } void grpc_lb_policy_registry_shutdown(void) { int i; diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/lib/client_config/lb_policy_registry.h index 789854bd20..1ecf7fe39f 100644 --- a/src/core/lib/client_config/lb_policy_registry.h +++ b/src/core/lib/client_config/lb_policy_registry.h @@ -39,7 +39,7 @@ /** Initialize the registry and set \a default_factory as the factory to be * returned when no name is provided in a lookup */ -void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_init(void); void grpc_lb_policy_registry_shutdown(void); /** Register a LB policy factory. */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index c8da5b1347..952cf28345 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -31,6 +31,10 @@ * */ +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + #include "src/core/lib/iomgr/ev_posix.h" #include <string.h> @@ -242,3 +246,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, } void grpc_kick_poller(void) { g_event_engine->kick_poller(); } + +#endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/iomgr_windows.c b/src/core/lib/iomgr/iomgr_windows.c index af7e616394..f3c752e72a 100644 --- a/src/core/lib/iomgr/iomgr_windows.c +++ b/src/core/lib/iomgr/iomgr_windows.c @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/pollset_windows.h" #include "src/core/lib/iomgr/socket_windows.h" /* Windows' io manager is going to be fully designed using IO completion @@ -61,11 +62,13 @@ static void winsock_shutdown(void) { void grpc_iomgr_platform_init(void) { winsock_init(); grpc_iocp_init(); + grpc_pollset_global_init(); } void grpc_iomgr_platform_flush(void) { grpc_iocp_flush(); } void grpc_iomgr_platform_shutdown(void) { + grpc_pollset_global_shutdown(); grpc_iocp_shutdown(); winsock_shutdown(); } diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 6b339127a8..acb2bac634 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -47,7 +47,7 @@ gpr_mu grpc_polling_mu; static grpc_pollset_worker *g_active_poller; static grpc_pollset_worker g_global_root_worker; -void grpc_pollset_global_init() { +void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); g_active_poller = NULL; g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = @@ -55,7 +55,7 @@ void grpc_pollset_global_init() { &g_global_root_worker; } -void grpc_pollset_global_shutdown() { gpr_mu_destroy(&grpc_polling_mu); } +void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); } static void remove_worker(grpc_pollset_worker *worker, grpc_pollset_worker_link_type type) { diff --git a/src/core/lib/iomgr/pollset_windows.h b/src/core/lib/iomgr/pollset_windows.h index fa9553ffea..1833dcf035 100644 --- a/src/core/lib/iomgr/pollset_windows.h +++ b/src/core/lib/iomgr/pollset_windows.h @@ -72,4 +72,7 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; +void grpc_pollset_global_init(void); +void grpc_pollset_global_shutdown(void); + #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c index 06f6ee05e7..43e006e15e 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c @@ -35,7 +35,12 @@ #ifndef GPR_HAVE_UNIX_SOCKET -void grpc_create_socketpair_if_unix(int sv[2]) {} +void grpc_create_socketpair_if_unix(int sv[2]) { + // TODO: Either implement this for the non-Unix socket case or make + // sure that it is never called in any such case. Until then, leave an + // assertion to notify if this gets called inadvertently + GPR_ASSERT(0); +} grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) { return NULL; diff --git a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c b/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c deleted file mode 100644 index 8f82141f96..0000000000 --- a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.c +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ -/* Automatically generated nanopb constant definitions */ -/* Generated by nanopb-0.3.5-dev */ - -#include "src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h" - -#if PB_PROTO_HEADER_VERSION != 30 -#error Regenerate this file with the current version of nanopb generator. -#endif - - - -const pb_field_t grpc_lb_v0_Duration_fields[3] = { - PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Duration, seconds, seconds, 0), - PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Duration, nanos, seconds, 0), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3] = { - PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v0_InitialLoadBalanceRequest_fields), - PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v0_ClientStats_fields), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2] = { - PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceRequest, name, name, 0), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_ClientStats_fields[4] = { - PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_ClientStats, total_requests, total_requests, 0), - PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, client_rpc_errors, total_requests, 0), - PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, dropped_requests, client_rpc_errors, 0), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3] = { - PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v0_InitialLoadBalanceResponse_fields), - PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v0_ServerList_fields), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4] = { - PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceResponse, client_config, client_config, 0), - PB_FIELD( 2, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, load_balancer_delegate, client_config, 0), - PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v0_Duration_fields), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_ServerList_fields[3] = { - PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v0_ServerList, servers, servers, &grpc_lb_v0_Server_fields), - PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ServerList, expiration_interval, servers, &grpc_lb_v0_Duration_fields), - PB_LAST_FIELD -}; - -const pb_field_t grpc_lb_v0_Server_fields[5] = { - PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Server, ip_address, ip_address, 0), - PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, port, ip_address, 0), - PB_FIELD( 3, BYTES , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, load_balance_token, port, 0), - PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, drop_request, load_balance_token, 0), - PB_LAST_FIELD -}; - - -/* Check that field information fits in pb_field_t */ -#if !defined(PB_FIELD_32BIT) -/* If you get an error here, it means that you need to define PB_FIELD_32BIT - * compile-time option. You can do that in pb.h or on compiler command line. - * - * The reason you need to do this is that some of your messages contain tag - * numbers or field sizes that are larger than what can fit in 8 or 16 bit - * field descriptors. - */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v0_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server) -#endif - -#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT) -/* If you get an error here, it means that you need to define PB_FIELD_16BIT - * compile-time option. You can do that in pb.h or on compiler command line. - * - * The reason you need to do this is that some of your messages contain tag - * numbers or field sizes that are larger than what can fit in the default - * 8 bit descriptors. - */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v0_ServerList, servers) < 256 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server) -#endif - - diff --git a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h b/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h deleted file mode 100644 index 3599f881bb..0000000000 --- a/src/core/lib/proto/grpc/lb/v0/load_balancer.pb.h +++ /dev/null @@ -1,182 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ -/* Automatically generated nanopb header */ -/* Generated by nanopb-0.3.5-dev */ - -#ifndef PB_LOAD_BALANCER_PB_H_INCLUDED -#define PB_LOAD_BALANCER_PB_H_INCLUDED -#include "third_party/nanopb/pb.h" -#if PB_PROTO_HEADER_VERSION != 30 -#error Regenerate this file with the current version of nanopb generator. -#endif - -#ifdef __cplusplus -extern "C" { -#endif - -/* Struct definitions */ -typedef struct _grpc_lb_v0_ClientStats { - bool has_total_requests; - int64_t total_requests; - bool has_client_rpc_errors; - int64_t client_rpc_errors; - bool has_dropped_requests; - int64_t dropped_requests; -} grpc_lb_v0_ClientStats; - -typedef struct _grpc_lb_v0_Duration { - bool has_seconds; - int64_t seconds; - bool has_nanos; - int32_t nanos; -} grpc_lb_v0_Duration; - -typedef struct _grpc_lb_v0_InitialLoadBalanceRequest { - bool has_name; - char name[128]; -} grpc_lb_v0_InitialLoadBalanceRequest; - -typedef PB_BYTES_ARRAY_T(64) grpc_lb_v0_Server_load_balance_token_t; -typedef struct _grpc_lb_v0_Server { - bool has_ip_address; - char ip_address[46]; - bool has_port; - int32_t port; - bool has_load_balance_token; - grpc_lb_v0_Server_load_balance_token_t load_balance_token; - bool has_drop_request; - bool drop_request; -} grpc_lb_v0_Server; - -typedef struct _grpc_lb_v0_InitialLoadBalanceResponse { - bool has_client_config; - char client_config[64]; - bool has_load_balancer_delegate; - char load_balancer_delegate[64]; - bool has_client_stats_report_interval; - grpc_lb_v0_Duration client_stats_report_interval; -} grpc_lb_v0_InitialLoadBalanceResponse; - -typedef struct _grpc_lb_v0_LoadBalanceRequest { - bool has_initial_request; - grpc_lb_v0_InitialLoadBalanceRequest initial_request; - bool has_client_stats; - grpc_lb_v0_ClientStats client_stats; -} grpc_lb_v0_LoadBalanceRequest; - -typedef struct _grpc_lb_v0_ServerList { - pb_callback_t servers; - bool has_expiration_interval; - grpc_lb_v0_Duration expiration_interval; -} grpc_lb_v0_ServerList; - -typedef struct _grpc_lb_v0_LoadBalanceResponse { - bool has_initial_response; - grpc_lb_v0_InitialLoadBalanceResponse initial_response; - bool has_server_list; - grpc_lb_v0_ServerList server_list; -} grpc_lb_v0_LoadBalanceResponse; - -/* Default values for struct fields */ - -/* Initializer values for message structs */ -#define grpc_lb_v0_Duration_init_default {false, 0, false, 0} -#define grpc_lb_v0_LoadBalanceRequest_init_default {false, grpc_lb_v0_InitialLoadBalanceRequest_init_default, false, grpc_lb_v0_ClientStats_init_default} -#define grpc_lb_v0_InitialLoadBalanceRequest_init_default {false, ""} -#define grpc_lb_v0_ClientStats_init_default {false, 0, false, 0, false, 0} -#define grpc_lb_v0_LoadBalanceResponse_init_default {false, grpc_lb_v0_InitialLoadBalanceResponse_init_default, false, grpc_lb_v0_ServerList_init_default} -#define grpc_lb_v0_InitialLoadBalanceResponse_init_default {false, "", false, "", false, grpc_lb_v0_Duration_init_default} -#define grpc_lb_v0_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_default} -#define grpc_lb_v0_Server_init_default {false, "", false, 0, false, {0, {0}}, false, 0} -#define grpc_lb_v0_Duration_init_zero {false, 0, false, 0} -#define grpc_lb_v0_LoadBalanceRequest_init_zero {false, grpc_lb_v0_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v0_ClientStats_init_zero} -#define grpc_lb_v0_InitialLoadBalanceRequest_init_zero {false, ""} -#define grpc_lb_v0_ClientStats_init_zero {false, 0, false, 0, false, 0} -#define grpc_lb_v0_LoadBalanceResponse_init_zero {false, grpc_lb_v0_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v0_ServerList_init_zero} -#define grpc_lb_v0_InitialLoadBalanceResponse_init_zero {false, "", false, "", false, grpc_lb_v0_Duration_init_zero} -#define grpc_lb_v0_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_zero} -#define grpc_lb_v0_Server_init_zero {false, "", false, 0, false, {0, {0}}, false, 0} - -/* Field tags (for use in manual encoding/decoding) */ -#define grpc_lb_v0_ClientStats_total_requests_tag 1 -#define grpc_lb_v0_ClientStats_client_rpc_errors_tag 2 -#define grpc_lb_v0_ClientStats_dropped_requests_tag 3 -#define grpc_lb_v0_Duration_seconds_tag 1 -#define grpc_lb_v0_Duration_nanos_tag 2 -#define grpc_lb_v0_InitialLoadBalanceRequest_name_tag 1 -#define grpc_lb_v0_Server_ip_address_tag 1 -#define grpc_lb_v0_Server_port_tag 2 -#define grpc_lb_v0_Server_load_balance_token_tag 3 -#define grpc_lb_v0_Server_drop_request_tag 4 -#define grpc_lb_v0_InitialLoadBalanceResponse_client_config_tag 1 -#define grpc_lb_v0_InitialLoadBalanceResponse_load_balancer_delegate_tag 2 -#define grpc_lb_v0_InitialLoadBalanceResponse_client_stats_report_interval_tag 3 -#define grpc_lb_v0_LoadBalanceRequest_initial_request_tag 1 -#define grpc_lb_v0_LoadBalanceRequest_client_stats_tag 2 -#define grpc_lb_v0_ServerList_servers_tag 1 -#define grpc_lb_v0_ServerList_expiration_interval_tag 3 -#define grpc_lb_v0_LoadBalanceResponse_initial_response_tag 1 -#define grpc_lb_v0_LoadBalanceResponse_server_list_tag 2 - -/* Struct field encoding specification for nanopb */ -extern const pb_field_t grpc_lb_v0_Duration_fields[3]; -extern const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3]; -extern const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2]; -extern const pb_field_t grpc_lb_v0_ClientStats_fields[4]; -extern const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3]; -extern const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4]; -extern const pb_field_t grpc_lb_v0_ServerList_fields[3]; -extern const pb_field_t grpc_lb_v0_Server_fields[5]; - -/* Maximum encoded size of messages (where known) */ -#define grpc_lb_v0_Duration_size 22 -#define grpc_lb_v0_LoadBalanceRequest_size 169 -#define grpc_lb_v0_InitialLoadBalanceRequest_size 131 -#define grpc_lb_v0_ClientStats_size 33 -#define grpc_lb_v0_LoadBalanceResponse_size (165 + grpc_lb_v0_ServerList_size) -#define grpc_lb_v0_InitialLoadBalanceResponse_size 156 -#define grpc_lb_v0_Server_size 127 - -/* Message IDs (where set with "msgid" option) */ -#ifdef PB_MSGID - -#define LOAD_BALANCER_MESSAGES \ - - -#endif - -#ifdef __cplusplus -} /* extern "C" */ -#endif - -#endif diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index d63a4a7401..77ad410b50 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -174,6 +174,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; + /* Call stats: only valid after trailing metadata received */ + grpc_transport_stream_stats stats; + /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; /* Supported encodings (compression algorithms), a bitset */ @@ -909,7 +912,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(int *)dest = (status != GRPC_STATUS_OK); } -static int are_write_flags_valid(uint32_t flags) { +static bool are_write_flags_valid(uint32_t flags) { /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ const uint32_t allowed_write_positions = (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); @@ -917,6 +920,15 @@ static int are_write_flags_valid(uint32_t flags) { return !(flags & invalid_positions); } +static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { + /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */ + uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; + if (!is_client) { + invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST; + } + return !(flags & invalid_positions); +} + static batch_control *allocate_batch_control(grpc_call *call) { size_t i; for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { @@ -1196,7 +1208,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: /* Flag validation: currently allow no flags */ - if (op->flags != 0) { + if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } @@ -1220,6 +1232,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->metadata_batch[0][0].deadline = call->send_deadline; stream_op.send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; + stream_op.send_idempotent_request = + (op->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) != 0; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1371,6 +1385,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1392,6 +1407,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + stream_op.collect_stats = &call->stats; break; } } diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index dbfc8a9336..3ccc21fb31 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -48,8 +48,6 @@ #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/client_config/lb_policies/pick_first.h" -#include "src/core/lib/client_config/lb_policies/round_robin.h" #include "src/core/lib/client_config/lb_policy_registry.h" #include "src/core/lib/client_config/resolver_registry.h" #include "src/core/lib/client_config/resolvers/dns_resolver.h" @@ -75,6 +73,9 @@ #define GRPC_DEFAULT_NAME_PREFIX "dns:///" #endif +/* (generated) built in registry of plugins */ +extern void grpc_register_built_in_plugins(void); + #define MAX_PLUGINS 128 static gpr_once g_basic_init = GPR_ONCE_INIT; @@ -83,6 +84,7 @@ static int g_initializations; static void do_basic_init(void) { gpr_mu_init(&g_init_mu); + grpc_register_built_in_plugins(); /* TODO(ctiller): ideally remove this strict linkage */ grpc_register_plugin(census_grpc_plugin_init, census_grpc_plugin_destroy); g_initializations = 0; @@ -165,14 +167,12 @@ void grpc_init(void) { gpr_time_init(); grpc_mdctx_global_init(); grpc_channel_init_init(); - grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create()); - grpc_register_lb_policy(grpc_pick_first_lb_factory_create()); - grpc_register_lb_policy(grpc_round_robin_lb_factory_create()); + grpc_lb_policy_registry_init(); grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); grpc_register_resolver_type(grpc_dns_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv4_resolver_factory_create()); grpc_register_resolver_type(grpc_ipv6_resolver_factory_create()); -#ifdef GPR_POSIX_SOCKET +#ifdef GPR_HAVE_UNIX_SOCKET grpc_register_resolver_type(grpc_unix_resolver_factory_create()); #endif grpc_register_tracer("api", &grpc_api_trace); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 080734e9d5..55e5e49e11 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -100,6 +100,7 @@ typedef struct requested_call { typedef struct channel_registered_method { registered_method *server_registered_method; + uint32_t flags; grpc_mdstr *method; grpc_mdstr *host; } channel_registered_method; @@ -152,6 +153,7 @@ struct call_data { grpc_completion_queue *cq_new; grpc_metadata_batch *recv_initial_metadata; + bool recv_idempotent_request; grpc_metadata_array initial_metadata; grpc_closure got_initial_metadata; @@ -171,6 +173,7 @@ struct request_matcher { struct registered_method { char *method; char *host; + uint32_t flags; request_matcher request_matcher; registered_method *next; }; @@ -468,6 +471,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -480,6 +486,9 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; + if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && + !calld->recv_idempotent_request) + continue; finish_start_new_rpc(exec_ctx, server, elem, &rm->server_registered_method->request_matcher); return; @@ -598,9 +607,11 @@ static void server_mutate_op(grpc_call_element *elem, call_data *calld = elem->call_data; if (op->recv_initial_metadata != NULL) { + GPR_ASSERT(op->recv_idempotent_request == NULL); calld->recv_initial_metadata = op->recv_initial_metadata; calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; + op->recv_idempotent_request = &calld->recv_idempotent_request; } } @@ -830,10 +841,12 @@ static int streq(const char *a, const char *b) { } void *grpc_server_register_method(grpc_server *server, const char *method, - const char *host) { + const char *host, uint32_t flags) { registered_method *m; - GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)", - 3, (server, method, host)); + GRPC_API_TRACE( + "grpc_server_register_method(server=%p, method=%s, host=%s, " + "flags=0x%08x)", + 4, (server, method, host, flags)); if (!method) { gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); @@ -846,12 +859,18 @@ void *grpc_server_register_method(grpc_server *server, const char *method, return NULL; } } + if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) { + gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", + flags); + return NULL; + } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; + m->flags = flags; server->registered_methods = m; return m; } @@ -930,6 +949,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (probes > max_probes) max_probes = probes; crm = &chand->registered_methods[(hash + probes) % slots]; crm->server_registered_method = rm; + crm->flags = rm->flags; crm->host = host; crm->method = method; } @@ -1247,6 +1267,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; + rc->data.batch.details->flags = + 0 | (calld->recv_idempotent_request + ? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST + : 0); break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index eda277b3dc..bde1fafc88 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -1,5 +1,4 @@ /* - * * Copyright 2015-2016, Google Inc. * All rights reserved. * @@ -28,19 +27,16 @@ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * */ /* * WARNING: Auto-generated code. * * To make changes to this file, change - * tools/codegen/core/gen_static_metadata.py, - * and then re-run it. + * tools/codegen/core/gen_static_metadata.py, and then re-run it. * * See metadata.h for an explanation of the interface here, and metadata.c for - * an - * explanation of what's going on. + * an explanation of what's going on. */ #include "src/core/lib/transport/static_metadata.h" @@ -52,7 +48,7 @@ uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 8, 6, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] = {11, 35, 10, 35, 12, 35, 12, 49, 13, 35, 14, 35, 15, 35, 16, 35, 17, 35, @@ -60,10 +56,10 @@ const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2] = 30, 18, 30, 35, 31, 35, 32, 35, 36, 35, 37, 35, 38, 35, 39, 35, 42, 33, 42, 34, 42, 48, 42, 53, 42, 54, 42, 55, 42, 56, 43, 33, 43, 48, 43, 53, 46, 0, 46, 1, 46, 2, 50, 35, 57, 35, 58, 35, 59, 35, 60, 35, 61, 35, - 62, 35, 63, 35, 64, 35, 65, 35, 66, 40, 66, 68, 67, 78, 67, 79, 69, 35, - 70, 35, 71, 35, 72, 35, 73, 35, 74, 35, 75, 41, 75, 51, 75, 52, 76, 35, - 77, 35, 80, 3, 80, 4, 80, 5, 80, 6, 80, 7, 80, 8, 80, 9, 81, 35, - 82, 83, 84, 35, 85, 35, 86, 35, 87, 35, 88, 35}; + 62, 35, 63, 35, 64, 35, 65, 35, 66, 40, 66, 68, 66, 71, 67, 79, 67, 80, + 69, 35, 70, 35, 72, 35, 73, 35, 74, 35, 75, 35, 76, 41, 76, 51, 76, 52, + 77, 35, 78, 35, 81, 3, 81, 4, 81, 5, 81, 6, 81, 7, 81, 8, 81, 9, + 82, 35, 83, 84, 85, 35, 86, 35, 87, 35, 88, 35, 89, 35}; const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { "0", @@ -137,6 +133,7 @@ const char *const grpc_static_metadata_strings[GRPC_STATIC_MDSTR_COUNT] = { "POST", "proxy-authenticate", "proxy-authorization", + "PUT", "range", "referer", "refresh", diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h index aff136a6d2..a05553a870 100644 --- a/src/core/lib/transport/static_metadata.h +++ b/src/core/lib/transport/static_metadata.h @@ -1,5 +1,4 @@ /* - * * Copyright 2015-2016, Google Inc. * All rights reserved. * @@ -28,19 +27,16 @@ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * */ /* * WARNING: Auto-generated code. * * To make changes to this file, change - * tools/codegen/core/gen_static_metadata.py, - * and then re-run it. + * tools/codegen/core/gen_static_metadata.py, and then re-run it. * * See metadata.h for an explanation of the interface here, and metadata.c for - * an - * explanation of what's going on. + * an explanation of what's going on. */ #ifndef GRPC_CORE_LIB_TRANSPORT_STATIC_METADATA_H @@ -48,7 +44,7 @@ #include "src/core/lib/transport/metadata.h" -#define GRPC_STATIC_MDSTR_COUNT 89 +#define GRPC_STATIC_MDSTR_COUNT 90 extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; /* "0" */ #define GRPC_MDSTR_0 (&grpc_static_mdstr_table[0]) @@ -193,44 +189,46 @@ extern grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; #define GRPC_MDSTR_PROXY_AUTHENTICATE (&grpc_static_mdstr_table[69]) /* "proxy-authorization" */ #define GRPC_MDSTR_PROXY_AUTHORIZATION (&grpc_static_mdstr_table[70]) +/* "PUT" */ +#define GRPC_MDSTR_PUT (&grpc_static_mdstr_table[71]) /* "range" */ -#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[71]) +#define GRPC_MDSTR_RANGE (&grpc_static_mdstr_table[72]) /* "referer" */ -#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[72]) +#define GRPC_MDSTR_REFERER (&grpc_static_mdstr_table[73]) /* "refresh" */ -#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[73]) +#define GRPC_MDSTR_REFRESH (&grpc_static_mdstr_table[74]) /* "retry-after" */ -#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[74]) +#define GRPC_MDSTR_RETRY_AFTER (&grpc_static_mdstr_table[75]) /* ":scheme" */ -#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[75]) +#define GRPC_MDSTR_SCHEME (&grpc_static_mdstr_table[76]) /* "server" */ -#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[76]) +#define GRPC_MDSTR_SERVER (&grpc_static_mdstr_table[77]) /* "set-cookie" */ -#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[77]) +#define GRPC_MDSTR_SET_COOKIE (&grpc_static_mdstr_table[78]) /* "/" */ -#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[78]) +#define GRPC_MDSTR_SLASH (&grpc_static_mdstr_table[79]) /* "/index.html" */ -#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[79]) +#define GRPC_MDSTR_SLASH_INDEX_DOT_HTML (&grpc_static_mdstr_table[80]) /* ":status" */ -#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[80]) +#define GRPC_MDSTR_STATUS (&grpc_static_mdstr_table[81]) /* "strict-transport-security" */ -#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[81]) +#define GRPC_MDSTR_STRICT_TRANSPORT_SECURITY (&grpc_static_mdstr_table[82]) /* "te" */ -#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[82]) +#define GRPC_MDSTR_TE (&grpc_static_mdstr_table[83]) /* "trailers" */ -#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[83]) +#define GRPC_MDSTR_TRAILERS (&grpc_static_mdstr_table[84]) /* "transfer-encoding" */ -#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[84]) +#define GRPC_MDSTR_TRANSFER_ENCODING (&grpc_static_mdstr_table[85]) /* "user-agent" */ -#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[85]) +#define GRPC_MDSTR_USER_AGENT (&grpc_static_mdstr_table[86]) /* "vary" */ -#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[86]) +#define GRPC_MDSTR_VARY (&grpc_static_mdstr_table[87]) /* "via" */ -#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[87]) +#define GRPC_MDSTR_VIA (&grpc_static_mdstr_table[88]) /* "www-authenticate" */ -#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[88]) +#define GRPC_MDSTR_WWW_AUTHENTICATE (&grpc_static_mdstr_table[89]) -#define GRPC_STATIC_MDELEM_COUNT 78 +#define GRPC_STATIC_MDELEM_COUNT 79 extern grpc_mdelem grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT]; extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT]; /* "accept-charset": "" */ @@ -343,61 +341,63 @@ extern uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT]; #define GRPC_MDELEM_METHOD_GET (&grpc_static_mdelem_table[49]) /* ":method": "POST" */ #define GRPC_MDELEM_METHOD_POST (&grpc_static_mdelem_table[50]) +/* ":method": "PUT" */ +#define GRPC_MDELEM_METHOD_PUT (&grpc_static_mdelem_table[51]) /* ":path": "/" */ -#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[51]) +#define GRPC_MDELEM_PATH_SLASH (&grpc_static_mdelem_table[52]) /* ":path": "/index.html" */ -#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[52]) +#define GRPC_MDELEM_PATH_SLASH_INDEX_DOT_HTML (&grpc_static_mdelem_table[53]) /* "proxy-authenticate": "" */ -#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[53]) +#define GRPC_MDELEM_PROXY_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[54]) /* "proxy-authorization": "" */ -#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[54]) +#define GRPC_MDELEM_PROXY_AUTHORIZATION_EMPTY (&grpc_static_mdelem_table[55]) /* "range": "" */ -#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[55]) +#define GRPC_MDELEM_RANGE_EMPTY (&grpc_static_mdelem_table[56]) /* "referer": "" */ -#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[56]) +#define GRPC_MDELEM_REFERER_EMPTY (&grpc_static_mdelem_table[57]) /* "refresh": "" */ -#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[57]) +#define GRPC_MDELEM_REFRESH_EMPTY (&grpc_static_mdelem_table[58]) /* "retry-after": "" */ -#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[58]) +#define GRPC_MDELEM_RETRY_AFTER_EMPTY (&grpc_static_mdelem_table[59]) /* ":scheme": "grpc" */ -#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[59]) +#define GRPC_MDELEM_SCHEME_GRPC (&grpc_static_mdelem_table[60]) /* ":scheme": "http" */ -#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[60]) +#define GRPC_MDELEM_SCHEME_HTTP (&grpc_static_mdelem_table[61]) /* ":scheme": "https" */ -#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[61]) +#define GRPC_MDELEM_SCHEME_HTTPS (&grpc_static_mdelem_table[62]) /* "server": "" */ -#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[62]) +#define GRPC_MDELEM_SERVER_EMPTY (&grpc_static_mdelem_table[63]) /* "set-cookie": "" */ -#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[63]) +#define GRPC_MDELEM_SET_COOKIE_EMPTY (&grpc_static_mdelem_table[64]) /* ":status": "200" */ -#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[64]) +#define GRPC_MDELEM_STATUS_200 (&grpc_static_mdelem_table[65]) /* ":status": "204" */ -#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[65]) +#define GRPC_MDELEM_STATUS_204 (&grpc_static_mdelem_table[66]) /* ":status": "206" */ -#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[66]) +#define GRPC_MDELEM_STATUS_206 (&grpc_static_mdelem_table[67]) /* ":status": "304" */ -#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[67]) +#define GRPC_MDELEM_STATUS_304 (&grpc_static_mdelem_table[68]) /* ":status": "400" */ -#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[68]) +#define GRPC_MDELEM_STATUS_400 (&grpc_static_mdelem_table[69]) /* ":status": "404" */ -#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[69]) +#define GRPC_MDELEM_STATUS_404 (&grpc_static_mdelem_table[70]) /* ":status": "500" */ -#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[70]) +#define GRPC_MDELEM_STATUS_500 (&grpc_static_mdelem_table[71]) /* "strict-transport-security": "" */ #define GRPC_MDELEM_STRICT_TRANSPORT_SECURITY_EMPTY \ - (&grpc_static_mdelem_table[71]) + (&grpc_static_mdelem_table[72]) /* "te": "trailers" */ -#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[72]) +#define GRPC_MDELEM_TE_TRAILERS (&grpc_static_mdelem_table[73]) /* "transfer-encoding": "" */ -#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[73]) +#define GRPC_MDELEM_TRANSFER_ENCODING_EMPTY (&grpc_static_mdelem_table[74]) /* "user-agent": "" */ -#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[74]) +#define GRPC_MDELEM_USER_AGENT_EMPTY (&grpc_static_mdelem_table[75]) /* "vary": "" */ -#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[75]) +#define GRPC_MDELEM_VARY_EMPTY (&grpc_static_mdelem_table[76]) /* "via": "" */ -#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[76]) +#define GRPC_MDELEM_VIA_EMPTY (&grpc_static_mdelem_table[77]) /* "www-authenticate": "" */ -#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[77]) +#define GRPC_MDELEM_WWW_AUTHENTICATE_EMPTY (&grpc_static_mdelem_table[78]) extern const uint8_t grpc_static_metadata_elem_indices[GRPC_STATIC_MDELEM_COUNT * 2]; diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 18256aae5e..411e4f140a 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -35,6 +35,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/atm.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "src/core/lib/transport/transport_impl.h" #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -76,6 +77,24 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_closure_init(&refcount->destroy, cb, cb_arg); } +static void move64(uint64_t *from, uint64_t *to) { + *to += *from; + *from = 0; +} + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to) { + move64(&from->framing_bytes, &to->framing_bytes); + move64(&from->data_bytes, &to->data_bytes); + move64(&from->header_bytes, &to->header_bytes); +} + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to) { + grpc_transport_move_one_way_stats(&from->incoming, &to->incoming); + grpc_transport_move_one_way_stats(&from->outgoing, &to->outgoing); +} + size_t grpc_transport_stream_size(grpc_transport *transport) { return transport->vtable->sizeof_stream; } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index e98cfe9515..6dd782c19e 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -78,11 +78,32 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif +typedef struct { + uint64_t framing_bytes; + uint64_t data_bytes; + uint64_t header_bytes; +} grpc_transport_one_way_stats; + +typedef struct grpc_transport_stream_stats { + grpc_transport_one_way_stats incoming; + grpc_transport_one_way_stats outgoing; +} grpc_transport_stream_stats; + +void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, + grpc_transport_one_way_stats *to); + +void grpc_transport_move_stats(grpc_transport_stream_stats *from, + grpc_transport_stream_stats *to); + /* Transport stream op: a set of operations to perform on a transport against a single stream */ typedef struct grpc_transport_stream_op { - /** Send initial metadata to the peer, from the provided metadata batch. */ + /** Send initial metadata to the peer, from the provided metadata batch. + idempotent_request MUST be set if this is non-null */ grpc_metadata_batch *send_initial_metadata; + /** Iff send_initial_metadata != NULL, flags if this is an idempotent request + or not */ + bool send_idempotent_request; /** Send trailing metadata to the peer, from the provided metadata batch. */ grpc_metadata_batch *send_trailing_metadata; @@ -92,6 +113,7 @@ typedef struct grpc_transport_stream_op { /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + bool *recv_idempotent_request; /** Should be enqueued when initial metadata is ready to be processed. */ grpc_closure *recv_initial_metadata_ready; @@ -104,6 +126,9 @@ typedef struct grpc_transport_stream_op { */ grpc_metadata_batch *recv_trailing_metadata; + /** Collect any stats into provided buffer, zero internal stat counters */ + grpc_transport_stream_stats *collect_stats; + /** Should be enqueued when all requested operations (excluding recv_message and recv_initial_metadata which have their own closures) in a given batch have been completed. */ |