diff options
author | 2018-10-17 08:52:17 -0700 | |
---|---|---|
committer | 2018-10-17 08:52:17 -0700 | |
commit | c68489805557253146aee59459103ce269ede9fe (patch) | |
tree | 1e4fea842d7eab2913e50b0991b212061f3f1e4a | |
parent | 9e0c210f7c76d7f2fbee47bff0594c12f6d86a4a (diff) | |
parent | 2c2090899a3c8724ee4cd27a22c9d8fd7f0847e7 (diff) |
Merge remote-tracking branch 'upstream/master' into health_checking_service
24 files changed, 3162 insertions, 547 deletions
@@ -1266,6 +1266,73 @@ grpc_cc_library( ) grpc_cc_library( + name = "grpc_lb_policy_xds", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", + ], + hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + ], + external_deps = [ + "nanopb", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_resolver_fake", + ], +) + +grpc_cc_library( + name = "grpc_lb_policy_xds_secure", + srcs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", + ], + hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + ], + external_deps = [ + "nanopb", + ], + language = "c++", + deps = [ + "grpc_base", + "grpc_client_channel", + "grpc_resolver_fake", + "grpc_secure", + ], +) + +grpc_cc_library( name = "grpc_lb_subchannel_list", hdrs = [ "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h", diff --git a/build.yaml b/build.yaml index b8d0ca9313..31474ac90b 100644 --- a/build.yaml +++ b/build.yaml @@ -715,6 +715,57 @@ filegroups: - grpc_base - grpc_client_channel - grpc_lb_subchannel_list +- name: grpc_lb_policy_xds + headers: + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h + - src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h + - src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h + src: + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c + - src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc + - src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc + plugin: grpc_lb_policy_xds + uses: + - grpc_base + - grpc_client_channel + - nanopb + - grpc_resolver_fake +- name: grpc_lb_policy_xds_secure + headers: + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h + - src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h + - src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h + - src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h + src: + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c + - src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc + - src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc + - src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc + plugin: grpc_lb_policy_xds + uses: + - grpc_base + - grpc_secure + - grpc_client_channel + - nanopb + - grpc_resolver_fake - name: grpc_lb_subchannel_list headers: - src/core/ext/filters/client_channel/lb_policy/subchannel_list.h diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc new file mode 100644 index 0000000000..d79453d008 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc @@ -0,0 +1,140 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h" + +#include <grpc/support/atm.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" + +static grpc_error* init_channel_elem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + return GRPC_ERROR_NONE; +} + +static void destroy_channel_elem(grpc_channel_element* elem) {} + +namespace { + +struct call_data { + // Stats object to update. + grpc_core::RefCountedPtr<grpc_core::XdsLbClientStats> client_stats; + // State for intercepting send_initial_metadata. + grpc_closure on_complete_for_send; + grpc_closure* original_on_complete_for_send; + bool send_initial_metadata_succeeded; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure* original_recv_initial_metadata_ready; + bool recv_initial_metadata_succeeded; +}; + +} // namespace + +static void on_complete_for_send(void* arg, grpc_error* error) { + call_data* calld = static_cast<call_data*>(arg); + if (error == GRPC_ERROR_NONE) { + calld->send_initial_metadata_succeeded = true; + } + GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); +} + +static void recv_initial_metadata_ready(void* arg, grpc_error* error) { + call_data* calld = static_cast<call_data*>(arg); + if (error == GRPC_ERROR_NONE) { + calld->recv_initial_metadata_succeeded = true; + } + GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +static grpc_error* init_call_elem(grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = static_cast<call_data*>(elem->call_data); + // Get stats object from context and take a ref. + GPR_ASSERT(args->context != nullptr); + if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { + calld->client_stats = static_cast<grpc_core::XdsLbClientStats*>( + args->context[GRPC_GRPCLB_CLIENT_STATS].value) + ->Ref(); + // Record call started. + calld->client_stats->AddCallStarted(); + } + return GRPC_ERROR_NONE; +} + +static void destroy_call_elem(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* ignored) { + call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->client_stats != nullptr) { + // Record call finished, optionally setting client_failed_to_send and + // received. + calld->client_stats->AddCallFinished( + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */); + // All done, so unref the stats object. + // TODO(roth): Eliminate this once filter stack is converted to C++. + calld->client_stats.reset(); + } +} + +static void start_transport_stream_op_batch( + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = static_cast<call_data*>(elem->call_data); + GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0); + if (calld->client_stats != nullptr) { + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, + calld, grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } + } + // Chain to next filter. + grpc_call_next_op(elem, batch); +} + +const grpc_channel_filter xds_client_load_reporting_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_channel_next_get_info, + "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h new file mode 100644 index 0000000000..31a799154c --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h @@ -0,0 +1,29 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter xds_client_load_reporting_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc b/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc new file mode 100644 index 0000000000..3bd2653635 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc @@ -0,0 +1,307 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "pb_decode.h" +#include "pb_encode.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h" + +#include <grpc/support/alloc.h> + +/* invoked once for every Server in ServerList */ +static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field, + void** arg) { + xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(*arg); + xds_grpclb_server server; + if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + ++sl->num_servers; + return true; +} + +typedef struct decode_serverlist_arg { + /* The decoding callback is invoked once per server in serverlist. Remember + * which index of the serverlist are we currently decoding */ + size_t decoding_idx; + /* The decoded serverlist */ + xds_grpclb_serverlist* serverlist; +} decode_serverlist_arg; + +/* invoked once for every Server in ServerList */ +static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field, + void** arg) { + decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg); + GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); + xds_grpclb_server* server = + static_cast<xds_grpclb_server*>(gpr_zalloc(sizeof(xds_grpclb_server))); + if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) { + gpr_free(server); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; + return true; +} + +xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name) { + xds_grpclb_request* req = + static_cast<xds_grpclb_request*>(gpr_malloc(sizeof(xds_grpclb_request))); + req->has_client_stats = false; + req->has_initial_request = true; + req->initial_request.has_name = true; + strncpy(req->initial_request.name, lb_service_name, + XDS_SERVICE_NAME_MAX_LENGTH); + return req; +} + +static void populate_timestamp(gpr_timespec timestamp, + xds_grpclb_timestamp* timestamp_pb) { + timestamp_pb->has_seconds = true; + timestamp_pb->seconds = timestamp.tv_sec; + timestamp_pb->has_nanos = true; + timestamp_pb->nanos = timestamp.tv_nsec; +} + +static bool encode_string(pb_ostream_t* stream, const pb_field_t* field, + void* const* arg) { + char* str = static_cast<char*>(*arg); + if (!pb_encode_tag_for_field(stream, field)) return false; + return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str)); +} + +static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field, + void* const* arg) { + grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries = + static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(*arg); + if (drop_entries == nullptr) return true; + for (size_t i = 0; i < drop_entries->size(); ++i) { + if (!pb_encode_tag_for_field(stream, field)) return false; + grpc_lb_v1_ClientStatsPerToken drop_message; + drop_message.load_balance_token.funcs.encode = encode_string; + drop_message.load_balance_token.arg = (*drop_entries)[i].token.get(); + drop_message.has_num_calls = true; + drop_message.num_calls = (*drop_entries)[i].count; + if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields, + &drop_message)) { + return false; + } + } + return true; +} + +xds_grpclb_request* xds_grpclb_load_report_request_create_locked( + grpc_core::XdsLbClientStats* client_stats) { + xds_grpclb_request* req = + static_cast<xds_grpclb_request*>(gpr_zalloc(sizeof(xds_grpclb_request))); + req->has_client_stats = true; + req->client_stats.has_timestamp = true; + populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); + req->client_stats.has_num_calls_started = true; + req->client_stats.has_num_calls_finished = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_known_received = true; + req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; + grpc_core::UniquePtr<grpc_core::XdsLbClientStats::DroppedCallCounts> + drop_counts; + client_stats->GetLocked( + &req->client_stats.num_calls_started, + &req->client_stats.num_calls_finished, + &req->client_stats.num_calls_finished_with_client_failed_to_send, + &req->client_stats.num_calls_finished_known_received, &drop_counts); + // Will be deleted in xds_grpclb_request_destroy(). + req->client_stats.calls_finished_with_drop.arg = drop_counts.release(); + return req; +} + +grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request) { + size_t encoded_length; + pb_ostream_t sizestream; + pb_ostream_t outputstream; + grpc_slice slice; + memset(&sizestream, 0, sizeof(pb_ostream_t)); + pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); + encoded_length = sizestream.bytes_written; + + slice = GRPC_SLICE_MALLOC(encoded_length); + outputstream = + pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); + GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, + request) != 0); + return slice; +} + +void xds_grpclb_request_destroy(xds_grpclb_request* request) { + if (request->has_client_stats) { + grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries = + static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>( + request->client_stats.calls_finished_with_drop.arg); + grpc_core::Delete(drop_entries); + } + gpr_free(request); +} + +typedef grpc_lb_v1_LoadBalanceResponse xds_grpclb_response; +xds_grpclb_initial_response* xds_grpclb_initial_response_parse( + grpc_slice encoded_xds_grpclb_response) { + pb_istream_t stream = + pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response), + GRPC_SLICE_LENGTH(encoded_xds_grpclb_response)); + xds_grpclb_response res; + memset(&res, 0, sizeof(xds_grpclb_response)); + if (GPR_UNLIKELY( + !pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return nullptr; + } + + if (!res.has_initial_response) return nullptr; + + xds_grpclb_initial_response* initial_res = + static_cast<xds_grpclb_initial_response*>( + gpr_malloc(sizeof(xds_grpclb_initial_response))); + memcpy(initial_res, &res.initial_response, + sizeof(xds_grpclb_initial_response)); + + return initial_res; +} + +xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist( + grpc_slice encoded_xds_grpclb_response) { + pb_istream_t stream = + pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response), + GRPC_SLICE_LENGTH(encoded_xds_grpclb_response)); + pb_istream_t stream_at_start = stream; + xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>( + gpr_zalloc(sizeof(xds_grpclb_serverlist))); + xds_grpclb_response res; + memset(&res, 0, sizeof(xds_grpclb_response)); + // First pass: count number of servers. + res.server_list.servers.funcs.decode = count_serverlist; + res.server_list.servers.arg = sl; + bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); + if (GPR_UNLIKELY(!status)) { + gpr_free(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return nullptr; + } + // Second pass: populate servers. + if (sl->num_servers > 0) { + sl->servers = static_cast<xds_grpclb_server**>( + gpr_zalloc(sizeof(xds_grpclb_server*) * sl->num_servers)); + decode_serverlist_arg decode_arg; + memset(&decode_arg, 0, sizeof(decode_arg)); + decode_arg.serverlist = sl; + res.server_list.servers.funcs.decode = decode_serverlist; + res.server_list.servers.arg = &decode_arg; + status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, + &res); + if (GPR_UNLIKELY(!status)) { + xds_grpclb_destroy_serverlist(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return nullptr; + } + } + return sl; +} + +void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist) { + if (serverlist == nullptr) { + return; + } + for (size_t i = 0; i < serverlist->num_servers; i++) { + gpr_free(serverlist->servers[i]); + } + gpr_free(serverlist->servers); + gpr_free(serverlist); +} + +xds_grpclb_serverlist* xds_grpclb_serverlist_copy( + const xds_grpclb_serverlist* sl) { + xds_grpclb_serverlist* copy = static_cast<xds_grpclb_serverlist*>( + gpr_zalloc(sizeof(xds_grpclb_serverlist))); + copy->num_servers = sl->num_servers; + copy->servers = static_cast<xds_grpclb_server**>( + gpr_malloc(sizeof(xds_grpclb_server*) * sl->num_servers)); + for (size_t i = 0; i < sl->num_servers; i++) { + copy->servers[i] = + static_cast<xds_grpclb_server*>(gpr_malloc(sizeof(xds_grpclb_server))); + memcpy(copy->servers[i], sl->servers[i], sizeof(xds_grpclb_server)); + } + return copy; +} + +bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs, + const xds_grpclb_serverlist* rhs) { + if (lhs == nullptr || rhs == nullptr) { + return false; + } + if (lhs->num_servers != rhs->num_servers) { + return false; + } + for (size_t i = 0; i < lhs->num_servers; i++) { + if (!xds_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { + return false; + } + } + return true; +} + +bool xds_grpclb_server_equals(const xds_grpclb_server* lhs, + const xds_grpclb_server* rhs) { + return memcmp(lhs, rhs, sizeof(xds_grpclb_server)) == 0; +} + +int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs, + const xds_grpclb_duration* rhs) { + GPR_ASSERT(lhs && rhs); + if (lhs->has_seconds && rhs->has_seconds) { + if (lhs->seconds < rhs->seconds) return -1; + if (lhs->seconds > rhs->seconds) return 1; + } else if (lhs->has_seconds) { + return 1; + } else if (rhs->has_seconds) { + return -1; + } + + GPR_ASSERT(lhs->seconds == rhs->seconds); + if (lhs->has_nanos && rhs->has_nanos) { + if (lhs->nanos < rhs->nanos) return -1; + if (lhs->nanos > rhs->nanos) return 1; + } else if (lhs->has_nanos) { + return 1; + } else if (rhs->has_nanos) { + return -1; + } + + return 0; +} + +grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb) { + return static_cast<grpc_millis>( + (duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC + + (duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS); +} + +void xds_grpclb_initial_response_destroy( + xds_grpclb_initial_response* response) { + gpr_free(response); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h new file mode 100644 index 0000000000..3f096af6d6 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h @@ -0,0 +1,89 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H + +#include <grpc/support/port_platform.h> + +#include <grpc/slice_buffer.h> + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" + +#define XDS_SERVICE_NAME_MAX_LENGTH 128 + +typedef grpc_lb_v1_Server_ip_address_t xds_grpclb_ip_address; +typedef grpc_lb_v1_LoadBalanceRequest xds_grpclb_request; +typedef grpc_lb_v1_InitialLoadBalanceResponse xds_grpclb_initial_response; +typedef grpc_lb_v1_Server xds_grpclb_server; +typedef google_protobuf_Duration xds_grpclb_duration; +typedef google_protobuf_Timestamp xds_grpclb_timestamp; + +typedef struct { + xds_grpclb_server** servers; + size_t num_servers; +} xds_grpclb_serverlist; + +/** Create a request for a gRPC LB service under \a lb_service_name */ +xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name); +xds_grpclb_request* xds_grpclb_load_report_request_create_locked( + grpc_core::XdsLbClientStats* client_stats); + +/** Protocol Buffers v3-encode \a request */ +grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request); + +/** Destroy \a request */ +void xds_grpclb_request_destroy(xds_grpclb_request* request); + +/** Parse (ie, decode) the bytes in \a encoded_xds_grpclb_response as a \a + * xds_grpclb_initial_response */ +xds_grpclb_initial_response* xds_grpclb_initial_response_parse( + grpc_slice encoded_xds_grpclb_response); + +/** Parse the list of servers from an encoded \a xds_grpclb_response */ +xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist( + grpc_slice encoded_xds_grpclb_response); + +/** Return a copy of \a sl. The caller is responsible for calling \a + * xds_grpclb_destroy_serverlist on the returned copy. */ +xds_grpclb_serverlist* xds_grpclb_serverlist_copy( + const xds_grpclb_serverlist* sl); + +bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs, + const xds_grpclb_serverlist* rhs); + +bool xds_grpclb_server_equals(const xds_grpclb_server* lhs, + const xds_grpclb_server* rhs); + +/** Destroy \a serverlist */ +void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist); + +/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal, + * < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */ +int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs, + const xds_grpclb_duration* rhs); + +grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb); + +/** Destroy \a initial_response */ +void xds_grpclb_initial_response_destroy(xds_grpclb_initial_response* response); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc new file mode 100644 index 0000000000..e6c94167f6 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.cc @@ -0,0 +1,1894 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/// Implementation of the gRPC LB policy. +/// +/// This policy takes as input a list of resolved addresses, which must +/// include at least one balancer address. +/// +/// An internal channel (\a lb_channel_) is created for the addresses +/// from that are balancers. This channel behaves just like a regular +/// channel that uses pick_first to select from the list of balancer +/// addresses. +/// +/// The first time the policy gets a request for a pick, a ping, or to exit +/// the idle state, \a StartPickingLocked() is called. This method is +/// responsible for instantiating the internal *streaming* call to the LB +/// server (whichever address pick_first chose). The call will be complete +/// when either the balancer sends status or when we cancel the call (e.g., +/// because we are shutting down). In needed, we retry the call. If we +/// received at least one valid message from the server, a new call attempt +/// will be made immediately; otherwise, we apply back-off delays between +/// attempts. +/// +/// We maintain an internal round_robin policy instance for distributing +/// requests across backends. Whenever we receive a new serverlist from +/// the balancer, we update the round_robin policy with the new list of +/// addresses. If we cannot communicate with the balancer on startup, +/// however, we may enter fallback mode, in which case we will populate +/// the RR policy's addresses from the backend addresses returned by the +/// resolver. +/// +/// Once an RR policy instance is in place (and getting updated as described), +/// calls for a pick, a ping, or a cancellation will be serviced right +/// away by forwarding them to the RR instance. Any time there's no RR +/// policy available (i.e., right after the creation of the gRPCLB policy), +/// pick and ping requests are added to a list of pending picks and pings +/// to be flushed and serviced when the RR policy instance becomes available. +/// +/// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the +/// high level design and details. + +// With the addition of a libuv endpoint, sockaddr.h now includes uv.h when +// using that endpoint. Because of various transitive includes in uv.h, +// including windows.h on Windows, uv.h must be included before other system +// headers. Therefore, sockaddr.h must always be included first. +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" + +#include <inttypes.h> +#include <limits.h> +#include <string.h> + +#include <grpc/byte_buffer_reader.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy_registry.h" +#include "src/core/ext/filters/client_channel/parse_address.h" +#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/mutex_lock.h" +#include "src/core/lib/gprpp/orphanable.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/combiner.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_hash_table.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/static_metadata.h" + +#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1 +#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6 +#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120 +#define GRPC_XDS_RECONNECT_JITTER 0.2 +#define GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS 10000 + +namespace grpc_core { + +TraceFlag grpc_lb_xds_trace(false, "xds"); + +namespace { + +class XdsLb : public LoadBalancingPolicy { + public: + XdsLb(const grpc_lb_addresses* addresses, const Args& args); + + void UpdateLocked(const grpc_channel_args& args) override; + bool PickLocked(PickState* pick, grpc_error** error) override; + void CancelPickLocked(PickState* pick, grpc_error* error) override; + void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) override; + void NotifyOnStateChangeLocked(grpc_connectivity_state* state, + grpc_closure* closure) override; + grpc_connectivity_state CheckConnectivityLocked( + grpc_error** connectivity_error) override; + void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; + void ExitIdleLocked() override; + void ResetBackoffLocked() override; + void FillChildRefsForChannelz( + channelz::ChildRefsList* child_subchannels, + channelz::ChildRefsList* child_channels) override; + + private: + /// Linked list of pending pick requests. It stores all information needed to + /// eventually call (Round Robin's) pick() on them. They mainly stay pending + /// waiting for the RR policy to be created. + /// + /// Note that when a pick is sent to the RR policy, we inject our own + /// on_complete callback, so that we can intercept the result before + /// invoking the original on_complete callback. This allows us to set the + /// LB token metadata and add client_stats to the call context. + /// See \a pending_pick_complete() for details. + struct PendingPick { + // The xds lb instance that created the wrapping. This instance is not + // owned; reference counts are untouched. It's used only for logging + // purposes. + XdsLb* xdslb_policy; + // The original pick. + PickState* pick; + // Our on_complete closure and the original one. + grpc_closure on_complete; + grpc_closure* original_on_complete; + // The LB token associated with the pick. This is set via user_data in + // the pick. + grpc_mdelem lb_token; + // Stats for client-side load reporting. + RefCountedPtr<XdsLbClientStats> client_stats; + // Next pending pick. + PendingPick* next = nullptr; + }; + + /// Contains a call to the LB server and all the data related to the call. + class BalancerCallState + : public InternallyRefCountedWithTracing<BalancerCallState> { + public: + explicit BalancerCallState( + RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy); + + // It's the caller's responsibility to ensure that Orphan() is called from + // inside the combiner. + void Orphan() override; + + void StartQuery(); + + XdsLbClientStats* client_stats() const { return client_stats_.get(); } + + bool seen_initial_response() const { return seen_initial_response_; } + + private: + // So Delete() can access our private dtor. + template <typename T> + friend void grpc_core::Delete(T*); + + ~BalancerCallState(); + + XdsLb* xdslb_policy() const { + return static_cast<XdsLb*>(xdslb_policy_.get()); + } + + void ScheduleNextClientLoadReportLocked(); + void SendClientLoadReportLocked(); + + static bool LoadReportCountersAreZero(xds_grpclb_request* request); + + static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error); + static void ClientLoadReportDoneLocked(void* arg, grpc_error* error); + static void OnInitialRequestSentLocked(void* arg, grpc_error* error); + static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error); + static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error); + + // The owning LB policy. + RefCountedPtr<LoadBalancingPolicy> xdslb_policy_; + + // The streaming call to the LB server. Always non-NULL. + grpc_call* lb_call_ = nullptr; + + // recv_initial_metadata + grpc_metadata_array lb_initial_metadata_recv_; + + // send_message + grpc_byte_buffer* send_message_payload_ = nullptr; + grpc_closure lb_on_initial_request_sent_; + + // recv_message + grpc_byte_buffer* recv_message_payload_ = nullptr; + grpc_closure lb_on_balancer_message_received_; + bool seen_initial_response_ = false; + + // recv_trailing_metadata + grpc_closure lb_on_balancer_status_received_; + grpc_metadata_array lb_trailing_metadata_recv_; + grpc_status_code lb_call_status_; + grpc_slice lb_call_status_details_; + + // The stats for client-side load reporting associated with this LB call. + // Created after the first serverlist is received. + RefCountedPtr<XdsLbClientStats> client_stats_; + grpc_millis client_stats_report_interval_ = 0; + grpc_timer client_load_report_timer_; + bool client_load_report_timer_callback_pending_ = false; + bool last_client_load_report_counters_were_zero_ = false; + bool client_load_report_is_due_ = false; + // The closure used for either the load report timer or the callback for + // completion of sending the load report. + grpc_closure client_load_report_closure_; + }; + + ~XdsLb(); + + void ShutdownLocked() override; + + // Helper function used in ctor and UpdateLocked(). + void ProcessChannelArgsLocked(const grpc_channel_args& args); + + // Methods for dealing with the balancer channel and call. + void StartPickingLocked(); + void StartBalancerCallLocked(); + static void OnFallbackTimerLocked(void* arg, grpc_error* error); + void StartBalancerCallRetryTimerLocked(); + static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error); + static void OnBalancerChannelConnectivityChangedLocked(void* arg, + grpc_error* error); + + // Pending pick methods. + static void PendingPickSetMetadataAndContext(PendingPick* pp); + PendingPick* PendingPickCreate(PickState* pick); + void AddPendingPick(PendingPick* pp); + static void OnPendingPickComplete(void* arg, grpc_error* error); + + // Methods for dealing with the RR policy. + void CreateOrUpdateRoundRobinPolicyLocked(); + grpc_channel_args* CreateRoundRobinPolicyArgsLocked(); + void CreateRoundRobinPolicyLocked(const Args& args); + bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, + grpc_error** error); + void UpdateConnectivityStateFromRoundRobinPolicyLocked( + grpc_error* rr_state_error); + static void OnRoundRobinConnectivityChangedLocked(void* arg, + grpc_error* error); + static void OnRoundRobinRequestReresolutionLocked(void* arg, + grpc_error* error); + + // Who the client is trying to communicate with. + const char* server_name_ = nullptr; + + // Current channel args from the resolver. + grpc_channel_args* args_ = nullptr; + + // Internal state. + bool started_picking_ = false; + bool shutting_down_ = false; + grpc_connectivity_state_tracker state_tracker_; + + // The channel for communicating with the LB server. + grpc_channel* lb_channel_ = nullptr; + // Mutex to protect the channel to the LB server. This is used when + // processing a channelz request. + gpr_mu lb_channel_mu_; + grpc_connectivity_state lb_channel_connectivity_; + grpc_closure lb_channel_on_connectivity_changed_; + // Are we already watching the LB channel's connectivity? + bool watching_lb_channel_ = false; + // Response generator to inject address updates into lb_channel_. + RefCountedPtr<FakeResolverResponseGenerator> response_generator_; + + // The data associated with the current LB call. It holds a ref to this LB + // policy. It's initialized every time we query for backends. It's reset to + // NULL whenever the current LB call is no longer needed (e.g., the LB policy + // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always + // contains a non-NULL lb_call_. + OrphanablePtr<BalancerCallState> lb_calld_; + // Timeout in milliseconds for the LB call. 0 means no deadline. + int lb_call_timeout_ms_ = 0; + // Balancer call retry state. + BackOff lb_call_backoff_; + bool retry_timer_callback_pending_ = false; + grpc_timer lb_call_retry_timer_; + grpc_closure lb_on_call_retry_; + + // The deserialized response from the balancer. May be nullptr until one + // such response has arrived. + xds_grpclb_serverlist* serverlist_ = nullptr; + // Index into serverlist for next pick. + // If the server at this index is a drop, we return a drop. + // Otherwise, we delegate to the RR policy. + size_t serverlist_index_ = 0; + + // Timeout in milliseconds for before using fallback backend addresses. + // 0 means not using fallback. + int lb_fallback_timeout_ms_ = 0; + // The backend addresses from the resolver. + grpc_lb_addresses* fallback_backend_addresses_ = nullptr; + // Fallback timer. + bool fallback_timer_callback_pending_ = false; + grpc_timer lb_fallback_timer_; + grpc_closure lb_on_fallback_; + + // Pending picks that are waiting on the RR policy's connectivity. + PendingPick* pending_picks_ = nullptr; + + // The RR policy to use for the backends. + OrphanablePtr<LoadBalancingPolicy> rr_policy_; + grpc_connectivity_state rr_connectivity_state_; + grpc_closure on_rr_connectivity_changed_; + grpc_closure on_rr_request_reresolution_; +}; + +// +// serverlist parsing code +// + +// vtable for LB tokens in grpc_lb_addresses +void* lb_token_copy(void* token) { + return token == nullptr + ? nullptr + : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload; +} +void lb_token_destroy(void* token) { + if (token != nullptr) { + GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token}); + } +} +int lb_token_cmp(void* token1, void* token2) { + if (token1 > token2) return 1; + if (token1 < token2) return -1; + return 0; +} +const grpc_lb_user_data_vtable lb_token_vtable = { + lb_token_copy, lb_token_destroy, lb_token_cmp}; + +// Returns the backend addresses extracted from the given addresses. +grpc_lb_addresses* ExtractBackendAddresses(const grpc_lb_addresses* addresses) { + // First pass: count the number of backend addresses. + size_t num_backends = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) { + ++num_backends; + } + } + // Second pass: actually populate the addresses and (empty) LB tokens. + grpc_lb_addresses* backend_addresses = + grpc_lb_addresses_create(num_backends, &lb_token_vtable); + size_t num_copied = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) continue; + const grpc_resolved_address* addr = &addresses->addresses[i].address; + grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr, + addr->len, false /* is_balancer */, + nullptr /* balancer_name */, + (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload); + ++num_copied; + } + return backend_addresses; +} + +bool IsServerValid(const xds_grpclb_server* server, size_t idx, bool log) { + if (server->drop) return false; + const xds_grpclb_ip_address* ip = &server->ip_address; + if (GPR_UNLIKELY(server->port >> 16 != 0)) { + if (log) { + gpr_log(GPR_ERROR, + "Invalid port '%d' at index %lu of serverlist. Ignoring.", + server->port, (unsigned long)idx); + } + return false; + } + if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) { + if (log) { + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes, got %d at index %lu of " + "serverlist. Ignoring", + ip->size, (unsigned long)idx); + } + return false; + } + return true; +} + +void ParseServer(const xds_grpclb_server* server, grpc_resolved_address* addr) { + memset(addr, 0, sizeof(*addr)); + if (server->drop) return; + const uint16_t netorder_port = grpc_htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const xds_grpclb_ip_address* ip = &server->ip_address; + if (ip->size == 4) { + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in)); + grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr); + addr4->sin_family = GRPC_AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6)); + grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr; + addr6->sin6_family = GRPC_AF_INET6; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } +} + +// Returns addresses extracted from \a serverlist. +grpc_lb_addresses* ProcessServerlist(const xds_grpclb_serverlist* serverlist) { + size_t num_valid = 0; + /* first pass: count how many are valid in order to allocate the necessary + * memory in a single block */ + for (size_t i = 0; i < serverlist->num_servers; ++i) { + if (IsServerValid(serverlist->servers[i], i, true)) ++num_valid; + } + grpc_lb_addresses* lb_addresses = + grpc_lb_addresses_create(num_valid, &lb_token_vtable); + /* second pass: actually populate the addresses and LB tokens (aka user data + * to the outside world) to be read by the RR policy during its creation. + * Given that the validity tests are very cheap, they are performed again + * instead of marking the valid ones during the first pass, as this would + * incurr in an allocation due to the arbitrary number of server */ + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + const xds_grpclb_server* server = serverlist->servers[sl_idx]; + if (!IsServerValid(serverlist->servers[sl_idx], sl_idx, false)) continue; + GPR_ASSERT(addr_idx < num_valid); + /* address processing */ + grpc_resolved_address addr; + ParseServer(server, &addr); + /* lb token processing */ + void* user_data; + if (server->has_load_balance_token) { + const size_t lb_token_max_length = + GPR_ARRAY_SIZE(server->load_balance_token); + const size_t lb_token_length = + strnlen(server->load_balance_token, lb_token_max_length); + grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer( + server->load_balance_token, lb_token_length); + user_data = + (void*)grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr) + .payload; + } else { + char* uri = grpc_sockaddr_to_uri(&addr); + gpr_log(GPR_INFO, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + uri); + gpr_free(uri); + user_data = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload; + } + grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len, + false /* is_balancer */, + nullptr /* balancer_name */, user_data); + ++addr_idx; + } + GPR_ASSERT(addr_idx == num_valid); + return lb_addresses; +} + +// +// XdsLb::BalancerCallState +// + +XdsLb::BalancerCallState::BalancerCallState( + RefCountedPtr<LoadBalancingPolicy> parent_xdslb_policy) + : InternallyRefCountedWithTracing<BalancerCallState>(&grpc_lb_xds_trace), + xdslb_policy_(std::move(parent_xdslb_policy)) { + GPR_ASSERT(xdslb_policy_ != nullptr); + GPR_ASSERT(!xdslb_policy()->shutting_down_); + // Init the LB call. Note that the LB call will progress every time there's + // activity in xdslb_policy_->interested_parties(), which is comprised of + // the polling entities from client_channel. + GPR_ASSERT(xdslb_policy()->server_name_ != nullptr); + GPR_ASSERT(xdslb_policy()->server_name_[0] != '\0'); + const grpc_millis deadline = + xdslb_policy()->lb_call_timeout_ms_ == 0 + ? GRPC_MILLIS_INF_FUTURE + : ExecCtx::Get()->Now() + xdslb_policy()->lb_call_timeout_ms_; + lb_call_ = grpc_channel_create_pollset_set_call( + xdslb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS, + xdslb_policy_->interested_parties(), + GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, + nullptr, deadline, nullptr); + // Init the LB call request payload. + xds_grpclb_request* request = + xds_grpclb_request_create(xdslb_policy()->server_name_); + grpc_slice request_payload_slice = xds_grpclb_request_encode(request); + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + xds_grpclb_request_destroy(request); + // Init other data associated with the LB call. + grpc_metadata_array_init(&lb_initial_metadata_recv_); + grpc_metadata_array_init(&lb_trailing_metadata_recv_); + GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked, + this, grpc_combiner_scheduler(xdslb_policy()->combiner())); + GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_, + OnBalancerMessageReceivedLocked, this, + grpc_combiner_scheduler(xdslb_policy()->combiner())); + GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_, + OnBalancerStatusReceivedLocked, this, + grpc_combiner_scheduler(xdslb_policy()->combiner())); +} + +XdsLb::BalancerCallState::~BalancerCallState() { + GPR_ASSERT(lb_call_ != nullptr); + grpc_call_unref(lb_call_); + grpc_metadata_array_destroy(&lb_initial_metadata_recv_); + grpc_metadata_array_destroy(&lb_trailing_metadata_recv_); + grpc_byte_buffer_destroy(send_message_payload_); + grpc_byte_buffer_destroy(recv_message_payload_); + grpc_slice_unref_internal(lb_call_status_details_); +} + +void XdsLb::BalancerCallState::Orphan() { + GPR_ASSERT(lb_call_ != nullptr); + // If we are here because xdslb_policy wants to cancel the call, + // lb_on_balancer_status_received_ will complete the cancellation and clean + // up. Otherwise, we are here because xdslb_policy has to orphan a failed + // call, then the following cancellation will be a no-op. + grpc_call_cancel(lb_call_, nullptr); + if (client_load_report_timer_callback_pending_) { + grpc_timer_cancel(&client_load_report_timer_); + } + // Note that the initial ref is hold by lb_on_balancer_status_received_ + // instead of the caller of this function. So the corresponding unref happens + // in lb_on_balancer_status_received_ instead of here. +} + +void XdsLb::BalancerCallState::StartQuery() { + GPR_ASSERT(lb_call_ != nullptr); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Starting LB call (lb_calld: %p, lb_call: %p)", + xdslb_policy_.get(), this, lb_call_); + } + // Create the ops. + grpc_call_error call_error; + grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); + // Op: send initial metadata. + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: send request message. + GPR_ASSERT(send_message_payload_ != nullptr); + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = send_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent"); + self.release(); + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv initial metadata. + op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &lb_initial_metadata_recv_; + op->flags = 0; + op->reserved = nullptr; + op++; + // Op: recv response. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_message_payload_; + op->flags = 0; + op->reserved = nullptr; + op++; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + self = Ref(DEBUG_LOCATION, "on_message_received"); + self.release(); + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Op: recv server status. + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &lb_trailing_metadata_recv_; + op->data.recv_status_on_client.status = &lb_call_status_; + op->data.recv_status_on_client.status_details = &lb_call_status_details_; + op->flags = 0; + op->reserved = nullptr; + op++; + // This callback signals the end of the LB call, so it relies on the initial + // ref instead of a new ref. When it's invoked, it's the initial ref that is + // unreffed. + call_error = grpc_call_start_batch_and_execute( + lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +void XdsLb::BalancerCallState::ScheduleNextClientLoadReportLocked() { + const grpc_millis next_client_load_report_time = + ExecCtx::Get()->Now() + client_stats_report_interval_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, + MaybeSendClientLoadReportLocked, this, + grpc_combiner_scheduler(xdslb_policy()->combiner())); + grpc_timer_init(&client_load_report_timer_, next_client_load_report_time, + &client_load_report_closure_); + client_load_report_timer_callback_pending_ = true; +} + +void XdsLb::BalancerCallState::MaybeSendClientLoadReportLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); + XdsLb* xdslb_policy = lb_calld->xdslb_policy(); + lb_calld->client_load_report_timer_callback_pending_ = false; + if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) { + lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); + return; + } + // If we've already sent the initial request, then we can go ahead and send + // the load report. Otherwise, we need to wait until the initial request has + // been sent to send this (see OnInitialRequestSentLocked()). + if (lb_calld->send_message_payload_ == nullptr) { + lb_calld->SendClientLoadReportLocked(); + } else { + lb_calld->client_load_report_is_due_ = true; + } +} + +bool XdsLb::BalancerCallState::LoadReportCountersAreZero( + xds_grpclb_request* request) { + XdsLbClientStats::DroppedCallCounts* drop_entries = + static_cast<XdsLbClientStats::DroppedCallCounts*>( + request->client_stats.calls_finished_with_drop.arg); + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0 && + (drop_entries == nullptr || drop_entries->size() == 0); +} + +void XdsLb::BalancerCallState::SendClientLoadReportLocked() { + // Construct message payload. + GPR_ASSERT(send_message_payload_ == nullptr); + xds_grpclb_request* request = + xds_grpclb_load_report_request_create_locked(client_stats_.get()); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. + if (LoadReportCountersAreZero(request)) { + if (last_client_load_report_counters_were_zero_) { + xds_grpclb_request_destroy(request); + ScheduleNextClientLoadReportLocked(); + return; + } + last_client_load_report_counters_were_zero_ = true; + } else { + last_client_load_report_counters_were_zero_ = false; + } + grpc_slice request_payload_slice = xds_grpclb_request_encode(request); + send_message_payload_ = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(request_payload_slice); + xds_grpclb_request_destroy(request); + // Send the report. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = send_message_payload_; + GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked, + this, grpc_combiner_scheduler(xdslb_policy()->combiner())); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_call_, &op, 1, &client_load_report_closure_); + if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) { + gpr_log(GPR_ERROR, "[xdslb %p] call_error=%d", xdslb_policy_.get(), + call_error); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } +} + +void XdsLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg, + grpc_error* error) { + BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); + XdsLb* xdslb_policy = lb_calld->xdslb_policy(); + grpc_byte_buffer_destroy(lb_calld->send_message_payload_); + lb_calld->send_message_payload_ = nullptr; + if (error != GRPC_ERROR_NONE || lb_calld != xdslb_policy->lb_calld_.get()) { + lb_calld->Unref(DEBUG_LOCATION, "client_load_report"); + return; + } + lb_calld->ScheduleNextClientLoadReportLocked(); +} + +void XdsLb::BalancerCallState::OnInitialRequestSentLocked(void* arg, + grpc_error* error) { + BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); + grpc_byte_buffer_destroy(lb_calld->send_message_payload_); + lb_calld->send_message_payload_ = nullptr; + // If we attempted to send a client load report before the initial request was + // sent (and this lb_calld is still in use), send the load report now. + if (lb_calld->client_load_report_is_due_ && + lb_calld == lb_calld->xdslb_policy()->lb_calld_.get()) { + lb_calld->SendClientLoadReportLocked(); + lb_calld->client_load_report_is_due_ = false; + } + lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent"); +} + +void XdsLb::BalancerCallState::OnBalancerMessageReceivedLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); + XdsLb* xdslb_policy = lb_calld->xdslb_policy(); + // Empty payload means the LB call was cancelled. + if (lb_calld != xdslb_policy->lb_calld_.get() || + lb_calld->recv_message_payload_ == nullptr) { + lb_calld->Unref(DEBUG_LOCATION, "on_message_received"); + return; + } + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_); + grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_reader_destroy(&bbr); + grpc_byte_buffer_destroy(lb_calld->recv_message_payload_); + lb_calld->recv_message_payload_ = nullptr; + xds_grpclb_initial_response* initial_response; + xds_grpclb_serverlist* serverlist; + if (!lb_calld->seen_initial_response_ && + (initial_response = xds_grpclb_initial_response_parse(response_slice)) != + nullptr) { + // Have NOT seen initial response, look for initial response. + if (initial_response->has_client_stats_report_interval) { + lb_calld->client_stats_report_interval_ = GPR_MAX( + GPR_MS_PER_SEC, xds_grpclb_duration_to_millis( + &initial_response->client_stats_report_interval)); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Received initial LB response message; " + "client load reporting interval = %" PRId64 " milliseconds", + xdslb_policy, lb_calld->client_stats_report_interval_); + } + } else if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Received initial LB response message; client load " + "reporting NOT enabled", + xdslb_policy); + } + xds_grpclb_initial_response_destroy(initial_response); + lb_calld->seen_initial_response_ = true; + } else if ((serverlist = xds_grpclb_response_parse_serverlist( + response_slice)) != nullptr) { + // Have seen initial response, look for serverlist. + GPR_ASSERT(lb_calld->lb_call_ != nullptr); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Serverlist with %" PRIuPTR " servers received", + xdslb_policy, serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + ParseServer(serverlist->servers[i], &addr); + char* ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "[xdslb %p] Serverlist[%" PRIuPTR "]: %s", + xdslb_policy, i, ipport); + gpr_free(ipport); + } + } + /* update serverlist */ + if (serverlist->num_servers > 0) { + // Start sending client load report only after we start using the + // serverlist returned from the current LB call. + if (lb_calld->client_stats_report_interval_ > 0 && + lb_calld->client_stats_ == nullptr) { + lb_calld->client_stats_.reset(New<XdsLbClientStats>()); + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report"); + self.release(); + lb_calld->ScheduleNextClientLoadReportLocked(); + } + if (xds_grpclb_serverlist_equals(xdslb_policy->serverlist_, serverlist)) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Incoming server list identical to current, " + "ignoring.", + xdslb_policy); + } + xds_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (xdslb_policy->serverlist_ != nullptr) { + /* dispose of the old serverlist */ + xds_grpclb_destroy_serverlist(xdslb_policy->serverlist_); + } else { + /* or dispose of the fallback */ + grpc_lb_addresses_destroy(xdslb_policy->fallback_backend_addresses_); + xdslb_policy->fallback_backend_addresses_ = nullptr; + if (xdslb_policy->fallback_timer_callback_pending_) { + grpc_timer_cancel(&xdslb_policy->lb_fallback_timer_); + } + } + // and update the copy in the XdsLb instance. This + // serverlist instance will be destroyed either upon the next + // update or when the XdsLb instance is destroyed. + xdslb_policy->serverlist_ = serverlist; + xdslb_policy->serverlist_index_ = 0; + xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked(); + } + } else { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Received empty server list, ignoring.", + xdslb_policy); + } + xds_grpclb_destroy_serverlist(serverlist); + } + } else { + // No valid initial response or serverlist found. + char* response_slice_str = + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX); + gpr_log(GPR_ERROR, + "[xdslb %p] Invalid LB response received: '%s'. Ignoring.", + xdslb_policy, response_slice_str); + gpr_free(response_slice_str); + } + grpc_slice_unref_internal(response_slice); + if (!xdslb_policy->shutting_down_) { + // Keep listening for serverlist updates. + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message.recv_message = &lb_calld->recv_message_payload_; + op.flags = 0; + op.reserved = nullptr; + // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery(). + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_calld->lb_call_, &op, 1, + &lb_calld->lb_on_balancer_message_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown"); + } +} + +void XdsLb::BalancerCallState::OnBalancerStatusReceivedLocked( + void* arg, grpc_error* error) { + BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg); + XdsLb* xdslb_policy = lb_calld->xdslb_policy(); + GPR_ASSERT(lb_calld->lb_call_ != nullptr); + if (grpc_lb_xds_trace.enabled()) { + char* status_details = + grpc_slice_to_c_string(lb_calld->lb_call_status_details_); + gpr_log(GPR_INFO, + "[xdslb %p] Status from LB server received. Status = %d, details " + "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", + xdslb_policy, lb_calld->lb_call_status_, status_details, lb_calld, + lb_calld->lb_call_, grpc_error_string(error)); + gpr_free(status_details); + } + xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE); + // If this lb_calld is still in use, this call ended because of a failure so + // we want to retry connecting. Otherwise, we have deliberately ended this + // call and no further action is required. + if (lb_calld == xdslb_policy->lb_calld_.get()) { + xdslb_policy->lb_calld_.reset(); + GPR_ASSERT(!xdslb_policy->shutting_down_); + if (lb_calld->seen_initial_response_) { + // If we lose connection to the LB server, reset the backoff and restart + // the LB call immediately. + xdslb_policy->lb_call_backoff_.Reset(); + xdslb_policy->StartBalancerCallLocked(); + } else { + // If this LB call fails establishing any connection to the LB server, + // retry later. + xdslb_policy->StartBalancerCallRetryTimerLocked(); + } + } + lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended"); +} + +// +// helper code for creating balancer channel +// + +grpc_lb_addresses* ExtractBalancerAddresses( + const grpc_lb_addresses* addresses) { + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + // There must be at least one balancer address, or else the + // client_channel would not have chosen this LB policy. + GPR_ASSERT(num_grpclb_addrs > 0); + grpc_lb_addresses* lb_addresses = + grpc_lb_addresses_create(num_grpclb_addrs, nullptr); + size_t lb_addresses_idx = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (!addresses->addresses[i].is_balancer) continue; + if (GPR_UNLIKELY(addresses->addresses[i].user_data != nullptr)) { + gpr_log(GPR_ERROR, + "This LB policy doesn't support user data. It will be ignored"); + } + grpc_lb_addresses_set_address( + lb_addresses, lb_addresses_idx++, addresses->addresses[i].address.addr, + addresses->addresses[i].address.len, false /* is balancer */, + addresses->addresses[i].balancer_name, nullptr /* user data */); + } + GPR_ASSERT(num_grpclb_addrs == lb_addresses_idx); + return lb_addresses; +} + +/* Returns the channel args for the LB channel, used to create a bidirectional + * stream for the reception of load balancing updates. + * + * Inputs: + * - \a addresses: corresponding to the balancers. + * - \a response_generator: in order to propagate updates from the resolver + * above the grpclb policy. + * - \a args: other args inherited from the grpclb policy. */ +grpc_channel_args* BuildBalancerChannelArgs( + const grpc_lb_addresses* addresses, + FakeResolverResponseGenerator* response_generator, + const grpc_channel_args* args) { + grpc_lb_addresses* lb_addresses = ExtractBalancerAddresses(addresses); + // Channel args to remove. + static const char* args_to_remove[] = { + // LB policy name, since we want to use the default (pick_first) in + // the LB channel. + GRPC_ARG_LB_POLICY_NAME, + // The channel arg for the server URI, since that will be different for + // the LB channel than for the parent channel. The client channel + // factory will re-add this arg with the right value. + GRPC_ARG_SERVER_URI, + // The resolved addresses, which will be generated by the name resolver + // used in the LB channel. Note that the LB channel will use the fake + // resolver, so this won't actually generate a query to DNS (or some + // other name service). However, the addresses returned by the fake + // resolver will have is_balancer=false, whereas our own addresses have + // is_balancer=true. We need the LB channel to return addresses with + // is_balancer=false so that it does not wind up recursively using the + // grpclb LB policy, as per the special case logic in client_channel.c. + GRPC_ARG_LB_ADDRESSES, + // The fake resolver response generator, because we are replacing it + // with the one from the grpclb policy, used to propagate updates to + // the LB channel. + GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, + // The LB channel should use the authority indicated by the target + // authority table (see \a grpc_lb_policy_xds_modify_lb_channel_args), + // as opposed to the authority from the parent channel. + GRPC_ARG_DEFAULT_AUTHORITY, + // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be + // treated as a stand-alone channel and not inherit this argument from the + // args of the parent channel. + GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, + }; + // Channel args to add. + const grpc_arg args_to_add[] = { + // New LB addresses. + // Note that we pass these in both when creating the LB channel + // and via the fake resolver. The latter is what actually gets used. + grpc_lb_addresses_create_channel_arg(lb_addresses), + // The fake resolver response generator, which we use to inject + // address updates into the LB channel. + grpc_core::FakeResolverResponseGenerator::MakeChannelArg( + response_generator), + // A channel arg indicating the target is a grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER), 1), + // A channel arg indicating this is an internal channels, aka it is + // owned by components in Core, not by the user application. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1), + }; + // Construct channel args. + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( + args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); + // Make any necessary modifications for security. + new_args = grpc_lb_policy_xds_modify_lb_channel_args(new_args); + // Clean up. + grpc_lb_addresses_destroy(lb_addresses); + return new_args; +} + +// +// ctor and dtor +// + +XdsLb::XdsLb(const grpc_lb_addresses* addresses, + const LoadBalancingPolicy::Args& args) + : LoadBalancingPolicy(args), + response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()), + lb_call_backoff_( + BackOff::Options() + .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS * + 1000) + .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_XDS_RECONNECT_JITTER) + .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { + // Initialization. + gpr_mu_init(&lb_channel_mu_); + grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_, + &XdsLb::OnBalancerChannelConnectivityChangedLocked, this, + grpc_combiner_scheduler(args.combiner)); + GRPC_CLOSURE_INIT(&on_rr_connectivity_changed_, + &XdsLb::OnRoundRobinConnectivityChangedLocked, this, + grpc_combiner_scheduler(args.combiner)); + GRPC_CLOSURE_INIT(&on_rr_request_reresolution_, + &XdsLb::OnRoundRobinRequestReresolutionLocked, this, + grpc_combiner_scheduler(args.combiner)); + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "xds"); + // Record server name. + const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI); + const char* server_uri = grpc_channel_arg_get_string(arg); + GPR_ASSERT(server_uri != nullptr); + grpc_uri* uri = grpc_uri_parse(server_uri, true); + GPR_ASSERT(uri->path[0] != '\0'); + server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Will use '%s' as the server name for LB request.", this, + server_name_); + } + grpc_uri_destroy(uri); + // Record LB call timeout. + arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX}); + // Record fallback timeout. + arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS); + lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer( + arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX}); + // Process channel args. + ProcessChannelArgsLocked(*args.args); +} + +XdsLb::~XdsLb() { + GPR_ASSERT(pending_picks_ == nullptr); + gpr_mu_destroy(&lb_channel_mu_); + gpr_free((void*)server_name_); + grpc_channel_args_destroy(args_); + grpc_connectivity_state_destroy(&state_tracker_); + if (serverlist_ != nullptr) { + xds_grpclb_destroy_serverlist(serverlist_); + } + if (fallback_backend_addresses_ != nullptr) { + grpc_lb_addresses_destroy(fallback_backend_addresses_); + } + grpc_subchannel_index_unref(); +} + +void XdsLb::ShutdownLocked() { + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); + shutting_down_ = true; + lb_calld_.reset(); + if (retry_timer_callback_pending_) { + grpc_timer_cancel(&lb_call_retry_timer_); + } + if (fallback_timer_callback_pending_) { + grpc_timer_cancel(&lb_fallback_timer_); + } + rr_policy_.reset(); + TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_CANCELLED); + // We destroy the LB channel here instead of in our destructor because + // destroying the channel triggers a last callback to + // OnBalancerChannelConnectivityChangedLocked(), and we need to be + // alive when that callback is invoked. + if (lb_channel_ != nullptr) { + gpr_mu_lock(&lb_channel_mu_); + grpc_channel_destroy(lb_channel_); + lb_channel_ = nullptr; + gpr_mu_unlock(&lb_channel_mu_); + } + grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_REF(error), "grpclb_shutdown"); + // Clear pending picks. + PendingPick* pp; + while ((pp = pending_picks_) != nullptr) { + pending_picks_ = pp->next; + pp->pick->connected_subchannel.reset(); + // Note: pp is deleted in this callback. + GRPC_CLOSURE_SCHED(&pp->on_complete, GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); +} + +// +// public methods +// + +void XdsLb::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { + PendingPick* pp; + while ((pp = pending_picks_) != nullptr) { + pending_picks_ = pp->next; + pp->pick->on_complete = pp->original_on_complete; + pp->pick->user_data = nullptr; + grpc_error* error = GRPC_ERROR_NONE; + if (new_policy->PickLocked(pp->pick, &error)) { + // Synchronous return; schedule closure. + GRPC_CLOSURE_SCHED(pp->pick->on_complete, error); + } + Delete(pp); + } +} + +// Cancel a specific pending pick. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (rr_policy_) available, it'll be +// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From +// that point onwards, it'll be RR's responsibility. For cancellations, that +// implies the pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the pending_picks_ list. To cancel these, +// we invoke the completion closure and set the pick's connected +// subchannel to nullptr right here. +void XdsLb::CancelPickLocked(PickState* pick, grpc_error* error) { + PendingPick* pp = pending_picks_; + pending_picks_ = nullptr; + while (pp != nullptr) { + PendingPick* next = pp->next; + if (pp->pick == pick) { + pick->connected_subchannel.reset(); + // Note: pp is deleted in this callback. + GRPC_CLOSURE_SCHED(&pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick Cancelled", &error, 1)); + } else { + pp->next = pending_picks_; + pending_picks_ = pp; + } + pp = next; + } + if (rr_policy_ != nullptr) { + rr_policy_->CancelPickLocked(pick, GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); +} + +// Cancel all pending picks. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (rr_policy_) available, it'll be +// handed over to the RR policy (in CreateRoundRobinPolicyLocked()). From +// that point onwards, it'll be RR's responsibility. For cancellations, that +// implies the pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the pending_picks_ list. To cancel these, +// we invoke the completion closure and set the pick's connected +// subchannel to nullptr right here. +void XdsLb::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error* error) { + PendingPick* pp = pending_picks_; + pending_picks_ = nullptr; + while (pp != nullptr) { + PendingPick* next = pp->next; + if ((pp->pick->initial_metadata_flags & initial_metadata_flags_mask) == + initial_metadata_flags_eq) { + // Note: pp is deleted in this callback. + GRPC_CLOSURE_SCHED(&pp->on_complete, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick Cancelled", &error, 1)); + } else { + pp->next = pending_picks_; + pending_picks_ = pp; + } + pp = next; + } + if (rr_policy_ != nullptr) { + rr_policy_->CancelMatchingPicksLocked(initial_metadata_flags_mask, + initial_metadata_flags_eq, + GRPC_ERROR_REF(error)); + } + GRPC_ERROR_UNREF(error); +} + +void XdsLb::ExitIdleLocked() { + if (!started_picking_) { + StartPickingLocked(); + } +} + +void XdsLb::ResetBackoffLocked() { + if (lb_channel_ != nullptr) { + grpc_channel_reset_connect_backoff(lb_channel_); + } + if (rr_policy_ != nullptr) { + rr_policy_->ResetBackoffLocked(); + } +} + +bool XdsLb::PickLocked(PickState* pick, grpc_error** error) { + PendingPick* pp = PendingPickCreate(pick); + bool pick_done = false; + if (rr_policy_ != nullptr) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] about to PICK from RR %p", this, + rr_policy_.get()); + } + pick_done = + PickFromRoundRobinPolicyLocked(false /* force_async */, pp, error); + } else { // rr_policy_ == NULL + if (pick->on_complete == nullptr) { + *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "No pick result available but synchronous result required."); + pick_done = true; + } else { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] No RR policy. Adding to grpclb's pending picks", + this); + } + AddPendingPick(pp); + if (!started_picking_) { + StartPickingLocked(); + } + pick_done = false; + } + } + return pick_done; +} + +void XdsLb::FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels, + channelz::ChildRefsList* child_channels) { + // delegate to the RoundRobin to fill the children subchannels. + rr_policy_->FillChildRefsForChannelz(child_subchannels, child_channels); + MutexLock lock(&lb_channel_mu_); + if (lb_channel_ != nullptr) { + grpc_core::channelz::ChannelNode* channel_node = + grpc_channel_get_channelz_node(lb_channel_); + if (channel_node != nullptr) { + child_channels->push_back(channel_node->uuid()); + } + } +} + +grpc_connectivity_state XdsLb::CheckConnectivityLocked( + grpc_error** connectivity_error) { + return grpc_connectivity_state_get(&state_tracker_, connectivity_error); +} + +void XdsLb::NotifyOnStateChangeLocked(grpc_connectivity_state* current, + grpc_closure* notify) { + grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, + notify); +} + +void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) { + const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); + if (GPR_UNLIKELY(arg == nullptr || arg->type != GRPC_ARG_POINTER)) { + // Ignore this update. + gpr_log(GPR_ERROR, + "[xdslb %p] No valid LB addresses channel arg in update, ignoring.", + this); + return; + } + const grpc_lb_addresses* addresses = + static_cast<const grpc_lb_addresses*>(arg->value.pointer.p); + // Update fallback address list. + if (fallback_backend_addresses_ != nullptr) { + grpc_lb_addresses_destroy(fallback_backend_addresses_); + } + fallback_backend_addresses_ = ExtractBackendAddresses(addresses); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + grpc_arg new_arg = grpc_channel_arg_string_create( + (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"xds"); + grpc_channel_args_destroy(args_); + args_ = grpc_channel_args_copy_and_add_and_remove( + &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + // Construct args for balancer channel. + grpc_channel_args* lb_channel_args = + BuildBalancerChannelArgs(addresses, response_generator_.get(), &args); + // Create balancer channel if needed. + if (lb_channel_ == nullptr) { + char* uri_str; + gpr_asprintf(&uri_str, "fake:///%s", server_name_); + gpr_mu_lock(&lb_channel_mu_); + lb_channel_ = grpc_client_channel_factory_create_channel( + client_channel_factory(), uri_str, + GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, lb_channel_args); + gpr_mu_unlock(&lb_channel_mu_); + GPR_ASSERT(lb_channel_ != nullptr); + gpr_free(uri_str); + } + // Propagate updates to the LB channel (pick_first) through the fake + // resolver. + response_generator_->SetResponse(lb_channel_args); + grpc_channel_args_destroy(lb_channel_args); +} + +void XdsLb::UpdateLocked(const grpc_channel_args& args) { + ProcessChannelArgsLocked(args); + // If fallback is configured and the RR policy already exists, update + // it with the new fallback addresses. + if (lb_fallback_timeout_ms_ > 0 && rr_policy_ != nullptr) { + CreateOrUpdateRoundRobinPolicyLocked(); + } + // Start watching the LB channel connectivity for connection, if not + // already doing so. + if (!watching_lb_channel_) { + lb_channel_connectivity_ = grpc_channel_check_connectivity_state( + lb_channel_, true /* try to connect */); + grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(lb_channel_)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + watching_lb_channel_ = true; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity"); + self.release(); + grpc_client_channel_watch_connectivity_state( + client_channel_elem, + grpc_polling_entity_create_from_pollset_set(interested_parties()), + &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_, + nullptr); + } +} + +// +// code for balancer channel and call +// + +void XdsLb::StartPickingLocked() { + // Start a timer to fall back. + if (lb_fallback_timeout_ms_ > 0 && serverlist_ == nullptr && + !fallback_timer_callback_pending_) { + grpc_millis deadline = ExecCtx::Get()->Now() + lb_fallback_timeout_ms_; + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_fallback_timer"); + self.release(); + GRPC_CLOSURE_INIT(&lb_on_fallback_, &XdsLb::OnFallbackTimerLocked, this, + grpc_combiner_scheduler(combiner())); + fallback_timer_callback_pending_ = true; + grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_); + } + started_picking_ = true; + StartBalancerCallLocked(); +} + +void XdsLb::StartBalancerCallLocked() { + GPR_ASSERT(lb_channel_ != nullptr); + if (shutting_down_) return; + // Init the LB call data. + GPR_ASSERT(lb_calld_ == nullptr); + lb_calld_ = MakeOrphanable<BalancerCallState>(Ref()); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Query for backends (lb_channel: %p, lb_calld: %p)", + this, lb_channel_, lb_calld_.get()); + } + lb_calld_->StartQuery(); +} + +void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + xdslb_policy->fallback_timer_callback_pending_ = false; + // If we receive a serverlist after the timer fires but before this callback + // actually runs, don't fall back. + if (xdslb_policy->serverlist_ == nullptr && !xdslb_policy->shutting_down_ && + error == GRPC_ERROR_NONE) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Falling back to use backends from resolver", + xdslb_policy); + } + GPR_ASSERT(xdslb_policy->fallback_backend_addresses_ != nullptr); + xdslb_policy->CreateOrUpdateRoundRobinPolicyLocked(); + } + xdslb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer"); +} + +void XdsLb::StartBalancerCallRetryTimerLocked() { + grpc_millis next_try = lb_call_backoff_.NextAttemptTime(); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Connection to LB server lost...", this); + grpc_millis timeout = next_try - ExecCtx::Get()->Now(); + if (timeout > 0) { + gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active in %" PRId64 "ms.", + this, timeout); + } else { + gpr_log(GPR_INFO, "[xdslb %p] ... retry_timer_active immediately.", this); + } + } + // TODO(roth): We currently track this ref manually. Once the + // ClosureRef API is ready, we should pass the RefCountedPtr<> along + // with the callback. + auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); + self.release(); + GRPC_CLOSURE_INIT(&lb_on_call_retry_, &XdsLb::OnBalancerCallRetryTimerLocked, + this, grpc_combiner_scheduler(combiner())); + retry_timer_callback_pending_ = true; + grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_); +} + +void XdsLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + xdslb_policy->retry_timer_callback_pending_ = false; + if (!xdslb_policy->shutting_down_ && error == GRPC_ERROR_NONE && + xdslb_policy->lb_calld_ == nullptr) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Restarting call to LB server", + xdslb_policy); + } + xdslb_policy->StartBalancerCallLocked(); + } + xdslb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer"); +} + +// Invoked as part of the update process. It continues watching the LB channel +// until it shuts down or becomes READY. It's invoked even if the LB channel +// stayed READY throughout the update (for example if the update is identical). +void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg, + grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + if (xdslb_policy->shutting_down_) goto done; + // Re-initialize the lb_call. This should also take care of updating the + // embedded RR policy. Note that the current RR policy, if any, will stay in + // effect until an update from the new lb_call is received. + switch (xdslb_policy->lb_channel_connectivity_) { + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_TRANSIENT_FAILURE: { + // Keep watching the LB channel. + grpc_channel_element* client_channel_elem = + grpc_channel_stack_last_element( + grpc_channel_get_channel_stack(xdslb_policy->lb_channel_)); + GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter); + grpc_client_channel_watch_connectivity_state( + client_channel_elem, + grpc_polling_entity_create_from_pollset_set( + xdslb_policy->interested_parties()), + &xdslb_policy->lb_channel_connectivity_, + &xdslb_policy->lb_channel_on_connectivity_changed_, nullptr); + break; + } + // The LB channel may be IDLE because it's shut down before the update. + // Restart the LB call to kick the LB channel into gear. + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_READY: + xdslb_policy->lb_calld_.reset(); + if (xdslb_policy->started_picking_) { + if (xdslb_policy->retry_timer_callback_pending_) { + grpc_timer_cancel(&xdslb_policy->lb_call_retry_timer_); + } + xdslb_policy->lb_call_backoff_.Reset(); + xdslb_policy->StartBalancerCallLocked(); + } + [[fallthrough]]; + // Fall through. + case GRPC_CHANNEL_SHUTDOWN: + done: + xdslb_policy->watching_lb_channel_ = false; + xdslb_policy->Unref(DEBUG_LOCATION, + "watch_lb_channel_connectivity_cb_shutdown"); + } +} + +// +// PendingPick +// + +// Adds lb_token of selected subchannel (address) to the call's initial +// metadata. +grpc_error* AddLbTokenToInitialMetadata( + grpc_mdelem lb_token, grpc_linked_mdelem* lb_token_mdelem_storage, + grpc_metadata_batch* initial_metadata) { + GPR_ASSERT(lb_token_mdelem_storage != nullptr); + GPR_ASSERT(!GRPC_MDISNULL(lb_token)); + return grpc_metadata_batch_add_tail(initial_metadata, lb_token_mdelem_storage, + lb_token); +} + +// Destroy function used when embedding client stats in call context. +void DestroyClientStats(void* arg) { + static_cast<XdsLbClientStats*>(arg)->Unref(); +} + +void XdsLb::PendingPickSetMetadataAndContext(PendingPick* pp) { + /* if connected_subchannel is nullptr, no pick has been made by the RR + * policy (e.g., all addresses failed to connect). There won't be any + * user_data/token available */ + if (pp->pick->connected_subchannel != nullptr) { + if (GPR_LIKELY(!GRPC_MDISNULL(pp->lb_token))) { + AddLbTokenToInitialMetadata(GRPC_MDELEM_REF(pp->lb_token), + &pp->pick->lb_token_mdelem_storage, + pp->pick->initial_metadata); + } else { + gpr_log(GPR_ERROR, + "[xdslb %p] No LB token for connected subchannel pick %p", + pp->xdslb_policy, pp->pick); + abort(); + } + // Pass on client stats via context. Passes ownership of the reference. + if (pp->client_stats != nullptr) { + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value = + pp->client_stats.release(); + pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy = + DestroyClientStats; + } + } else { + pp->client_stats.reset(); + } +} + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +void XdsLb::OnPendingPickComplete(void* arg, grpc_error* error) { + PendingPick* pp = static_cast<PendingPick*>(arg); + PendingPickSetMetadataAndContext(pp); + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_REF(error)); + Delete(pp); +} + +XdsLb::PendingPick* XdsLb::PendingPickCreate(PickState* pick) { + PendingPick* pp = New<PendingPick>(); + pp->xdslb_policy = this; + pp->pick = pick; + GRPC_CLOSURE_INIT(&pp->on_complete, &XdsLb::OnPendingPickComplete, pp, + grpc_schedule_on_exec_ctx); + pp->original_on_complete = pick->on_complete; + pick->on_complete = &pp->on_complete; + return pp; +} + +void XdsLb::AddPendingPick(PendingPick* pp) { + pp->next = pending_picks_; + pending_picks_ = pp; +} + +// +// code for interacting with the RR policy +// + +// Performs a pick over \a rr_policy_. Given that a pick can return +// immediately (ignoring its completion callback), we need to perform the +// cleanups this callback would otherwise be responsible for. +// If \a force_async is true, then we will manually schedule the +// completion callback even if the pick is available immediately. +bool XdsLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp, + grpc_error** error) { + // Check for drops if we are not using fallback backend addresses. + if (serverlist_ != nullptr) { + // Look at the index into the serverlist to see if we should drop this call. + xds_grpclb_server* server = serverlist_->servers[serverlist_index_++]; + if (serverlist_index_ == serverlist_->num_servers) { + serverlist_index_ = 0; // Wrap-around. + } + if (server->drop) { + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { + lb_calld_->client_stats()->AddCallDroppedLocked( + server->load_balance_token); + } + if (force_async) { + GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE); + Delete(pp); + return false; + } + Delete(pp); + return true; + } + } + // Set client_stats and user_data. + if (lb_calld_ != nullptr && lb_calld_->client_stats() != nullptr) { + pp->client_stats = lb_calld_->client_stats()->Ref(); + } + GPR_ASSERT(pp->pick->user_data == nullptr); + pp->pick->user_data = (void**)&pp->lb_token; + // Pick via the RR policy. + bool pick_done = rr_policy_->PickLocked(pp->pick, error); + if (pick_done) { + PendingPickSetMetadataAndContext(pp); + if (force_async) { + GRPC_CLOSURE_SCHED(pp->original_on_complete, *error); + *error = GRPC_ERROR_NONE; + pick_done = false; + } + Delete(pp); + } + // else, the pending pick will be registered and taken care of by the + // pending pick list inside the RR policy. Eventually, + // OnPendingPickComplete() will be called, which will (among other + // things) add the LB token to the call's initial metadata. + return pick_done; +} + +void XdsLb::CreateRoundRobinPolicyLocked(const Args& args) { + GPR_ASSERT(rr_policy_ == nullptr); + rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( + "round_robin", args); + if (GPR_UNLIKELY(rr_policy_ == nullptr)) { + gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a RoundRobin policy", this); + return; + } + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. + auto self = Ref(DEBUG_LOCATION, "on_rr_reresolution_requested"); + self.release(); + rr_policy_->SetReresolutionClosureLocked(&on_rr_request_reresolution_); + grpc_error* rr_state_error = nullptr; + rr_connectivity_state_ = rr_policy_->CheckConnectivityLocked(&rr_state_error); + // Connectivity state is a function of the RR policy updated/created. + UpdateConnectivityStateFromRoundRobinPolicyLocked(rr_state_error); + // Add the gRPC LB's interested_parties pollset_set to that of the newly + // created RR policy. This will make the RR policy progress upon activity on + // gRPC LB, which in turn is tied to the application's call. + grpc_pollset_set_add_pollset_set(rr_policy_->interested_parties(), + interested_parties()); + // Subscribe to changes to the connectivity of the new RR. + // TODO(roth): We currently track this ref manually. Once the new + // ClosureRef API is done, pass the RefCountedPtr<> along with the closure. + self = Ref(DEBUG_LOCATION, "on_rr_connectivity_changed"); + self.release(); + rr_policy_->NotifyOnStateChangeLocked(&rr_connectivity_state_, + &on_rr_connectivity_changed_); + rr_policy_->ExitIdleLocked(); + // Send pending picks to RR policy. + PendingPick* pp; + while ((pp = pending_picks_)) { + pending_picks_ = pp->next; + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, + "[xdslb %p] Pending pick about to (async) PICK from RR %p", this, + rr_policy_.get()); + } + grpc_error* error = GRPC_ERROR_NONE; + PickFromRoundRobinPolicyLocked(true /* force_async */, pp, &error); + } +} + +grpc_channel_args* XdsLb::CreateRoundRobinPolicyArgsLocked() { + grpc_lb_addresses* addresses; + bool is_backend_from_grpclb_load_balancer = false; + if (serverlist_ != nullptr) { + GPR_ASSERT(serverlist_->num_servers > 0); + addresses = ProcessServerlist(serverlist_); + is_backend_from_grpclb_load_balancer = true; + } else { + // If CreateOrUpdateRoundRobinPolicyLocked() is invoked when we haven't + // received any serverlist from the balancer, we use the fallback backends + // returned by the resolver. Note that the fallback backend list may be + // empty, in which case the new round_robin policy will keep the requested + // picks pending. + GPR_ASSERT(fallback_backend_addresses_ != nullptr); + addresses = grpc_lb_addresses_copy(fallback_backend_addresses_); + } + GPR_ASSERT(addresses != nullptr); + // Replace the LB addresses in the channel args that we pass down to + // the subchannel. + static const char* keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; + const grpc_arg args_to_add[] = { + grpc_lb_addresses_create_channel_arg(addresses), + // A channel arg indicating if the target is a backend inferred from a + // grpclb load balancer. + grpc_channel_arg_integer_create( + const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER), + is_backend_from_grpclb_load_balancer), + }; + grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove( + args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); + grpc_lb_addresses_destroy(addresses); + return args; +} + +void XdsLb::CreateOrUpdateRoundRobinPolicyLocked() { + if (shutting_down_) return; + grpc_channel_args* args = CreateRoundRobinPolicyArgsLocked(); + GPR_ASSERT(args != nullptr); + if (rr_policy_ != nullptr) { + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Updating RR policy %p", this, + rr_policy_.get()); + } + rr_policy_->UpdateLocked(*args); + } else { + LoadBalancingPolicy::Args lb_policy_args; + lb_policy_args.combiner = combiner(); + lb_policy_args.client_channel_factory = client_channel_factory(); + lb_policy_args.args = args; + CreateRoundRobinPolicyLocked(lb_policy_args); + if (grpc_lb_xds_trace.enabled()) { + gpr_log(GPR_INFO, "[xdslb %p] Created new RR policy %p", this, + rr_policy_.get()); + } + } + grpc_channel_args_destroy(args); +} + +void XdsLb::OnRoundRobinRequestReresolutionLocked(void* arg, + grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + if (xdslb_policy->shutting_down_ || error != GRPC_ERROR_NONE) { + xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_reresolution_requested"); + return; + } + if (grpc_lb_xds_trace.enabled()) { + gpr_log( + GPR_INFO, + "[xdslb %p] Re-resolution requested from the internal RR policy (%p).", + xdslb_policy, xdslb_policy->rr_policy_.get()); + } + // If we are talking to a balancer, we expect to get updated addresses form + // the balancer, so we can ignore the re-resolution request from the RR + // policy. Otherwise, handle the re-resolution request using the + // grpclb policy's original re-resolution closure. + if (xdslb_policy->lb_calld_ == nullptr || + !xdslb_policy->lb_calld_->seen_initial_response()) { + xdslb_policy->TryReresolutionLocked(&grpc_lb_xds_trace, GRPC_ERROR_NONE); + } + // Give back the wrapper closure to the RR policy. + xdslb_policy->rr_policy_->SetReresolutionClosureLocked( + &xdslb_policy->on_rr_request_reresolution_); +} + +void XdsLb::UpdateConnectivityStateFromRoundRobinPolicyLocked( + grpc_error* rr_state_error) { + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&state_tracker_); + /* The new connectivity status is a function of the previous one and the new + * input coming from the status of the RR policy. + * + * current state (grpclb's) + * | + * v || I | C | R | TF | SD | <- new state (RR's) + * ===++====+=====+=====+======+======+ + * I || I | C | R | [I] | [I] | + * ---++----+-----+-----+------+------+ + * C || I | C | R | [C] | [C] | + * ---++----+-----+-----+------+------+ + * R || I | C | R | [R] | [R] | + * ---++----+-----+-----+------+------+ + * TF || I | C | R | [TF] | [TF] | + * ---++----+-----+-----+------+------+ + * SD || NA | NA | NA | NA | NA | (*) + * ---++----+-----+-----+------+------+ + * + * A [STATE] indicates that the old RR policy is kept. In those cases, STATE + * is the current state of grpclb, which is left untouched. + * + * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to + * the previous RR instance. + * + * Note that the status is never updated to SHUTDOWN as a result of calling + * this function. Only glb_shutdown() has the power to set that state. + * + * (*) This function mustn't be called during shutting down. */ + GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN); + switch (rr_connectivity_state_) { + case GRPC_CHANNEL_TRANSIENT_FAILURE: + case GRPC_CHANNEL_SHUTDOWN: + GPR_ASSERT(rr_state_error != GRPC_ERROR_NONE); + break; + case GRPC_CHANNEL_IDLE: + case GRPC_CHANNEL_CONNECTING: + case GRPC_CHANNEL_READY: + GPR_ASSERT(rr_state_error == GRPC_ERROR_NONE); + } + if (grpc_lb_xds_trace.enabled()) { + gpr_log( + GPR_INFO, + "[xdslb %p] Setting grpclb's state to %s from new RR policy %p state.", + this, grpc_connectivity_state_name(rr_connectivity_state_), + rr_policy_.get()); + } + grpc_connectivity_state_set(&state_tracker_, rr_connectivity_state_, + rr_state_error, + "update_lb_connectivity_status_locked"); +} + +void XdsLb::OnRoundRobinConnectivityChangedLocked(void* arg, + grpc_error* error) { + XdsLb* xdslb_policy = static_cast<XdsLb*>(arg); + if (xdslb_policy->shutting_down_) { + xdslb_policy->Unref(DEBUG_LOCATION, "on_rr_connectivity_changed"); + return; + } + xdslb_policy->UpdateConnectivityStateFromRoundRobinPolicyLocked( + GRPC_ERROR_REF(error)); + // Resubscribe. Reuse the "on_rr_connectivity_changed" ref. + xdslb_policy->rr_policy_->NotifyOnStateChangeLocked( + &xdslb_policy->rr_connectivity_state_, + &xdslb_policy->on_rr_connectivity_changed_); +} + +// +// factory +// + +class XdsFactory : public LoadBalancingPolicyFactory { + public: + OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( + const LoadBalancingPolicy::Args& args) const override { + /* Count the number of gRPC-LB addresses. There must be at least one. */ + const grpc_arg* arg = + grpc_channel_args_find(args.args, GRPC_ARG_LB_ADDRESSES); + if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { + return nullptr; + } + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return nullptr; + return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(addresses, args)); + } + + const char* name() const override { return "xds"; } +}; + +} // namespace + +} // namespace grpc_core + +// +// Plugin registration +// + +namespace { + +// Only add client_load_reporting filter if the grpclb LB policy is used. +bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder, + void* arg) { + const grpc_channel_args* args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg* channel_arg = + grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING && + strcmp(channel_arg->value.string, "grpclb") == 0) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter*)arg, nullptr, nullptr); + } + return true; +} + +} // namespace + +void grpc_lb_policy_xds_init() { + grpc_core::LoadBalancingPolicyRegistry::Builder:: + RegisterLoadBalancingPolicyFactory( + grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( + grpc_core::New<grpc_core::XdsFactory>())); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_client_load_reporting_filter, + (void*)&xds_client_load_reporting_filter); +} + +void grpc_lb_policy_xds_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h new file mode 100644 index 0000000000..8b20680f2d --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H + +#include <grpc/support/port_platform.h> + +/** Channel arg indicating if a target corresponding to the address is grpclb + * loadbalancer. The type of this arg is an integer and the value is treated as + * a bool. */ +#define GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER \ + "grpc.address_is_xds_load_balancer" +/** Channel arg indicating if a target corresponding to the address is a backend + * received from a balancer. The type of this arg is an integer and the value is + * treated as a bool. */ +#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \ + "grpc.address_is_backend_from_xds_load_balancer" + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc new file mode 100644 index 0000000000..0aa145a24e --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc @@ -0,0 +1,26 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" + +grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( + grpc_channel_args* args) { + return args; +} diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h new file mode 100644 index 0000000000..32c4acc8a3 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/lb_policy_factory.h" + +/// Makes any necessary modifications to \a args for use in the xds +/// balancer channel. +/// +/// Takes ownership of \a args. +/// +/// Caller takes ownership of the returned args. +grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( + grpc_channel_args* args); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc new file mode 100644 index 0000000000..5ab72efce4 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc @@ -0,0 +1,107 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> +#include <string.h> + +#include "src/core/ext/filters/client_channel/client_channel.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/transport/target_authority_table.h" +#include "src/core/lib/slice/slice_internal.h" + +namespace grpc_core { +namespace { + +int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, + const grpc_core::UniquePtr<char>& b) { + return strcmp(a.get(), b.get()); +} + +RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( + grpc_lb_addresses* addresses) { + TargetAuthorityTable::Entry* target_authority_entries = + static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( + sizeof(*target_authority_entries) * addresses->num_addresses)); + for (size_t i = 0; i < addresses->num_addresses; ++i) { + char* addr_str; + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_str, &addresses->addresses[i].address, true) > 0); + target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); + target_authority_entries[i].value.reset( + gpr_strdup(addresses->addresses[i].balancer_name)); + gpr_free(addr_str); + } + RefCountedPtr<TargetAuthorityTable> target_authority_table = + TargetAuthorityTable::Create(addresses->num_addresses, + target_authority_entries, BalancerNameCmp); + gpr_free(target_authority_entries); + return target_authority_table; +} + +} // namespace +} // namespace grpc_core + +grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( + grpc_channel_args* args) { + const char* args_to_remove[1]; + size_t num_args_to_remove = 0; + grpc_arg args_to_add[2]; + size_t num_args_to_add = 0; + // Add arg for targets info table. + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); + GPR_ASSERT(arg != nullptr); + GPR_ASSERT(arg->type == GRPC_ARG_POINTER); + grpc_lb_addresses* addresses = + static_cast<grpc_lb_addresses*>(arg->value.pointer.p); + grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> + target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); + args_to_add[num_args_to_add++] = + grpc_core::CreateTargetAuthorityTableChannelArg( + target_authority_table.get()); + // Substitute the channel credentials with a version without call + // credentials: the load balancer is not necessarily trusted to handle + // bearer token credentials. + grpc_channel_credentials* channel_credentials = + grpc_channel_credentials_find_in_args(args); + grpc_channel_credentials* creds_sans_call_creds = nullptr; + if (channel_credentials != nullptr) { + creds_sans_call_creds = + grpc_channel_credentials_duplicate_without_call_credentials( + channel_credentials); + GPR_ASSERT(creds_sans_call_creds != nullptr); + args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; + args_to_add[num_args_to_add++] = + grpc_channel_credentials_to_arg(creds_sans_call_creds); + } + grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( + args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); + // Clean up. + grpc_channel_args_destroy(args); + if (creds_sans_call_creds != nullptr) { + grpc_channel_credentials_unref(creds_sans_call_creds); + } + return result; +} diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc new file mode 100644 index 0000000000..cdf5408be3 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc @@ -0,0 +1,85 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" + +#include <grpc/support/atm.h> +#include <grpc/support/string_util.h> +#include <string.h> + +namespace grpc_core { + +void XdsLbClientStats::AddCallStarted() { + gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); +} + +void XdsLbClientStats::AddCallFinished(bool finished_with_client_failed_to_send, + bool finished_known_received) { + gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); + if (finished_with_client_failed_to_send) { + gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_, + (gpr_atm)1); + } + if (finished_known_received) { + gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1); + } +} + +void XdsLbClientStats::AddCallDroppedLocked(char* token) { + // Increment num_calls_started and num_calls_finished. + gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); + gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); + // Record the drop. + if (drop_token_counts_ == nullptr) { + drop_token_counts_.reset(New<DroppedCallCounts>()); + } + for (size_t i = 0; i < drop_token_counts_->size(); ++i) { + if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) { + ++(*drop_token_counts_)[i].count; + return; + } + } + // Not found, so add a new entry. + drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1); +} + +namespace { + +void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) { + *value = static_cast<int64_t>(gpr_atm_full_xchg(counter, (gpr_atm)0)); +} + +} // namespace + +void XdsLbClientStats::GetLocked( + int64_t* num_calls_started, int64_t* num_calls_finished, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received, + UniquePtr<DroppedCallCounts>* drop_token_counts) { + AtomicGetAndResetCounter(num_calls_started, &num_calls_started_); + AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_); + AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send, + &num_calls_finished_with_client_failed_to_send_); + AtomicGetAndResetCounter(num_calls_finished_known_received, + &num_calls_finished_known_received_); + *drop_token_counts = std::move(drop_token_counts_); +} + +} // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h new file mode 100644 index 0000000000..fa0b9f4b63 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h @@ -0,0 +1,72 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H + +#include <grpc/support/port_platform.h> + +#include <grpc/support/atm.h> + +#include "src/core/lib/gprpp/inlined_vector.h" +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/gprpp/ref_counted.h" + +namespace grpc_core { + +class XdsLbClientStats : public RefCounted<XdsLbClientStats> { + public: + struct DropTokenCount { + UniquePtr<char> token; + int64_t count; + + DropTokenCount(UniquePtr<char> token, int64_t count) + : token(std::move(token)), count(count) {} + }; + + typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts; + + XdsLbClientStats() {} + + void AddCallStarted(); + void AddCallFinished(bool finished_with_client_failed_to_send, + bool finished_known_received); + + // This method is not thread-safe; caller must synchronize. + void AddCallDroppedLocked(char* token); + + // This method is not thread-safe; caller must synchronize. + void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received, + UniquePtr<DroppedCallCounts>* drop_token_counts); + + private: + // This field must only be accessed via *_locked() methods. + UniquePtr<DroppedCallCounts> drop_token_counts_; + // These fields may be accessed from multiple threads at a time. + gpr_atm num_calls_started_ = 0; + gpr_atm num_calls_finished_ = 0; + gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; + gpr_atm num_calls_finished_known_received_ = 0; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H \ + */ diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index 67e56ed791..e90a9e1f7f 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -38,6 +38,8 @@ namespace { // singleton instance of the registry. ChannelzRegistry* g_channelz_registry = nullptr; +const int kPaginationLimit = 100; + } // anonymous namespace void ChannelzRegistry::Init() { g_channelz_registry = New<ChannelzRegistry>(); } @@ -124,6 +126,7 @@ BaseNode* ChannelzRegistry::InternalGet(intptr_t uuid) { } char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { + MutexLock lock(&mu_); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; @@ -133,10 +136,18 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { // start_channel_id=0, which signifies "give me everything." Hence this // funky looking line below. size_t start_idx = start_channel_id == 0 ? 0 : start_channel_id - 1; + bool reached_pagination_limit = false; for (size_t i = start_idx; i < entities_.size(); ++i) { if (entities_[i] != nullptr && entities_[i]->type() == grpc_core::channelz::BaseNode::EntityType::kTopLevelChannel) { + // check if we are over pagination limit to determine if we need to set + // the "end" element. If we don't go through this block, we know that + // when the loop terminates, we have <= to kPaginationLimit. + if (top_level_channels.size() == kPaginationLimit) { + reached_pagination_limit = true; + break; + } top_level_channels.push_back(entities_[i]); } } @@ -150,17 +161,17 @@ char* ChannelzRegistry::InternalGetTopChannels(intptr_t start_channel_id) { grpc_json_link_child(array_parent, channel_json, json_iterator); } } - // For now we do not have any pagination rules. In the future we could - // pick a constant for max_channels_sent for a GetTopChannels request. - // Tracking: https://github.com/grpc/grpc/issues/16019. - json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, - GRPC_JSON_TRUE, false); + if (!reached_pagination_limit) { + grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE, + false); + } char* json_str = grpc_json_dump_to_string(top_level_json, 0); grpc_json_destroy(top_level_json); return json_str; } char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { + MutexLock lock(&mu_); grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); grpc_json* json = top_level_json; grpc_json* json_iterator = nullptr; @@ -169,10 +180,18 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { // reserved). However, we want to support requests coming in with // start_server_id=0, which signifies "give me everything." size_t start_idx = start_server_id == 0 ? 0 : start_server_id - 1; + bool reached_pagination_limit = false; for (size_t i = start_idx; i < entities_.size(); ++i) { if (entities_[i] != nullptr && entities_[i]->type() == grpc_core::channelz::BaseNode::EntityType::kServer) { + // check if we are over pagination limit to determine if we need to set + // the "end" element. If we don't go through this block, we know that + // when the loop terminates, we have <= to kPaginationLimit. + if (servers.size() == kPaginationLimit) { + reached_pagination_limit = true; + break; + } servers.push_back(entities_[i]); } } @@ -186,11 +205,10 @@ char* ChannelzRegistry::InternalGetServers(intptr_t start_server_id) { grpc_json_link_child(array_parent, server_json, json_iterator); } } - // For now we do not have any pagination rules. In the future we could - // pick a constant for max_channels_sent for a GetServers request. - // Tracking: https://github.com/grpc/grpc/issues/16019. - json_iterator = grpc_json_create_child(nullptr, json, "end", nullptr, - GRPC_JSON_TRUE, false); + if (!reached_pagination_limit) { + grpc_json_create_child(nullptr, json, "end", nullptr, GRPC_JSON_TRUE, + false); + } char* json_str = grpc_json_dump_to_string(top_level_json, 0); grpc_json_destroy(top_level_json); return json_str; diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index 50674b0845..bdfc1d70c3 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -307,7 +307,10 @@ grpc_error* grpc_set_socket_tcp_user_timeout( } } #else - gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform"); + extern grpc_core::TraceFlag grpc_tcp_trace; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP_USER_TIMEOUT not supported for this platform"); + } #endif /* GRPC_HAVE_TCP_USER_TIMEOUT */ return GRPC_ERROR_NONE; } diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 5dc9991f70..b81ae73b4d 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -79,6 +79,7 @@ typedef struct non_polling_worker { typedef struct { gpr_mu mu; + bool kicked_without_poller; non_polling_worker* root; grpc_closure* shutdown; } non_polling_poller; @@ -103,6 +104,10 @@ static grpc_error* non_polling_poller_work(grpc_pollset* pollset, grpc_millis deadline) { non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset); if (npp->shutdown) return GRPC_ERROR_NONE; + if (npp->kicked_without_poller) { + npp->kicked_without_poller = false; + return GRPC_ERROR_NONE; + } non_polling_worker w; gpr_cv_init(&w.cv); if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w); @@ -148,6 +153,8 @@ static grpc_error* non_polling_poller_kick( w->kicked = true; gpr_cv_signal(&w->cv); } + } else { + p->kicked_without_poller = true; } return GRPC_ERROR_NONE; } diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index 862909f238..85b95dee91 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -69,7 +69,7 @@ static NSMutableDictionary *kHostCache; // gRPC library. // TODO(jcanizales): Add unit tests for the types of addresses we want to let pass untouched. NSURL *hostURL = [NSURL URLWithString:[@"https://" stringByAppendingString:address]]; - if (hostURL.host && !hostURL.port) { + if (hostURL.host && hostURL.port == nil) { address = [hostURL.host stringByAppendingString:@":443"]; } @@ -193,6 +193,7 @@ static NSMutableDictionary *kHostCache; if (pemPrivateKey == nil && pemCertChain == nil) { creds = grpc_ssl_credentials_create(rootsASCII.bytes, NULL, NULL, NULL); } else { + assert(pemPrivateKey != nil && pemCertChain != nil); grpc_ssl_pem_key_cert_pair key_cert_pair; NSData *privateKeyASCII = [self nullTerminatedDataWithString:pemPrivateKey]; NSData *certChainASCII = [self nullTerminatedDataWithString:pemCertChain]; @@ -226,7 +227,7 @@ static NSMutableDictionary *kHostCache; args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = _hostNameOverride; } - if (_responseSizeLimitOverride) { + if (_responseSizeLimitOverride != nil) { args[@GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH] = _responseSizeLimitOverride; } diff --git a/test/core/channel/channelz_test.cc b/test/core/channel/channelz_test.cc index 057d53b4ab..5ead12e870 100644 --- a/test/core/channel/channelz_test.cc +++ b/test/core/channel/channelz_test.cc @@ -226,7 +226,19 @@ void ChannelzSleep(int64_t sleep_us) { } // anonymous namespace -class ChannelzChannelTest : public ::testing::TestWithParam<size_t> {}; +class ChannelzChannelTest : public ::testing::TestWithParam<size_t> { + protected: + // ensure we always have a fresh registry for tests. + void SetUp() override { + ChannelzRegistry::Shutdown(); + ChannelzRegistry::Init(); + } + + void TearDown() override { + ChannelzRegistry::Shutdown(); + ChannelzRegistry::Init(); + } +}; TEST_P(ChannelzChannelTest, BasicChannel) { grpc_core::ExecCtx exec_ctx; @@ -313,6 +325,32 @@ TEST(ChannelzGetTopChannelsTest, ManyChannelsTest) { ValidateGetTopChannels(10); } +TEST(ChannelzGetTopChannelsTest, GetTopChannelsPagination) { + grpc_core::ExecCtx exec_ctx; + // this is over the pagination limit. + ChannelFixture channels[150]; + (void)channels; // suppress unused variable error + char* json_str = ChannelzRegistry::GetTopChannels(0); + grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(json_str); + grpc_json* parsed_json = grpc_json_parse_string(json_str); + // 100 is the pagination limit. + ValidateJsonArraySize(parsed_json, "channel", 100); + grpc_json* end = GetJsonChild(parsed_json, "end"); + EXPECT_EQ(end, nullptr); + grpc_json_destroy(parsed_json); + gpr_free(json_str); + // Now we get the rest + json_str = ChannelzRegistry::GetTopChannels(101); + grpc::testing::ValidateGetTopChannelsResponseProtoJsonTranslation(json_str); + parsed_json = grpc_json_parse_string(json_str); + ValidateJsonArraySize(parsed_json, "channel", 50); + end = GetJsonChild(parsed_json, "end"); + ASSERT_NE(end, nullptr); + EXPECT_EQ(end->type, GRPC_JSON_TRUE); + grpc_json_destroy(parsed_json); + gpr_free(json_str); +} + TEST(ChannelzGetTopChannelsTest, InternalChannelTest) { grpc_core::ExecCtx exec_ctx; ChannelFixture channels[10]; diff --git a/test/cpp/util/config_grpc_cli.h b/test/cpp/util/config_grpc_cli.h index 1df7b36e2e..358884196d 100644 --- a/test/cpp/util/config_grpc_cli.h +++ b/test/cpp/util/config_grpc_cli.h @@ -40,11 +40,6 @@ #define GRPC_CUSTOM_TEXTFORMAT ::google::protobuf::TextFormat #endif -#ifndef GRPC_CUSTOM_JSONUTIL -#include <google/protobuf/util/json_util.h> -#define GRPC_CUSTOM_JSONUTIL ::google::protobuf::util -#endif - #ifndef GRPC_CUSTOM_DISKSOURCETREE #include <google/protobuf/compiler/importer.h> #define GRPC_CUSTOM_DISKSOURCETREE ::google::protobuf::compiler::DiskSourceTree @@ -63,8 +58,6 @@ typedef GRPC_CUSTOM_MERGEDDESCRIPTORDATABASE MergedDescriptorDatabase; typedef GRPC_CUSTOM_TEXTFORMAT TextFormat; -namespace json = GRPC_CUSTOM_JSONUTIL; - namespace compiler { typedef GRPC_CUSTOM_DISKSOURCETREE DiskSourceTree; typedef GRPC_CUSTOM_IMPORTER Importer; diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc index 80eaf4f727..ccc60cca27 100644 --- a/test/cpp/util/grpc_tool.cc +++ b/test/cpp/util/grpc_tool.cc @@ -57,8 +57,6 @@ DEFINE_string(proto_path, ".", "Path to look for the proto file."); DEFINE_string(protofiles, "", "Name of the proto file."); DEFINE_bool(binary_input, false, "Input in binary format"); DEFINE_bool(binary_output, false, "Output in binary format"); -DEFINE_bool(json_input, false, "Input in json format"); -DEFINE_bool(json_output, false, "Output in json format"); DEFINE_string(infile, "", "Input file (default is stdin)"); DEFINE_bool(batch, false, "Input contains multiple requests. Please do not use this to send " @@ -90,8 +88,6 @@ class GrpcTool { GrpcToolOutputCallback callback); bool ToText(int argc, const char** argv, const CliCredentials& cred, GrpcToolOutputCallback callback); - bool ToJson(int argc, const char** argv, const CliCredentials& cred, - GrpcToolOutputCallback callback); bool ToBinary(int argc, const char** argv, const CliCredentials& cred, GrpcToolOutputCallback callback); @@ -193,9 +189,8 @@ void ReadResponse(CliCall* call, const grpc::string& method_name, fprintf(stderr, "got response.\n"); if (!FLAGS_binary_output) { gpr_mu_lock(parser_mu); - serialized_response_proto = parser->GetFormattedStringFromMethod( - method_name, serialized_response_proto, false /* is_request */, - FLAGS_json_output); + serialized_response_proto = parser->GetTextFormatFromMethod( + method_name, serialized_response_proto, false /* is_request */); if (parser->HasError() && print_mode) { fprintf(stderr, "Failed to parse response.\n"); } @@ -238,7 +233,6 @@ const Command ops[] = { {"parse", BindWith5Args(&GrpcTool::ParseMessage), 2, 3}, {"totext", BindWith5Args(&GrpcTool::ToText), 2, 3}, {"tobinary", BindWith5Args(&GrpcTool::ToBinary), 2, 3}, - {"tojson", BindWith5Args(&GrpcTool::ToJson), 2, 3}, }; void Usage(const grpc::string& msg) { @@ -250,7 +244,6 @@ void Usage(const grpc::string& msg) { " grpc_cli type ... ; Print type\n" " grpc_cli parse ... ; Parse message\n" " grpc_cli totext ... ; Convert binary message to text\n" - " grpc_cli tojson ... ; Convert binary message to json\n" " grpc_cli tobinary ... ; Convert text message to binary\n" " grpc_cli help ... ; Print this message, or per-command usage\n" "\n", @@ -472,9 +465,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, " --infile ; Input filename (defaults to stdin)\n" " --outfile ; Output filename (defaults to stdout)\n" " --binary_input ; Input in binary format\n" - " --binary_output ; Output in binary format\n" - " --json_input ; Input in json format\n" - " --json_output ; Output in json format\n" + + " --binary_output ; Output in binary format\n" + cred.GetCredentialUsage()); std::stringstream output_ss; @@ -557,8 +548,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, } else { gpr_mu_lock(&parser_mu); serialized_request_proto = parser->GetSerializedProtoFromMethod( - method_name, request_text, true /* is_request */, - FLAGS_json_input); + method_name, request_text, true /* is_request */); request_text.clear(); if (parser->HasError()) { if (print_mode) { @@ -642,8 +632,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, request_text.clear(); } else { serialized_request_proto = parser->GetSerializedProtoFromMethod( - method_name, request_text, true /* is_request */, - FLAGS_json_input); + method_name, request_text, true /* is_request */); request_text.clear(); if (parser->HasError()) { if (print_mode) { @@ -679,10 +668,9 @@ bool GrpcTool::CallMethod(int argc, const char** argv, break; } } else { - grpc::string response_text = parser->GetFormattedStringFromMethod( + grpc::string response_text = parser->GetTextFormatFromMethod( method_name, serialized_response_proto, - false /* is_request */, FLAGS_json_output); - + false /* is_request */); if (parser->HasError() && print_mode) { fprintf(stderr, "Failed to parse response.\n"); } else { @@ -739,7 +727,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv, serialized_request_proto = request_text; } else { serialized_request_proto = parser->GetSerializedProtoFromMethod( - method_name, request_text, true /* is_request */, FLAGS_json_input); + method_name, request_text, true /* is_request */); if (parser->HasError()) { fprintf(stderr, "Failed to parse request.\n"); return false; @@ -763,15 +751,13 @@ bool GrpcTool::CallMethod(int argc, const char** argv, receive_initial_metadata ? &server_initial_metadata : nullptr); receive_initial_metadata = false) { if (!FLAGS_binary_output) { - serialized_response_proto = parser->GetFormattedStringFromMethod( - method_name, serialized_response_proto, false /* is_request */, - FLAGS_json_output); + serialized_response_proto = parser->GetTextFormatFromMethod( + method_name, serialized_response_proto, false /* is_request */); if (parser->HasError()) { fprintf(stderr, "Failed to parse response.\n"); return false; } } - if (receive_initial_metadata) { PrintMetadata(server_initial_metadata, "Received initial metadata from server:"); @@ -811,9 +797,7 @@ bool GrpcTool::ParseMessage(int argc, const char** argv, " --infile ; Input filename (defaults to stdin)\n" " --outfile ; Output filename (defaults to stdout)\n" " --binary_input ; Input in binary format\n" - " --binary_output ; Output in binary format\n" - " --json_input ; Input in json format\n" - " --json_output ; Output in json format\n" + + " --binary_output ; Output in binary format\n" + cred.GetCredentialUsage()); std::stringstream output_ss; @@ -860,8 +844,8 @@ bool GrpcTool::ParseMessage(int argc, const char** argv, if (FLAGS_binary_input) { serialized_request_proto = message_text; } else { - serialized_request_proto = parser->GetSerializedProtoFromMessageType( - type_name, message_text, FLAGS_json_input); + serialized_request_proto = + parser->GetSerializedProtoFromMessageType(type_name, message_text); if (parser->HasError()) { fprintf(stderr, "Failed to serialize the message.\n"); return false; @@ -871,14 +855,12 @@ bool GrpcTool::ParseMessage(int argc, const char** argv, if (FLAGS_binary_output) { output_ss << serialized_request_proto; } else { - grpc::string output_text; - output_text = parser->GetFormattedStringFromMessageType( - type_name, serialized_request_proto, FLAGS_json_output); + grpc::string output_text = parser->GetTextFormatFromMessageType( + type_name, serialized_request_proto); if (parser->HasError()) { fprintf(stderr, "Failed to deserialize the message.\n"); return false; } - output_ss << output_text << std::endl; } @@ -903,25 +885,6 @@ bool GrpcTool::ToText(int argc, const char** argv, const CliCredentials& cred, return ParseMessage(argc, argv, cred, callback); } -bool GrpcTool::ToJson(int argc, const char** argv, const CliCredentials& cred, - GrpcToolOutputCallback callback) { - CommandUsage( - "Convert binary message to json\n" - " grpc_cli tojson <protofiles> <type>\n" - " <protofiles> ; Comma separated list of proto files\n" - " <type> ; Protocol buffer type name\n" - " --proto_path ; The search path of proto files\n" - " --infile ; Input filename (defaults to stdin)\n" - " --outfile ; Output filename (defaults to stdout)\n"); - - FLAGS_protofiles = argv[0]; - FLAGS_remotedb = false; - FLAGS_binary_input = true; - FLAGS_binary_output = false; - FLAGS_json_output = true; - return ParseMessage(argc, argv, cred, callback); -} - bool GrpcTool::ToBinary(int argc, const char** argv, const CliCredentials& cred, GrpcToolOutputCallback callback) { CommandUsage( diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc index be9a624a2c..3aae090e81 100644 --- a/test/cpp/util/grpc_tool_test.cc +++ b/test/cpp/util/grpc_tool_test.cc @@ -74,20 +74,11 @@ using grpc::testing::EchoResponse; " rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse) " \ "{}\n" -#define ECHO_RESPONSE_MESSAGE_TEXT_FORMAT \ - "message: \"echo\"\n" \ - "param {\n" \ - " host: \"localhost\"\n" \ - " peer: \"peer\"\n" \ - "}\n\n" - -#define ECHO_RESPONSE_MESSAGE_JSON_FORMAT \ - "{\n" \ - " \"message\": \"echo\",\n" \ - " \"param\": {\n" \ - " \"host\": \"localhost\",\n" \ - " \"peer\": \"peer\"\n" \ - " }\n" \ +#define ECHO_RESPONSE_MESSAGE \ + "message: \"echo\"\n" \ + "param {\n" \ + " host: \"localhost\"\n" \ + " peer: \"peer\"\n" \ "}\n\n" DECLARE_string(channel_creds_type); @@ -98,8 +89,6 @@ namespace testing { DECLARE_bool(binary_input); DECLARE_bool(binary_output); -DECLARE_bool(json_input); -DECLARE_bool(json_output); DECLARE_bool(l); DECLARE_bool(batch); DECLARE_string(metadata); @@ -437,61 +426,6 @@ TEST_F(GrpcToolTest, CallCommand) { // Expected output: "message: \"Hello\"" EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), "message: \"Hello\"")); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - - // Expected output: - // { - // "message": "Hello" - // } - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello\"\n}")); - - ShutdownServer(); -} - -TEST_F(GrpcToolTest, CallCommandJsonInput) { - // Test input "grpc_cli call localhost:<port> Echo "{ \"message\": \"Hello\"}" - std::stringstream output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "call", server_address.c_str(), "Echo", - "{ \"message\": \"Hello\"}"}; - - FLAGS_json_input = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - // Expected output: "message: \"Hello\"" - EXPECT_TRUE(nullptr != - strstr(output_stream.str().c_str(), "message: \"Hello\"")); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_json_input = false; - - // Expected output: - // { - // "message": "Hello" - // } - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello\"\n}")); - ShutdownServer(); } @@ -519,101 +453,6 @@ TEST_F(GrpcToolTest, CallCommandBatch) { EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), "message: \"Hello0\"\nmessage: " "\"Hello1\"\nmessage: \"Hello2\"\n")); - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - ss.clear(); - ss.seekg(0); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_batch = true; - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_batch = false; - - // Expected output: - // { - // "message": "Hello0" - // } - // { - // "message": "Hello1" - // } - // { - // "message": "Hello2" - // } - // Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage: - // "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello0\"\n}\n" - "{\n \"message\": \"Hello1\"\n}\n" - "{\n \"message\": \"Hello2\"\n}\n")); - - std::cin.rdbuf(orig); - ShutdownServer(); -} - -TEST_F(GrpcToolTest, CallCommandBatchJsonInput) { - // Test input "grpc_cli call Echo" - std::stringstream output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "call", server_address.c_str(), "Echo", - "{\"message\": \"Hello0\"}"}; - - // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n" - std::streambuf* orig = std::cin.rdbuf(); - std::istringstream ss( - "{\"message\": \"Hello1\"}\n\n{\"message\": \"Hello2\" }\n\n"); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_json_input = true; - FLAGS_batch = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_batch = false; - - // Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage: - // "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "message: \"Hello0\"\nmessage: " - "\"Hello1\"\nmessage: \"Hello2\"\n")); - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - ss.clear(); - ss.seekg(0); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_batch = true; - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_batch = false; - FLAGS_json_input = false; - - // Expected output: - // { - // "message": "Hello0" - // } - // { - // "message": "Hello1" - // } - // { - // "message": "Hello2" - // } - // Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage: - // "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello0\"\n}\n" - "{\n \"message\": \"Hello1\"\n}\n" - "{\n \"message\": \"Hello2\"\n}\n")); - std::cin.rdbuf(orig); ShutdownServer(); } @@ -640,95 +479,6 @@ TEST_F(GrpcToolTest, CallCommandBatchWithBadRequest) { // Expected output: "message: "Hello0"\nmessage: "Hello2"\n" EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), "message: \"Hello0\"\nmessage: \"Hello2\"\n")); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - ss.clear(); - ss.seekg(0); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_batch = true; - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_batch = false; - - // Expected output: - // { - // "message": "Hello0" - // } - // { - // "message": "Hello2" - // } - // Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage: - // "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello0\"\n}\n" - "{\n \"message\": \"Hello2\"\n}\n")); - - std::cin.rdbuf(orig); - ShutdownServer(); -} - -TEST_F(GrpcToolTest, CallCommandBatchJsonInputWithBadRequest) { - // Test input "grpc_cli call Echo" - std::stringstream output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "call", server_address.c_str(), "Echo", - "{ \"message\": \"Hello0\"}"}; - - // Mock std::cin input "message: 1\n\n message: 'Hello2'\n\n" - std::streambuf* orig = std::cin.rdbuf(); - std::istringstream ss( - "{ \"message\": 1 }\n\n { \"message\": \"Hello2\" }\n\n"); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_batch = true; - FLAGS_json_input = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_input = false; - FLAGS_batch = false; - - // Expected output: "message: "Hello0"\nmessage: "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "message: \"Hello0\"\nmessage: \"Hello2\"\n")); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - ss.clear(); - ss.seekg(0); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_batch = true; - FLAGS_json_input = true; - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_json_input = false; - FLAGS_batch = false; - - // Expected output: - // { - // "message": "Hello0" - // } - // { - // "message": "Hello2" - // } - // Expected output: "message: "Hello0"\nmessage: "Hello1"\nmessage: - // "Hello2"\n" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "{\n \"message\": \"Hello0\"\n}\n" - "{\n \"message\": \"Hello2\"\n}\n")); - std::cin.rdbuf(orig); ShutdownServer(); } @@ -758,34 +508,6 @@ TEST_F(GrpcToolTest, CallCommandRequestStream) { ShutdownServer(); } -TEST_F(GrpcToolTest, CallCommandRequestStreamJsonInput) { - // Test input: grpc_cli call localhost:<port> RequestStream "{ \"message\": - // \"Hello0\"}" - std::stringstream output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "call", server_address.c_str(), - "RequestStream", "{ \"message\": \"Hello0\" }"}; - - // Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n" - std::streambuf* orig = std::cin.rdbuf(); - std::istringstream ss( - "{ \"message\": \"Hello1\" }\n\n{ \"message\": \"Hello2\" }\n\n"); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_json_input = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_input = false; - - // Expected output: "message: \"Hello0Hello1Hello2\"" - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - "message: \"Hello0Hello1Hello2\"")); - std::cin.rdbuf(orig); - ShutdownServer(); -} - TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) { // Test input: grpc_cli call localhost:<port> RequestStream "message: // 'Hello0'" @@ -811,34 +533,6 @@ TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequest) { ShutdownServer(); } -TEST_F(GrpcToolTest, CallCommandRequestStreamWithBadRequestJsonInput) { - // Test input: grpc_cli call localhost:<port> RequestStream "message: - // 'Hello0'" - std::stringstream output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "call", server_address.c_str(), - "RequestStream", "{ \"message\": \"Hello0\" }"}; - - // Mock std::cin input "bad_field: 'Hello1'\n\n message: 'Hello2'\n\n" - std::streambuf* orig = std::cin.rdbuf(); - std::istringstream ss( - "{ \"bad_field\": \"Hello1\" }\n\n{ \"message\": \"Hello2\" }\n\n"); - std::cin.rdbuf(ss.rdbuf()); - - FLAGS_json_input = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_input = false; - - // Expected output: "message: \"Hello0Hello2\"" - EXPECT_TRUE(nullptr != - strstr(output_stream.str().c_str(), "message: \"Hello0Hello2\"")); - std::cin.rdbuf(orig); - ShutdownServer(); -} - TEST_F(GrpcToolTest, CallCommandResponseStream) { // Test input: grpc_cli call localhost:<port> ResponseStream "message: // 'Hello'" @@ -860,24 +554,6 @@ TEST_F(GrpcToolTest, CallCommandResponseStream) { expected_response_text.c_str())); } - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - - // Expected output: "{\n \"message\": \"Hello{n}\"\n}\n" - for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { - grpc::string expected_response_text = - "{\n \"message\": \"Hello" + grpc::to_string(i) + "\"\n}\n"; - EXPECT_TRUE(nullptr != strstr(output_stream.str().c_str(), - expected_response_text.c_str())); - } - ShutdownServer(); } @@ -941,31 +617,15 @@ TEST_F(GrpcToolTest, ParseCommand) { const grpc::string server_address = SetUpServer(); const char* argv[] = {"grpc_cli", "parse", server_address.c_str(), - "grpc.testing.EchoResponse", - ECHO_RESPONSE_MESSAGE_TEXT_FORMAT}; + "grpc.testing.EchoResponse", ECHO_RESPONSE_MESSAGE}; FLAGS_binary_input = false; FLAGS_binary_output = false; EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), std::bind(PrintStream, &output_stream, std::placeholders::_1))); - // Expected output: ECHO_RESPONSE_MESSAGE_TEXT_FORMAT - EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), - ECHO_RESPONSE_MESSAGE_TEXT_FORMAT)); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - - // Expected output: ECHO_RESPONSE_MESSAGE_JSON_FORMAT - EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), - ECHO_RESPONSE_MESSAGE_JSON_FORMAT)); + // Expected output: ECHO_RESPONSE_MESSAGE + EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), ECHO_RESPONSE_MESSAGE)); // Parse text message to binary message and then parse it back to text message output_stream.str(grpc::string()); @@ -985,52 +645,13 @@ TEST_F(GrpcToolTest, ParseCommand) { std::placeholders::_1))); // Expected output: ECHO_RESPONSE_MESSAGE - EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), - ECHO_RESPONSE_MESSAGE_TEXT_FORMAT)); + EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), ECHO_RESPONSE_MESSAGE)); FLAGS_binary_input = false; FLAGS_binary_output = false; ShutdownServer(); } -TEST_F(GrpcToolTest, ParseCommandJsonFormat) { - // Test input "grpc_cli parse localhost:<port> grpc.testing.EchoResponse - // ECHO_RESPONSE_MESSAGE_JSON_FORMAT" - std::stringstream output_stream; - std::stringstream binary_output_stream; - - const grpc::string server_address = SetUpServer(); - const char* argv[] = {"grpc_cli", "parse", server_address.c_str(), - "grpc.testing.EchoResponse", - ECHO_RESPONSE_MESSAGE_JSON_FORMAT}; - - FLAGS_json_input = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - - // Expected output: ECHO_RESPONSE_MESSAGE_TEXT_FORMAT - EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), - ECHO_RESPONSE_MESSAGE_TEXT_FORMAT)); - - // with json_output - output_stream.str(grpc::string()); - output_stream.clear(); - - FLAGS_json_output = true; - EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(), - std::bind(PrintStream, &output_stream, - std::placeholders::_1))); - FLAGS_json_output = false; - FLAGS_json_input = false; - - // Expected output: ECHO_RESPONSE_MESSAGE_JSON_FORMAT - EXPECT_TRUE(0 == strcmp(output_stream.str().c_str(), - ECHO_RESPONSE_MESSAGE_JSON_FORMAT)); - - ShutdownServer(); -} - TEST_F(GrpcToolTest, TooFewArguments) { // Test input "grpc_cli call Echo" std::stringstream output_stream; diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc index 68ecfeae2c..a530ed1ffc 100644 --- a/test/cpp/util/proto_file_parser.cc +++ b/test/cpp/util/proto_file_parser.cc @@ -217,32 +217,31 @@ bool ProtoFileParser::IsStreaming(const grpc::string& method, bool is_request) { } grpc::string ProtoFileParser::GetSerializedProtoFromMethod( - const grpc::string& method, const grpc::string& formatted_proto, - bool is_request, bool is_json_format) { + const grpc::string& method, const grpc::string& text_format_proto, + bool is_request) { has_error_ = false; grpc::string message_type_name = GetMessageTypeFromMethod(method, is_request); if (has_error_) { return ""; } - return GetSerializedProtoFromMessageType(message_type_name, formatted_proto, - is_json_format); + return GetSerializedProtoFromMessageType(message_type_name, + text_format_proto); } -grpc::string ProtoFileParser::GetFormattedStringFromMethod( +grpc::string ProtoFileParser::GetTextFormatFromMethod( const grpc::string& method, const grpc::string& serialized_proto, - bool is_request, bool is_json_format) { + bool is_request) { has_error_ = false; grpc::string message_type_name = GetMessageTypeFromMethod(method, is_request); if (has_error_) { return ""; } - return GetFormattedStringFromMessageType(message_type_name, serialized_proto, - is_json_format); + return GetTextFormatFromMessageType(message_type_name, serialized_proto); } grpc::string ProtoFileParser::GetSerializedProtoFromMessageType( - const grpc::string& message_type_name, const grpc::string& formatted_proto, - bool is_json_format) { + const grpc::string& message_type_name, + const grpc::string& text_format_proto) { has_error_ = false; grpc::string serialized; const protobuf::Descriptor* desc = @@ -253,23 +252,11 @@ grpc::string ProtoFileParser::GetSerializedProtoFromMessageType( } std::unique_ptr<grpc::protobuf::Message> msg( dynamic_factory_->GetPrototype(desc)->New()); - - bool ok; - if (is_json_format) { - ok = grpc::protobuf::json::JsonStringToMessage(formatted_proto, msg.get()) - .ok(); - if (!ok) { - LogError("Failed to convert json format to proto."); - return ""; - } - } else { - ok = protobuf::TextFormat::ParseFromString(formatted_proto, msg.get()); - if (!ok) { - LogError("Failed to convert text format to proto."); - return ""; - } + bool ok = protobuf::TextFormat::ParseFromString(text_format_proto, msg.get()); + if (!ok) { + LogError("Failed to parse text format to proto."); + return ""; } - ok = msg->SerializeToString(&serialized); if (!ok) { LogError("Failed to serialize proto."); @@ -278,9 +265,9 @@ grpc::string ProtoFileParser::GetSerializedProtoFromMessageType( return serialized; } -grpc::string ProtoFileParser::GetFormattedStringFromMessageType( - const grpc::string& message_type_name, const grpc::string& serialized_proto, - bool is_json_format) { +grpc::string ProtoFileParser::GetTextFormatFromMessageType( + const grpc::string& message_type_name, + const grpc::string& serialized_proto) { has_error_ = false; const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(message_type_name); @@ -294,24 +281,12 @@ grpc::string ProtoFileParser::GetFormattedStringFromMessageType( LogError("Failed to deserialize proto."); return ""; } - grpc::string formatted_string; - - if (is_json_format) { - grpc::protobuf::json::JsonPrintOptions jsonPrintOptions; - jsonPrintOptions.add_whitespace = true; - if (!grpc::protobuf::json::MessageToJsonString( - *msg.get(), &formatted_string, jsonPrintOptions) - .ok()) { - LogError("Failed to print proto message to json format"); - return ""; - } - } else { - if (!protobuf::TextFormat::PrintToString(*msg.get(), &formatted_string)) { - LogError("Failed to print proto message to text format"); - return ""; - } + grpc::string text_format; + if (!protobuf::TextFormat::PrintToString(*msg.get(), &text_format)) { + LogError("Failed to print proto message to text format"); + return ""; } - return formatted_string; + return text_format; } void ProtoFileParser::LogError(const grpc::string& error_msg) { diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h index 1e49c98daf..eb1d793c2b 100644 --- a/test/cpp/util/proto_file_parser.h +++ b/test/cpp/util/proto_file_parser.h @@ -53,49 +53,21 @@ class ProtoFileParser { // used as the argument of Stub::Call() grpc::string GetFormattedMethodName(const grpc::string& method); - /// Converts a text or json string to its binary proto representation for the - /// given method's input or return type. - /// \param method the name of the method (does not need to be fully qualified - /// name) - /// \param formatted_proto the text- or json-formatted proto string - /// \param is_request if \c true the resolved type is that of the input - /// parameter of the method, otherwise it is the output type - /// \param is_json_format if \c true the \c formatted_proto is treated as a - /// json-formatted proto, otherwise it is treated as a text-formatted - /// proto - /// \return the serialised binary proto represenation of \c formatted_proto - grpc::string GetSerializedProtoFromMethod(const grpc::string& method, - const grpc::string& formatted_proto, - bool is_request, - bool is_json_format); - - /// Converts a text or json string to its proto representation for the given - /// message type. - /// \param formatted_proto the text- or json-formatted proto string - /// \return the serialised binary proto represenation of \c formatted_proto + grpc::string GetSerializedProtoFromMethod( + const grpc::string& method, const grpc::string& text_format_proto, + bool is_request); + + grpc::string GetTextFormatFromMethod(const grpc::string& method, + const grpc::string& serialized_proto, + bool is_request); + grpc::string GetSerializedProtoFromMessageType( const grpc::string& message_type_name, - const grpc::string& formatted_proto, bool is_json_format); - - /// Converts a binary proto string to its text or json string representation - /// for the given method's input or return type. - /// \param method the name of the method (does not need to be a fully - /// qualified name) - /// \param the serialised binary proto representation of type - /// \c message_type_name - /// \return the text- or json-formatted proto string of \c serialized_proto - grpc::string GetFormattedStringFromMethod( - const grpc::string& method, const grpc::string& serialized_proto, - bool is_request, bool is_json_format); - - /// Converts a binary proto string to its text or json string representation - /// for the given message type. - /// \param the serialised binary proto representation of type - /// \c message_type_name - /// \return the text- or json-formatted proto string of \c serialized_proto - grpc::string GetFormattedStringFromMessageType( + const grpc::string& text_format_proto); + + grpc::string GetTextFormatFromMessageType( const grpc::string& message_type_name, - const grpc::string& serialized_proto, bool is_json_format); + const grpc::string& serialized_proto); bool IsStreaming(const grpc::string& method, bool is_request); diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index eb99c7dda9..00c52baf9e 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -10254,6 +10254,91 @@ "deps": [ "gpr", "grpc_base", + "grpc_client_channel", + "grpc_resolver_fake", + "nanopb" + ], + "headers": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" + ], + "is_filegroup": true, + "language": "c", + "name": "grpc_lb_policy_xds", + "src": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", + "grpc_base", + "grpc_client_channel", + "grpc_resolver_fake", + "grpc_secure", + "nanopb" + ], + "headers": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" + ], + "is_filegroup": true, + "language": "c", + "name": "grpc_lb_policy_xds_secure", + "src": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/duration.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/google/protobuf/timestamp.pb.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_secure.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.cc", + "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", + "grpc_base", "grpc_client_channel" ], "headers": [ |