aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/lb_policy
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c14
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h10
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h182
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c (renamed from src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c)59
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h178
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c110
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c127
7 files changed, 368 insertions, 312 deletions
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index 459d6d9954..59b89997dd 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -50,7 +50,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
decode_serverlist_arg *dec_arg = *arg;
if (dec_arg->first_pass != 0) { /* first pass */
grpc_grpclb_server server;
- if (!pb_decode(stream, grpc_lb_v0_Server_fields, &server)) {
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
return false;
}
dec_arg->num_servers++;
@@ -61,7 +61,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
dec_arg->servers =
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
- if (!pb_decode(stream, grpc_lb_v0_Server_fields, server)) {
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
return false;
}
dec_arg->servers[dec_arg->i++] = server;
@@ -87,13 +87,13 @@ gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
pb_ostream_t outputstream;
gpr_slice slice;
memset(&sizestream, 0, sizeof(pb_ostream_t));
- pb_encode(&sizestream, grpc_lb_v0_LoadBalanceRequest_fields, request);
+ pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
encoded_length = sizestream.bytes_written;
slice = gpr_slice_malloc(encoded_length);
outputstream =
pb_ostream_from_buffer(GPR_SLICE_START_PTR(slice), encoded_length);
- GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v0_LoadBalanceRequest_fields,
+ GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
request) != 0);
return slice;
}
@@ -109,7 +109,7 @@ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) {
GPR_SLICE_LENGTH(encoded_response));
grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
memset(res, 0, sizeof(*res));
- status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
@@ -132,7 +132,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
res->server_list.servers.funcs.decode = decode_serverlist;
res->server_list.servers.arg = &arg;
arg.first_pass = 1;
- status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
@@ -140,7 +140,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = 0;
status =
- pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 968f7d278a..71b5616d0c 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -37,7 +37,7 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/ext/client_config/lb_policy_factory.h"
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
+#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#ifdef __cplusplus
extern "C" {
@@ -45,10 +45,10 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
-typedef grpc_lb_v0_LoadBalanceRequest grpc_grpclb_request;
-typedef grpc_lb_v0_LoadBalanceResponse grpc_grpclb_response;
-typedef grpc_lb_v0_Server grpc_grpclb_server;
-typedef grpc_lb_v0_Duration grpc_grpclb_duration;
+typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
+typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
+typedef grpc_lb_v1_Server grpc_grpclb_server;
+typedef grpc_lb_v1_Duration grpc_grpclb_duration;
typedef struct grpc_grpclb_serverlist {
grpc_grpclb_server **servers;
size_t num_servers;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h
deleted file mode 100644
index 3599f881bb..0000000000
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
-
-#ifndef PB_LOAD_BALANCER_PB_H_INCLUDED
-#define PB_LOAD_BALANCER_PB_H_INCLUDED
-#include "third_party/nanopb/pb.h"
-#if PB_PROTO_HEADER_VERSION != 30
-#error Regenerate this file with the current version of nanopb generator.
-#endif
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/* Struct definitions */
-typedef struct _grpc_lb_v0_ClientStats {
- bool has_total_requests;
- int64_t total_requests;
- bool has_client_rpc_errors;
- int64_t client_rpc_errors;
- bool has_dropped_requests;
- int64_t dropped_requests;
-} grpc_lb_v0_ClientStats;
-
-typedef struct _grpc_lb_v0_Duration {
- bool has_seconds;
- int64_t seconds;
- bool has_nanos;
- int32_t nanos;
-} grpc_lb_v0_Duration;
-
-typedef struct _grpc_lb_v0_InitialLoadBalanceRequest {
- bool has_name;
- char name[128];
-} grpc_lb_v0_InitialLoadBalanceRequest;
-
-typedef PB_BYTES_ARRAY_T(64) grpc_lb_v0_Server_load_balance_token_t;
-typedef struct _grpc_lb_v0_Server {
- bool has_ip_address;
- char ip_address[46];
- bool has_port;
- int32_t port;
- bool has_load_balance_token;
- grpc_lb_v0_Server_load_balance_token_t load_balance_token;
- bool has_drop_request;
- bool drop_request;
-} grpc_lb_v0_Server;
-
-typedef struct _grpc_lb_v0_InitialLoadBalanceResponse {
- bool has_client_config;
- char client_config[64];
- bool has_load_balancer_delegate;
- char load_balancer_delegate[64];
- bool has_client_stats_report_interval;
- grpc_lb_v0_Duration client_stats_report_interval;
-} grpc_lb_v0_InitialLoadBalanceResponse;
-
-typedef struct _grpc_lb_v0_LoadBalanceRequest {
- bool has_initial_request;
- grpc_lb_v0_InitialLoadBalanceRequest initial_request;
- bool has_client_stats;
- grpc_lb_v0_ClientStats client_stats;
-} grpc_lb_v0_LoadBalanceRequest;
-
-typedef struct _grpc_lb_v0_ServerList {
- pb_callback_t servers;
- bool has_expiration_interval;
- grpc_lb_v0_Duration expiration_interval;
-} grpc_lb_v0_ServerList;
-
-typedef struct _grpc_lb_v0_LoadBalanceResponse {
- bool has_initial_response;
- grpc_lb_v0_InitialLoadBalanceResponse initial_response;
- bool has_server_list;
- grpc_lb_v0_ServerList server_list;
-} grpc_lb_v0_LoadBalanceResponse;
-
-/* Default values for struct fields */
-
-/* Initializer values for message structs */
-#define grpc_lb_v0_Duration_init_default {false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceRequest_init_default {false, grpc_lb_v0_InitialLoadBalanceRequest_init_default, false, grpc_lb_v0_ClientStats_init_default}
-#define grpc_lb_v0_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v0_ClientStats_init_default {false, 0, false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceResponse_init_default {false, grpc_lb_v0_InitialLoadBalanceResponse_init_default, false, grpc_lb_v0_ServerList_init_default}
-#define grpc_lb_v0_InitialLoadBalanceResponse_init_default {false, "", false, "", false, grpc_lb_v0_Duration_init_default}
-#define grpc_lb_v0_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_default}
-#define grpc_lb_v0_Server_init_default {false, "", false, 0, false, {0, {0}}, false, 0}
-#define grpc_lb_v0_Duration_init_zero {false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceRequest_init_zero {false, grpc_lb_v0_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v0_ClientStats_init_zero}
-#define grpc_lb_v0_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v0_ClientStats_init_zero {false, 0, false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceResponse_init_zero {false, grpc_lb_v0_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v0_ServerList_init_zero}
-#define grpc_lb_v0_InitialLoadBalanceResponse_init_zero {false, "", false, "", false, grpc_lb_v0_Duration_init_zero}
-#define grpc_lb_v0_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_zero}
-#define grpc_lb_v0_Server_init_zero {false, "", false, 0, false, {0, {0}}, false, 0}
-
-/* Field tags (for use in manual encoding/decoding) */
-#define grpc_lb_v0_ClientStats_total_requests_tag 1
-#define grpc_lb_v0_ClientStats_client_rpc_errors_tag 2
-#define grpc_lb_v0_ClientStats_dropped_requests_tag 3
-#define grpc_lb_v0_Duration_seconds_tag 1
-#define grpc_lb_v0_Duration_nanos_tag 2
-#define grpc_lb_v0_InitialLoadBalanceRequest_name_tag 1
-#define grpc_lb_v0_Server_ip_address_tag 1
-#define grpc_lb_v0_Server_port_tag 2
-#define grpc_lb_v0_Server_load_balance_token_tag 3
-#define grpc_lb_v0_Server_drop_request_tag 4
-#define grpc_lb_v0_InitialLoadBalanceResponse_client_config_tag 1
-#define grpc_lb_v0_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
-#define grpc_lb_v0_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
-#define grpc_lb_v0_LoadBalanceRequest_initial_request_tag 1
-#define grpc_lb_v0_LoadBalanceRequest_client_stats_tag 2
-#define grpc_lb_v0_ServerList_servers_tag 1
-#define grpc_lb_v0_ServerList_expiration_interval_tag 3
-#define grpc_lb_v0_LoadBalanceResponse_initial_response_tag 1
-#define grpc_lb_v0_LoadBalanceResponse_server_list_tag 2
-
-/* Struct field encoding specification for nanopb */
-extern const pb_field_t grpc_lb_v0_Duration_fields[3];
-extern const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3];
-extern const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v0_ClientStats_fields[4];
-extern const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3];
-extern const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4];
-extern const pb_field_t grpc_lb_v0_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v0_Server_fields[5];
-
-/* Maximum encoded size of messages (where known) */
-#define grpc_lb_v0_Duration_size 22
-#define grpc_lb_v0_LoadBalanceRequest_size 169
-#define grpc_lb_v0_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v0_ClientStats_size 33
-#define grpc_lb_v0_LoadBalanceResponse_size (165 + grpc_lb_v0_ServerList_size)
-#define grpc_lb_v0_InitialLoadBalanceResponse_size 156
-#define grpc_lb_v0_Server_size 127
-
-/* Message IDs (where set with "msgid" option) */
-#ifdef PB_MSGID
-
-#define LOAD_BALANCER_MESSAGES \
-
-
-#endif
-
-#ifdef __cplusplus
-} /* extern "C" */
-#endif
-
-#endif
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 9719673181..52e11c40bb 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,7 +33,7 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.5-dev */
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
+#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
@@ -41,54 +41,53 @@
-const pb_field_t grpc_lb_v0_Duration_fields[3] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Duration, seconds, seconds, 0),
- PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Duration, nanos, seconds, 0),
+const pb_field_t grpc_lb_v1_Duration_fields[3] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Duration, seconds, seconds, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Duration, nanos, seconds, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v0_InitialLoadBalanceRequest_fields),
- PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v0_ClientStats_fields),
+const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v1_InitialLoadBalanceRequest_fields),
+ PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v1_ClientStats_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceRequest, name, name, 0),
+const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceRequest, name, name, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_ClientStats_fields[4] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_ClientStats, total_requests, total_requests, 0),
- PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, client_rpc_errors, total_requests, 0),
- PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, dropped_requests, client_rpc_errors, 0),
+const pb_field_t grpc_lb_v1_ClientStats_fields[4] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, total_requests, total_requests, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, client_rpc_errors, total_requests, 0),
+ PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, dropped_requests, client_rpc_errors, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v0_InitialLoadBalanceResponse_fields),
- PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v0_ServerList_fields),
+const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v1_InitialLoadBalanceResponse_fields),
+ PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v1_ServerList_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceResponse, client_config, client_config, 0),
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, load_balancer_delegate, client_config, 0),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v0_Duration_fields),
+const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
+ PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_ServerList_fields[3] = {
- PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v0_ServerList, servers, servers, &grpc_lb_v0_Server_fields),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ServerList, expiration_interval, servers, &grpc_lb_v0_Duration_fields),
+const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
+ PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v1_ServerList, servers, servers, &grpc_lb_v1_Server_fields),
+ PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ServerList, expiration_interval, servers, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Server, ip_address, ip_address, 0),
- PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, port, ip_address, 0),
- PB_FIELD( 3, BYTES , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, drop_request, load_balance_token, 0),
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
+ PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -102,7 +101,7 @@ const pb_field_t grpc_lb_v0_Server_fields[5] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v0_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -113,7 +112,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v0_ServerList, servers) < 256 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
new file mode 100644
index 0000000000..46fe588f72
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+/* Automatically generated nanopb header */
+/* Generated by nanopb-0.3.5-dev */
+
+#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_PROTO_GRPC_LB_V1_LOAD_BALANCER_PB_H
+#include "third_party/nanopb/pb.h"
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStats {
+ bool has_total_requests;
+ int64_t total_requests;
+ bool has_client_rpc_errors;
+ int64_t client_rpc_errors;
+ bool has_dropped_requests;
+ int64_t dropped_requests;
+} grpc_lb_v1_ClientStats;
+
+typedef struct _grpc_lb_v1_Duration {
+ bool has_seconds;
+ int64_t seconds;
+ bool has_nanos;
+ int32_t nanos;
+} grpc_lb_v1_Duration;
+
+typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
+ bool has_name;
+ char name[128];
+} grpc_lb_v1_InitialLoadBalanceRequest;
+
+typedef struct _grpc_lb_v1_Server {
+ bool has_ip_address;
+ char ip_address[46];
+ bool has_port;
+ int32_t port;
+ bool has_load_balance_token;
+ char load_balance_token[64];
+ bool has_drop_request;
+ bool drop_request;
+} grpc_lb_v1_Server;
+
+typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
+ bool has_load_balancer_delegate;
+ char load_balancer_delegate[64];
+ bool has_client_stats_report_interval;
+ grpc_lb_v1_Duration client_stats_report_interval;
+} grpc_lb_v1_InitialLoadBalanceResponse;
+
+typedef struct _grpc_lb_v1_LoadBalanceRequest {
+ bool has_initial_request;
+ grpc_lb_v1_InitialLoadBalanceRequest initial_request;
+ bool has_client_stats;
+ grpc_lb_v1_ClientStats client_stats;
+} grpc_lb_v1_LoadBalanceRequest;
+
+typedef struct _grpc_lb_v1_ServerList {
+ pb_callback_t servers;
+ bool has_expiration_interval;
+ grpc_lb_v1_Duration expiration_interval;
+} grpc_lb_v1_ServerList;
+
+typedef struct _grpc_lb_v1_LoadBalanceResponse {
+ bool has_initial_response;
+ grpc_lb_v1_InitialLoadBalanceResponse initial_response;
+ bool has_server_list;
+ grpc_lb_v1_ServerList server_list;
+} grpc_lb_v1_LoadBalanceResponse;
+
+/* Default values for struct fields */
+
+/* Initializer values for message structs */
+#define grpc_lb_v1_Duration_init_default {false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
+#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
+#define grpc_lb_v1_ClientStats_init_default {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
+#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
+#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
+#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
+#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
+#define grpc_lb_v1_ClientStats_init_zero {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
+#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
+#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
+#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+
+/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStats_total_requests_tag 1
+#define grpc_lb_v1_ClientStats_client_rpc_errors_tag 2
+#define grpc_lb_v1_ClientStats_dropped_requests_tag 3
+#define grpc_lb_v1_Duration_seconds_tag 1
+#define grpc_lb_v1_Duration_nanos_tag 2
+#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
+#define grpc_lb_v1_Server_ip_address_tag 1
+#define grpc_lb_v1_Server_port_tag 2
+#define grpc_lb_v1_Server_load_balance_token_tag 3
+#define grpc_lb_v1_Server_drop_request_tag 4
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
+#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
+#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
+#define grpc_lb_v1_ServerList_servers_tag 1
+#define grpc_lb_v1_ServerList_expiration_interval_tag 3
+#define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1
+#define grpc_lb_v1_LoadBalanceResponse_server_list_tag 2
+
+/* Struct field encoding specification for nanopb */
+extern const pb_field_t grpc_lb_v1_Duration_fields[3];
+extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
+extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[4];
+extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
+extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
+extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
+
+/* Maximum encoded size of messages (where known) */
+#define grpc_lb_v1_Duration_size 22
+#define grpc_lb_v1_LoadBalanceRequest_size 169
+#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
+#define grpc_lb_v1_ClientStats_size 33
+#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
+#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
+#define grpc_lb_v1_Server_size 127
+
+/* Message IDs (where set with "msgid" option) */
+#ifdef PB_MSGID
+
+#define LOAD_BALANCER_MESSAGES \
+
+
+#endif
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 0d215cd196..9decf70692 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -39,7 +39,7 @@
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -103,8 +103,9 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel shutdown"), "shutdown");
/* cancel subscription */
if (selected != NULL) {
grpc_connected_subchannel_notify_on_state_change(
@@ -118,9 +119,9 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
pp = next;
}
@@ -136,10 +137,11 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -162,9 +164,10 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -196,7 +199,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -221,10 +225,11 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
@@ -235,7 +240,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -256,12 +261,14 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
grpc_connected_subchannel *selected;
+ GRPC_ERROR_REF(error);
+
gpr_mu_lock(&p->mu);
selected = GET_SELECTED(p);
@@ -269,15 +276,17 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (p->shutdown) {
gpr_mu_unlock(&p->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
+ GRPC_ERROR_UNREF(error);
return;
} else if (selected != NULL) {
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */
- p->checking_connectivity = GRPC_CHANNEL_FATAL_FAILURE;
+ p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN;
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- p->checking_connectivity, "selected_changed");
- if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
+ p->checking_connectivity, GRPC_ERROR_REF(error),
+ "selected_changed");
+ if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) {
grpc_connected_subchannel_notify_on_state_change(
exec_ctx, selected, p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
@@ -289,7 +298,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
+ "connecting_ready");
selected_subchannel = p->subchannels[p->checking_subchannel];
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
@@ -298,15 +308,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_enqueue(
- exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
+ grpc_exec_ctx_sched(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p),
+ GRPC_ERROR_NONE, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -318,12 +329,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
(p->checking_subchannel + 1) % p->num_subchannels;
if (p->checking_subchannel == 0) {
/* only trigger transient failure when we've tried all alternatives */
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "connecting_transient_failure");
}
+ GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
@@ -335,53 +347,60 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_CONNECTING,
+ GRPC_ERROR_REF(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
p->base.interested_parties, &p->checking_connectivity,
&p->connectivity_changed);
break;
- case GRPC_CHANNEL_FATAL_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
p->num_subchannels--;
GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Pick first exhausted channels",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
"pick_first_connectivity");
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
p->checking_subchannel %= p->num_subchannels;
+ GRPC_ERROR_UNREF(error);
p->checking_connectivity = grpc_subchannel_check_connectivity(
- p->subchannels[p->checking_subchannel]);
+ p->subchannels[p->checking_subchannel], &error);
goto loop;
}
}
}
gpr_mu_unlock(&p->mu);
+
+ GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -404,7 +423,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"),
+ NULL);
}
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index dcdc0c6285..7bcf608ab9 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -31,6 +31,34 @@
*
*/
+/** Round Robin Policy.
+ *
+ * This policy keeps:
+ * - A circular list of ready (connected) subchannels, the *readylist*. An empty
+ * readylist consists solely of its root (dummy) node.
+ * - A pointer to the last element picked from the readylist, the *lastpick*.
+ * Initially set to point to the readylist's root.
+ *
+ * Behavior:
+ * - When a subchannel connects, it's *prepended* to the readylist's root node.
+ * Ie, if readylist = A <-> B <-> ROOT <-> C
+ * ^ ^
+ * |____________________|
+ * and subchannel D becomes connected, the addition of D to the readylist
+ * results in readylist = A <-> B <-> D <-> ROOT <-> C
+ * ^ ^
+ * |__________________________|
+ * - When a subchannel disconnects, it's removed from the readylist. If the
+ * subchannel being removed was the most recently picked, the *lastpick*
+ * pointer moves to the removed node's previous element. Note that if the
+ * readylist only had one element, this is still legal, as the lastpick would
+ * point to the dummy root node, for an empty readylist.
+ * - Upon picking, *lastpick* is updated to point to the returned (connected)
+ * subchannel. Note that it's possible that the selected subchannel becomes
+ * disconnected in the interim between the selection and the actual usage of
+ * the subchannel by the caller.
+ */
+
#include <string.h>
#include <grpc/support/alloc.h>
@@ -48,7 +76,7 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -173,9 +201,7 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
return;
}
if (node == p->ready_list_last_pick) {
- /* If removing the lastly picked node, reset the last pick pointer to the
- * dummy root of the list */
- p->ready_list_last_pick = &p->ready_list;
+ p->ready_list_last_pick = p->ready_list_last_pick->prev;
}
/* removing last item */
@@ -239,11 +265,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
+ GRPC_ERROR_CREATE("Channel Shutdown"), NULL);
gpr_free(pp);
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown");
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL,
@@ -262,10 +290,11 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
+ NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -288,10 +317,11 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
+ NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -307,7 +337,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
p->started_picking = 1;
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%" PRIuPTR, p,
p->num_subchannels);
}
@@ -331,7 +361,8 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -344,8 +375,8 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
*target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
- "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
- selected->subchannel, selected);
+ "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)", *target,
+ selected);
}
/* only advance the last picked pointer if the selection was used */
advance_last_picked_locked(p);
@@ -354,10 +385,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
@@ -368,7 +400,7 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- bool iomgr_success) {
+ grpc_error *error) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -376,6 +408,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int unref = 0;
+ GRPC_ERROR_REF(error);
gpr_mu_lock(&p->mu);
if (p->shutdown) {
@@ -384,7 +417,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
switch (sd->connectivity_state) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_READY, "connecting_ready");
+ GRPC_CHANNEL_READY, GRPC_ERROR_REF(error),
+ "connecting_ready");
/* add the newly connected subchannel to the list of connected ones.
* Note that it goes to the "end of the line". */
sd->ready_list_node = add_connected_sc_locked(p, sd->subchannel);
@@ -406,9 +440,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -417,9 +451,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
break;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- sd->connectivity_state,
- "connecting_changed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, sd->connectivity_state,
+ GRPC_ERROR_REF(error), "connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, sd->subchannel, p->base.interested_parties,
&sd->connectivity_state, &sd->connectivity_changed_closure);
@@ -435,11 +469,11 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
}
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "connecting_transient_failure");
break;
- case GRPC_CHANNEL_FATAL_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
if (sd->ready_list_node != NULL) {
remove_disconnected_sc_locked(p, sd->ready_list_node);
sd->ready_list_node = NULL;
@@ -454,19 +488,22 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
unref = 1;
if (p->num_subchannels == 0) {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN,
+ GRPC_ERROR_CREATE_REFERENCING("Round Robin Channels Exhausted",
+ &error, 1),
+ "no_more_channels");
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE,
+ NULL);
gpr_free(pp);
}
} else {
- grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
- GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ grpc_connectivity_state_set(
+ exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_ERROR_REF(error), "subchannel_failed");
}
} /* switch */
} /* !unref */
@@ -476,14 +513,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (unref) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
}
+
+ GRPC_ERROR_UNREF(error);
}
static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
- grpc_lb_policy *pol) {
+ grpc_lb_policy *pol,
+ grpc_error **error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
- st = grpc_connectivity_state_check(&p->state_tracker);
+ st = grpc_connectivity_state_check(&p->state_tracker, error);
gpr_mu_unlock(&p->mu);
return st;
}
@@ -511,7 +551,8 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure,
+ GRPC_ERROR_CREATE("Round Robin not connected"), NULL);
}
}
@@ -524,7 +565,7 @@ static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
+static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
@@ -580,7 +621,7 @@ static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
}
static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = {
- round_robin_factory_ref, round_robin_factory_unref, create_round_robin,
+ round_robin_factory_ref, round_robin_factory_unref, round_robin_create,
"round_robin"};
static grpc_lb_policy_factory round_robin_lb_policy_factory = {