/* * * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "third_party/nanopb/pb_decode.h" #include "third_party/nanopb/pb_encode.h" #include /* invoked once for every Server in ServerList */ static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { grpc_grpclb_serverlist *sl = *arg; grpc_grpclb_server server; if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); return false; } ++sl->num_servers; return true; } typedef struct decode_serverlist_arg { /* The decoding callback is invoked once per server in serverlist. Remember * which index of the serverlist are we currently decoding */ size_t decoding_idx; /* The decoded serverlist */ grpc_grpclb_serverlist *serverlist; } decode_serverlist_arg; /* invoked once for every Server in ServerList */ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { decode_serverlist_arg *dec_arg = *arg; GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { gpr_free(server); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); return false; } dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; return true; } grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); req->has_client_stats = false; req->has_initial_request = true; req->initial_request.has_name = true; strncpy(req->initial_request.name, lb_service_name, GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); return req; } static void populate_timestamp(gpr_timespec timestamp, struct _grpc_lb_v1_Timestamp *timestamp_pb) { timestamp_pb->has_seconds = true; timestamp_pb->seconds = timestamp.tv_sec; timestamp_pb->has_nanos = true; timestamp_pb->nanos = timestamp.tv_nsec; } grpc_grpclb_request *grpc_grpclb_load_report_request_create( grpc_grpclb_client_stats *client_stats) { grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); req->has_client_stats = true; req->client_stats.has_timestamp = true; populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); req->client_stats.has_num_calls_started = true; req->client_stats.has_num_calls_finished = true; req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_known_received = true; grpc_grpclb_client_stats_get( client_stats, &req->client_stats.num_calls_started, &req->client_stats.num_calls_finished, &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, &req->client_stats.num_calls_finished_with_drop_for_load_balancing, &req->client_stats.num_calls_finished_with_client_failed_to_send, &req->client_stats.num_calls_finished_known_received); return req; } grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; pb_ostream_t outputstream; grpc_slice slice; memset(&sizestream, 0, sizeof(pb_ostream_t)); pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); encoded_length = sizestream.bytes_written; slice = GRPC_SLICE_MALLOC(encoded_length); outputstream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, request) != 0); return slice; } void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { gpr_free(request); } typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response; grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_slice encoded_grpc_grpclb_response) { pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) { gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } if (!res.has_initial_response) return NULL; grpc_grpclb_initial_response *initial_res = gpr_malloc(sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, sizeof(grpc_grpclb_initial_response)); return initial_res; } grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( grpc_slice encoded_grpc_grpclb_response) { pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); // First pass: count number of servers. res.server_list.servers.funcs.decode = count_serverlist; res.server_list.servers.arg = sl; bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { gpr_free(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } // Second pass: populate servers. if (sl->num_servers > 0) { sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers); decode_serverlist_arg decode_arg; memset(&decode_arg, 0, sizeof(decode_arg)); decode_arg.serverlist = sl; res.server_list.servers.funcs.decode = decode_serverlist; res.server_list.servers.arg = &decode_arg; status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { grpc_grpclb_destroy_serverlist(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } } if (res.server_list.has_expiration_interval) { sl->expiration_interval = res.server_list.expiration_interval; } return sl; } void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { if (serverlist == NULL) { return; } for (size_t i = 0; i < serverlist->num_servers; i++) { gpr_free(serverlist->servers[i]); } gpr_free(serverlist->servers); gpr_free(serverlist); } grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( const grpc_grpclb_serverlist *sl) { grpc_grpclb_serverlist *copy = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); copy->num_servers = sl->num_servers; memcpy(©->expiration_interval, &sl->expiration_interval, sizeof(grpc_grpclb_duration)); copy->servers = gpr_malloc(sizeof(grpc_grpclb_server *) * sl->num_servers); for (size_t i = 0; i < sl->num_servers; i++) { copy->servers[i] = gpr_malloc(sizeof(grpc_grpclb_server)); memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server)); } return copy; } bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, const grpc_grpclb_serverlist *rhs) { if (lhs == NULL || rhs == NULL) { return false; } if (lhs->num_servers != rhs->num_servers) { return false; } if (grpc_grpclb_duration_compare(&lhs->expiration_interval, &rhs->expiration_interval) != 0) { return false; } for (size_t i = 0; i < lhs->num_servers; i++) { if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { return false; } } return true; } bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs, const grpc_grpclb_server *rhs) { return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0; } int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs) { GPR_ASSERT(lhs && rhs); if (lhs->has_seconds && rhs->has_seconds) { if (lhs->seconds < rhs->seconds) return -1; if (lhs->seconds > rhs->seconds) return 1; } else if (lhs->has_seconds) { return 1; } else if (rhs->has_seconds) { return -1; } GPR_ASSERT(lhs->seconds == rhs->seconds); if (lhs->has_nanos && rhs->has_nanos) { if (lhs->nanos < rhs->nanos) return -1; if (lhs->nanos > rhs->nanos) return 1; } else if (lhs->has_nanos) { return 1; } else if (rhs->has_nanos) { return -1; } return 0; } gpr_timespec grpc_grpclb_duration_to_timespec( grpc_grpclb_duration *duration_pb) { gpr_timespec duration; duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; duration.clock_type = GPR_TIMESPAN; return duration; } void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response) { gpr_free(response); }