-rw-r--r--  doc/server_reflection_tutorial.md                               192
-rw-r--r--  src/core/ext/client_config/lb_policy_factory.c                   38
-rw-r--r--  src/core/ext/client_config/lb_policy_factory.h                   32
-rw-r--r--  src/core/ext/client_config/resolver_result.c                     80
-rw-r--r--  src/core/ext/client_config/resolver_result.h                     83
-rw-r--r--  src/core/ext/lb_policy/grpclb/grpclb.c                          149
-rw-r--r--  src/core/ext/lb_policy/pick_first/pick_first.c                   23
-rw-r--r--  src/core/ext/lb_policy/round_robin/round_robin.c                 36
-rw-r--r--  src/core/ext/resolver/dns/native/dns_resolver.c                  23
-rw-r--r--  src/core/ext/resolver/sockaddr/sockaddr_resolver.c               41
-rw-r--r--  src/core/lib/security/credentials/plugin/plugin_credentials.c   25
-rw-r--r--  src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs        4
-rw-r--r--  src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs            102
-rw-r--r--  src/csharp/Grpc.Core/Internal/AsyncCall.cs                       31
-rw-r--r--  src/csharp/Grpc.Core/Internal/AsyncCallBase.cs                   40
-rw-r--r--  src/csharp/Grpc.Core/Internal/AsyncCallServer.cs                  6
-rw-r--r--  test/cpp/end2end/end2end_test.cc                                 63
17 files changed, 715 insertions, 253 deletions
diff --git a/doc/server_reflection_tutorial.md b/doc/server_reflection_tutorial.md
new file mode 100644
index 0000000000..ecb176723c
--- /dev/null
+++ b/doc/server_reflection_tutorial.md
@@ -0,0 +1,192 @@
+# gRPC Server Reflection Tutorial
+
+gRPC Server Reflection provides information about publicly-accessible gRPC
+services on a server and assists clients at runtime in constructing RPC
+requests and responses without precompiled service information. It is used by
+gRPC CLI, which can introspect server protos and send/receive test
+RPCs.
+
+## Enable Server Reflection
+
+### Enable server reflection in C++ servers
+
+C++ Server Reflection is an add-on library, `libgrpc++_reflection`. To enable
+C++ server reflection, link this library into your server binary.
+
+Some platforms (e.g. Ubuntu 11.10 onwards) only link in libraries that directly
+contain symbols used by the application. On these platforms, the LD flag
+`--no-as-needed` is needed for dynamic linking and `--whole-archive` is
+needed for static linking.
+
+This [Makefile](../examples/cpp/helloworld/Makefile#L37#L45) demonstrates
+enabling C++ server reflection on Linux and macOS.
+
+## Test services using Server Reflection
+
+After enabling Server Reflection in a server application, you can use gRPC CLI
+to test its services.
+
+Instructions on how to use gRPC CLI can be found in
+[command_line_tool.md](command_line_tool.md) or by running the `grpc_cli help`
+command.
+
+Here we use `examples/cpp/helloworld` as an example to show the use of gRPC
+Server Reflection and gRPC CLI. First, we need to build gRPC CLI and set up an
+example server with Server Reflection enabled.
+
+- Set up an example server
+
+ Server Reflection has already been enabled in the
+ [Makefile](../examples/cpp/helloworld/Makefile) of the helloworld example. We
+ can simply make it and run the greeter_server.
+
+ ```sh
+ $ make -C examples/cpp/helloworld
+ $ examples/cpp/helloworld/greeter_server &
+ ```
+
+- Build gRPC CLI
+
+ ```sh
+ make grpc_cli
+ cd bins/opt
+ ```
+
+  The gRPC CLI binary `grpc_cli` can be found in the `bins/opt/` directory.
+  This tool is still new and does not have a `make install` target yet.
+
+### List services
+
+The `grpc_cli ls` command lists the services and methods exposed at a given
+port.
+
+- List all the services exposed at a given port
+
+ ```sh
+ $ grpc_cli ls localhost:50051
+ ```
+
+ output:
+ ```sh
+ helloworld.Greeter
+ grpc.reflection.v1alpha.ServerReflection
+ ```
+
+- List one service with details
+
+  The `grpc_cli ls` command can also inspect a single service given its full
+  name (in the format \<package\>.\<service\>). When the `-l` flag is set, it
+  prints a long listing format with more details about the service.
+
+ ```sh
+ $ grpc_cli ls localhost:50051 helloworld.Greeter -l
+ ```
+
+ output:
+ ```sh
+ filename: helloworld.proto
+ package: helloworld;
+ service Greeter {
+ rpc SayHello(helloworld.HelloRequest) returns (helloworld.HelloReply) {}
+ }
+
+ ```
+
+### List methods
+
+- List one method with details
+
+  The `grpc_cli ls` command also inspects a method given its full name (in the
+  format \<package\>.\<service\>.\<method\>).
+
+ ```sh
+ $ grpc_cli ls localhost:50051 helloworld.Greeter.SayHello -l
+ ```
+
+ output:
+ ```sh
+ rpc SayHello(helloworld.HelloRequest) returns (helloworld.HelloReply) {}
+ ```
+
+### Inspect message types
+
+We can use the `grpc_cli type` command to inspect request/response types given
+the full name of the type (in the format \<package\>.\<type\>).
+
+- Get information about the request type
+
+ ```sh
+ $ grpc_cli type localhost:50051 helloworld.HelloRequest
+ ```
+
+ output:
+ ```sh
+ message HelloRequest {
+ optional string name = 1;
+ }
+ ```
+
+### Call a remote method
+
+We can send RPCs to a server and get responses using the `grpc_cli call`
+command.
+
+- Call a unary method
+
+ ```sh
+ $ grpc_cli call localhost:50051 SayHello "name: 'gRPC CLI'"
+ ```
+
+ output:
+ ```sh
+ message: "Hello gRPC CLI"
+ ```
+
+## Use Server Reflection in a C++ client
+
+Server Reflection can be used by clients to get information about gRPC services
+at runtime. We've provided a descriptor database called
+[grpc::ProtoReflectionDescriptorDatabase](../test/cpp/util/proto_reflection_descriptor_database.h)
+which implements the
+[google::protobuf::DescriptorDatabase](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.descriptor_database#DescriptorDatabase)
+interface. It manages the communication between clients and reflection services
+and the storage of received information. Clients can use it as if it were a
+local descriptor database.
+
+- To use Server Reflection with grpc::ProtoReflectionDescriptorDatabase, first
+ initialize an instance with a grpc::Channel.
+
+ ```c++
+ std::shared_ptr<grpc::Channel> channel =
+ grpc::CreateChannel(server_address, server_cred);
+ grpc::ProtoReflectionDescriptorDatabase reflection_db(channel);
+ ```
+
+- Then use this instance to feed a
+ [google::protobuf::DescriptorPool](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.descriptor#DescriptorPool).
+
+ ```c++
+ google::protobuf::DescriptorPool desc_pool(&reflection_db);
+ ```
+
+- Example usage of this descriptor pool
+
+ * Get Service/method descriptors.
+
+ ```c++
+    const google::protobuf::ServiceDescriptor* service_desc =
+        desc_pool.FindServiceByName("helloworld.Greeter");
+    const google::protobuf::MethodDescriptor* method_desc =
+        desc_pool.FindMethodByName("helloworld.Greeter.SayHello");
+ ```
+
+ * Get message type descriptors.
+
+ ```c++
+    const google::protobuf::Descriptor* request_desc =
+        desc_pool.FindMessageTypeByName("helloworld.HelloRequest");
+ ```
+
+ * Feed [google::protobuf::DynamicMessageFactory](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.dynamic_message#DynamicMessageFactory).
+
+ ```c++
+    google::protobuf::DynamicMessageFactory factory(&desc_pool);
+ ```
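+
+  * Build a request message dynamically. This is a minimal sketch, assuming
+    the `request_desc` and `factory` objects from the previous steps; it uses
+    only standard protobuf reflection APIs.
+
+    ```c++
+    // Create a mutable HelloRequest from the dynamically obtained descriptor.
+    std::unique_ptr<google::protobuf::Message> request(
+        factory.GetPrototype(request_desc)->New());
+    // Set the "name" field via reflection, then serialize to wire format.
+    request->GetReflection()->SetString(
+        request.get(), request_desc->FindFieldByName("name"), "gRPC CLI");
+    std::string bytes;
+    request->SerializeToString(&bytes);
+    ```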
diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index 70e46ef3cf..f86117aa38 100644
--- a/src/core/ext/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -31,8 +31,46 @@
*
*/
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
#include "src/core/ext/client_config/lb_policy_factory.h"
+grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
+ grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
+ addresses->num_addresses = num_addresses;
+ const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
+ addresses->addresses = gpr_malloc(addresses_size);
+ memset(addresses->addresses, 0, addresses_size);
+ return addresses;
+}
+
+void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
+ void* address, size_t address_len,
+ bool is_balancer, char* balancer_name,
+ void* user_data) {
+ GPR_ASSERT(index < addresses->num_addresses);
+ grpc_lb_address* target = &addresses->addresses[index];
+ memcpy(target->address.addr, address, address_len);
+ target->address.len = address_len;
+ target->is_balancer = is_balancer;
+ target->balancer_name = balancer_name;
+ target->user_data = user_data;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
+ void (*user_data_destroy)(void*)) {
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ gpr_free(addresses->addresses[i].balancer_name);
+ if (user_data_destroy != NULL) {
+ user_data_destroy(addresses->addresses[i].user_data);
+ }
+ }
+ gpr_free(addresses->addresses);
+ gpr_free(addresses);
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index 7191ca7d89..b31179cf80 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -36,7 +36,7 @@
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy.h"
-#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/ext/client_config/resolver_result.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -53,13 +53,37 @@ struct grpc_lb_policy_factory {
* Those who don't will simply ignore it and will correspondingly return NULL in
* their namesake pick() output argument. */
typedef struct grpc_lb_address {
- grpc_resolved_address *resolved_address;
+ grpc_resolved_address address;
+ bool is_balancer;
+ char *balancer_name; /* For secure naming. */
void *user_data;
} grpc_lb_address;
-typedef struct grpc_lb_policy_args {
- grpc_lb_address *addresses;
+typedef struct grpc_lb_addresses {
size_t num_addresses;
+ grpc_lb_address *addresses;
+} grpc_lb_addresses;
+
+/** Returns a grpc_lb_addresses struct with enough space for
+ * \a num_addresses addresses. */
+grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
+
+/** Sets the value of the address at index \a index of \a addresses.
+ * \a address is a socket address of length \a address_len.
+ * Takes ownership of \a balancer_name. */
+void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
+ void *address, size_t address_len,
+ bool is_balancer, char *balancer_name,
+ void *user_data);
+
+/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
+ * be invoked to destroy the \a user_data field of each address. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
+ void (*user_data_destroy)(void *));
+
+/** Arguments passed to LB policies. */
+typedef struct grpc_lb_policy_args {
+ grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
} grpc_lb_policy_args;
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index c6c4166e83..242989a0da 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -1,35 +1,33 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#include "src/core/ext/client_config/resolver_result.h"
@@ -39,20 +37,20 @@
struct grpc_resolver_result {
gpr_refcount refs;
- grpc_lb_policy *lb_policy;
+ grpc_lb_policy* lb_policy;
};
-grpc_resolver_result *grpc_resolver_result_create() {
- grpc_resolver_result *c = gpr_malloc(sizeof(*c));
+grpc_resolver_result* grpc_resolver_result_create() {
+ grpc_resolver_result* c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
gpr_ref_init(&c->refs, 1);
return c;
}
-void grpc_resolver_result_ref(grpc_resolver_result *c) { gpr_ref(&c->refs); }
+void grpc_resolver_result_ref(grpc_resolver_result* c) { gpr_ref(&c->refs); }
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *c) {
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* c) {
if (gpr_unref(&c->refs)) {
if (c->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result");
@@ -61,8 +59,8 @@ void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *c,
- grpc_lb_policy *lb_policy) {
+void grpc_resolver_result_set_lb_policy(grpc_resolver_result* c,
+ grpc_lb_policy* lb_policy) {
GPR_ASSERT(c->lb_policy == NULL);
if (lb_policy) {
GRPC_LB_POLICY_REF(lb_policy, "resolver_result");
@@ -70,6 +68,6 @@ void grpc_resolver_result_set_lb_policy(grpc_resolver_result *c,
c->lb_policy = lb_policy;
}
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(grpc_resolver_result *c) {
+grpc_lb_policy* grpc_resolver_result_get_lb_policy(grpc_resolver_result* c) {
return c->lb_policy;
}
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 402f7dbd7e..5a69d81990 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -1,52 +1,53 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
+#include <stdbool.h>
+
#include "src/core/ext/client_config/lb_policy.h"
+#include "src/core/lib/iomgr/resolve_address.h"
-/** Results reported from a grpc_resolver. */
+/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
-grpc_resolver_result *grpc_resolver_result_create();
-void grpc_resolver_result_ref(grpc_resolver_result *client_config);
-void grpc_resolver_result_unref(grpc_exec_ctx *exec_ctx,
- grpc_resolver_result *client_config);
+grpc_resolver_result* grpc_resolver_result_create();
+void grpc_resolver_result_ref(grpc_resolver_result* result);
+void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
+ grpc_resolver_result* result);
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result *client_config,
- grpc_lb_policy *lb_policy);
-grpc_lb_policy *grpc_resolver_result_get_lb_policy(
- grpc_resolver_result *client_config);
+void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result,
+ grpc_lb_policy* lb_policy);
+grpc_lb_policy* grpc_resolver_result_get_lb_policy(
+ grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 874e26834a..070dbc889f 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -107,6 +107,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_config/client_channel_factory.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
@@ -120,18 +121,6 @@
int grpc_lb_glb_trace = 0;
-static void lb_addrs_destroy(grpc_lb_address *lb_addresses,
- size_t num_addresses) {
- /* free "resolved" addresses memblock */
- gpr_free(lb_addresses->resolved_address);
- for (size_t i = 0; i < num_addresses; ++i) {
- if (lb_addresses[i].user_data != NULL) {
- GRPC_MDELEM_UNREF(lb_addresses[i].user_data);
- }
- }
- gpr_free(lb_addresses);
-}
-
/* add lb_token of selected subchannel (address) to the call's initial
* metadata */
static void initial_metadata_add_lb_token(
@@ -311,11 +300,8 @@ typedef struct glb_lb_policy {
* response has arrived. */
grpc_grpclb_serverlist *serverlist;
- /** total number of valid addresses received in \a serverlist */
- size_t num_ok_serverlist_addresses;
-
- /** LB addresses from \a serverlist, \a num_ok_serverlist_addresses of them */
- grpc_lb_address *lb_addresses;
+ /** addresses from \a serverlist */
+ grpc_lb_addresses *addresses;
/** list of picks that are waiting on RR's policy connectivity */
pending_pick *pending_picks;
@@ -368,26 +354,18 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
return true;
}
-/* populate \a addresses according to \a serverlist. Returns the number of
- * addresses successfully parsed and added to \a addresses */
-static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
- grpc_lb_address **lb_addresses) {
+/* Returns addresses extracted from \a serverlist. */
+static grpc_lb_addresses *process_serverlist(
+ const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
* memory in a single block */
for (size_t i = 0; i < serverlist->num_servers; ++i) {
if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid;
}
- if (num_valid == 0) {
- return 0;
- }
+ if (num_valid == 0) return NULL;
- /* allocate the memory block for the "resolved" addresses. */
- grpc_resolved_address *r_addrs_memblock =
- gpr_malloc(sizeof(grpc_resolved_address) * num_valid);
- memset(r_addrs_memblock, 0, sizeof(grpc_resolved_address) * num_valid);
- grpc_lb_address *lb_addrs = gpr_malloc(sizeof(grpc_lb_address) * num_valid);
- memset(lb_addrs, 0, sizeof(grpc_lb_address) * num_valid);
+ grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
@@ -399,56 +377,58 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist,
GPR_ASSERT(addr_idx < num_valid);
const grpc_grpclb_server *server = serverlist->servers[sl_idx];
if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue;
- grpc_lb_address *const lb_addr = &lb_addrs[addr_idx];
/* address processing */
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address *ip = &server->ip_address;
-
- lb_addr->resolved_address = &r_addrs_memblock[addr_idx];
- struct sockaddr_storage *sa =
- (struct sockaddr_storage *)lb_addr->resolved_address->addr;
- size_t *sa_len = &lb_addr->resolved_address->len;
- *sa_len = 0;
+ grpc_resolved_address addr;
+ memset(&addr, 0, sizeof(addr));
if (ip->size == 4) {
- struct sockaddr_in *addr4 = (struct sockaddr_in *)sa;
- *sa_len = sizeof(struct sockaddr_in);
- memset(addr4, 0, *sa_len);
+ addr.len = sizeof(struct sockaddr_in);
+ struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr;
addr4->sin_family = AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
- struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)sa;
- *sa_len = sizeof(struct sockaddr_in6);
- memset(addr6, 0, *sa_len);
+ addr.len = sizeof(struct sockaddr_in6);
+ struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr;
      addr6->sin6_family = AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
- GPR_ASSERT(*sa_len > 0);
/* lb token processing */
+ void *user_data;
if (server->has_load_balance_token) {
const size_t lb_token_size =
GPR_ARRAY_SIZE(server->load_balance_token) - 1;
grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer(
(uint8_t *)server->load_balance_token, lb_token_size);
- lb_addr->user_data = grpc_mdelem_from_metadata_strings(
+ user_data = grpc_mdelem_from_metadata_strings(
GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr);
} else {
gpr_log(GPR_ERROR,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
- grpc_sockaddr_to_uri((struct sockaddr *)sa));
- lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
+ grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr));
+ user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY;
}
+
+ grpc_lb_addresses_set_address(lb_addresses, addr_idx, &addr.addr, addr.len,
+ false /* is_balancer */,
+ NULL /* balancer_name */, user_data);
++addr_idx;
}
GPR_ASSERT(addr_idx == num_valid);
- *lb_addresses = lb_addrs;
- return num_valid;
+
+ return lb_addresses;
+}
+
+/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
+static void lb_token_destroy(void *token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(token);
}
static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
@@ -459,19 +439,15 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- const size_t num_ok_addresses =
- process_serverlist(serverlist, &args.addresses);
- args.num_addresses = num_ok_addresses;
+ args.addresses = process_serverlist(serverlist);
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
- if (glb_policy->lb_addresses != NULL) {
+ if (glb_policy->addresses != NULL) {
/* dispose of the previous version */
- lb_addrs_destroy(glb_policy->lb_addresses,
- glb_policy->num_ok_serverlist_addresses);
+ grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
}
- glb_policy->num_ok_serverlist_addresses = num_ok_addresses;
- glb_policy->lb_addresses = args.addresses;
+ glb_policy->addresses = args.addresses;
return rr;
}
@@ -565,6 +541,19 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ /* Count the number of gRPC-LB addresses. There must be at least one.
+ * TODO(roth): For now, we ignore non-balancer addresses, but in the
+ * future, we may change the behavior such that we fall back to using
+ * the non-balancer addresses if we cannot reach any balancers. At that
+ * time, this should be changed to allow a list with no balancer addresses,
+ * since the resolver might fail to return a balancer address even when
+ * this is the right LB policy to use. */
+ size_t num_grpclb_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
+ if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ }
+ if (num_grpclb_addrs == 0) return NULL;
+
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
@@ -576,36 +565,34 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
- if (args->num_addresses == 0) {
- return NULL;
- }
-
- if (args->addresses[0].user_data != NULL) {
- gpr_log(GPR_ERROR,
- "This LB policy doesn't support user data. It will be ignored");
- }
/* construct a target from the addresses in args, given in the form
* ipvX://ip1:port1,ip2:port2,...
* TODO(dgq): support mixed ip version */
- char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses);
- addr_strs[0] = grpc_sockaddr_to_uri(
- (const struct sockaddr *)&args->addresses[0].resolved_address->addr);
- for (size_t i = 1; i < args->num_addresses; i++) {
- if (args->addresses[i].user_data != NULL) {
+ char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
+ size_t addr_index = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (args->addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
-
- GPR_ASSERT(
- grpc_sockaddr_to_string(
- &addr_strs[i],
- (const struct sockaddr *)&args->addresses[i].resolved_address->addr,
- true) == 0);
+ if (args->addresses->addresses[i].is_balancer) {
+ if (addr_index == 0) {
+ addr_strs[addr_index++] = grpc_sockaddr_to_uri(
+ (const struct sockaddr *)&args->addresses->addresses[i]
+ .address.addr);
+ } else {
+ GPR_ASSERT(grpc_sockaddr_to_string(
+ &addr_strs[addr_index++],
+ (const struct sockaddr *)&args->addresses->addresses[i]
+ .address.addr,
+ true) == 0);
+ }
+ }
}
size_t uri_path_len;
- char *target_uri_str = gpr_strjoin_sep(
- (const char **)addr_strs, args->num_addresses, ",", &uri_path_len);
+ char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
+ num_grpclb_addrs, ",", &uri_path_len);
/* will pick using pick_first */
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
@@ -613,7 +600,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
gpr_free(target_uri_str);
- for (size_t i = 0; i < args->num_addresses; i++) {
+ for (size_t i = 0; i < num_grpclb_addrs; i++) {
gpr_free(addr_strs[i]);
}
gpr_free(addr_strs);
@@ -649,9 +636,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
-
- lb_addrs_destroy(glb_policy->lb_addresses,
- glb_policy->num_ok_serverlist_addresses);
+ grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
gpr_free(glb_policy);
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 3c863affb3..728d678532 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -447,25 +447,34 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->num_addresses == 0) return NULL;
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_addresses);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * args->num_addresses);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->num_addresses; i++) {
- if (args->addresses[i].user_data != NULL) {
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
+
+ if (args->addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
sc_args.addr =
- (struct sockaddr *)(args->addresses[i].resolved_address->addr);
- sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len;
+ (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
+ sc_args.addr_len = args->addresses->addresses[i].address.len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index ad89491167..6953cceb63 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -130,10 +130,6 @@ struct round_robin_lb_policy {
/** total number of addresses received at creation time */
size_t num_addresses;
- /** array holding the borrowed and opaque pointers to incoming user data, one
- * per incoming address. These individual pointers will be returned as-is in
- * successful picks. */
- void **user_data_pointers;
/** all our subchannels */
size_t num_subchannels;
@@ -282,7 +278,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
elem = tmp;
}
- gpr_free(p->user_data_pointers);
gpr_free(p);
}
@@ -617,25 +612,32 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_args *args) {
GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
- if (args->num_addresses == 0) return NULL;
+
+ /* Find the number of backend addresses. We ignore balancer
+ * addresses, since we don't know how to handle them. */
+ size_t num_addrs = 0;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ }
+ if (num_addrs == 0) return NULL;
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- p->num_addresses = args->num_addresses;
- p->subchannels = gpr_malloc(sizeof(subchannel_data) * p->num_addresses);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_addresses);
- p->user_data_pointers = gpr_malloc(sizeof(void *) * p->num_addresses);
- memset(p->user_data_pointers, 0, sizeof(void *) * p->num_addresses);
+ p->num_addresses = num_addrs;
+ p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < p->num_addresses; i++) {
- memset(&sc_args, 0, sizeof(grpc_subchannel_args));
- sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr;
- sc_args.addr_len = args->addresses[i].resolved_address->len;
+ for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ /* Skip balancer addresses, since we only know how to handle backends. */
+ if (args->addresses->addresses[i].is_balancer) continue;
- p->user_data_pointers[i] = args->addresses[i].user_data;
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ sc_args.addr =
+ (struct sockaddr *)(&args->addresses->addresses[i].address.addr);
+ sc_args.addr_len = args->addresses->addresses[i].address.len;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -647,7 +649,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
- sd->user_data = p->user_data_pointers[i];
+ sd->user_data = args->addresses->addresses[i].user_data;
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 32e9de69a6..63682db7de 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -170,28 +170,27 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
- grpc_resolved_addresses *addresses = r->addresses;
- if (addresses != NULL) {
+ if (r->addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
- result = grpc_resolver_result_create();
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.num_addresses = addresses->naddrs;
- lb_policy_args.addresses =
- gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- memset(lb_policy_args.addresses, 0,
- sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- for (size_t i = 0; i < addresses->naddrs; ++i) {
- lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
+ lb_policy_args.addresses = grpc_lb_addresses_create(r->addresses->naddrs);
+ for (size_t i = 0; i < r->addresses->naddrs; ++i) {
+ grpc_lb_addresses_set_address(
+ lb_policy_args.addresses, i, &r->addresses->addrs[i].addr,
+ r->addresses->addrs[i].len, false /* is_balancer */,
+ NULL /* balancer_name */, NULL /* user_data */);
}
+ grpc_resolved_addresses_destroy(r->addresses);
lb_policy_args.client_channel_factory = r->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- gpr_free(lb_policy_args.addresses);
+ grpc_lb_addresses_destroy(lb_policy_args.addresses,
+ NULL /* user_data_destroy */);
+ result = grpc_resolver_result_create();
if (lb_policy != NULL) {
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
- grpc_resolved_addresses_destroy(addresses);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 425285287c..fbfe5d774b 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -58,12 +58,12 @@ typedef struct {
char *lb_policy_name;
/** the addresses that we've 'resolved' */
- grpc_resolved_addresses *addresses;
+ grpc_lb_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
- int published;
+ bool published;
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
@@ -102,7 +102,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
- r->published = 0;
+ r->published = false;
sockaddr_maybe_finish_next_locked(exec_ctx, r);
gpr_mu_unlock(&r->mu);
}
@@ -125,21 +125,13 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
grpc_resolver_result *result = grpc_resolver_result_create();
grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.num_addresses = r->addresses->naddrs;
- lb_policy_args.addresses =
- gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- memset(lb_policy_args.addresses, 0,
- sizeof(grpc_lb_address) * lb_policy_args.num_addresses);
- for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) {
- lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i];
- }
+ lb_policy_args.addresses = r->addresses;
lb_policy_args.client_channel_factory = r->client_channel_factory;
grpc_lb_policy *lb_policy =
grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- gpr_free(lb_policy_args.addresses);
grpc_resolver_result_set_lb_policy(result, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
- r->published = 1;
+ r->published = true;
*r->target_result = result;
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
@@ -150,7 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@@ -183,7 +175,7 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- int errors_found = 0; /* GPR_FALSE */
+ bool errors_found = false;
sockaddr_resolver *r;
gpr_slice path_slice;
gpr_slice_buffer path_parts;
@@ -224,21 +216,18 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->addresses->naddrs = path_parts.count;
- r->addresses->addrs =
- gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
-
- for (size_t i = 0; i < r->addresses->naddrs; i++) {
+ r->addresses = grpc_lb_addresses_create(path_parts.count);
+ for (size_t i = 0; i < r->addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri,
- (struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
- &r->addresses->addrs[i].len)) {
- errors_found = 1; /* GPR_TRUE */
+ if (!parse(&ith_uri, (struct sockaddr_storage *)(&r->addresses->addresses[i]
+ .address.addr),
+ &r->addresses->addresses[i].address.len)) {
+ errors_found = true;
}
gpr_free(part_str);
+ r->addresses->addresses[i].is_balancer = lb_enabled;
if (errors_found) break;
}
@@ -246,7 +235,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
- grpc_resolved_addresses_destroy(r->addresses);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r);
return NULL;
}
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c
index 824ff081dc..905de3723e 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.c
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c
@@ -37,6 +37,7 @@
#include "src/core/lib/surface/api_trace.h"
+#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -71,17 +72,33 @@ static void plugin_md_request_metadata_ready(void *request,
error_details);
} else {
size_t i;
+ bool seen_illegal_header = false;
grpc_credentials_md *md_array = NULL;
- if (num_md > 0) {
+ for (i = 0; i < num_md; i++) {
+ if (!grpc_header_key_is_legal(md[i].key, strlen(md[i].key))) {
+ gpr_log(GPR_ERROR, "Plugin added invalid metadata key: %s", md[i].key);
+ seen_illegal_header = true;
+ break;
+ } else if (!grpc_is_binary_header(md[i].key, strlen(md[i].key)) &&
+ !grpc_header_nonbin_value_is_legal(md[i].value,
+ md[i].value_length)) {
+ gpr_log(GPR_ERROR, "Plugin added invalid metadata value.");
+ seen_illegal_header = true;
+ break;
+ }
+ }
+ if (seen_illegal_header) {
+ r->cb(&exec_ctx, r->user_data, NULL, 0, GRPC_CREDENTIALS_ERROR,
+ "Illegal metadata");
+ } else if (num_md > 0) {
md_array = gpr_malloc(num_md * sizeof(grpc_credentials_md));
for (i = 0; i < num_md; i++) {
md_array[i].key = gpr_slice_from_copied_string(md[i].key);
md_array[i].value =
gpr_slice_from_copied_buffer(md[i].value, md[i].value_length);
}
- }
- r->cb(&exec_ctx, r->user_data, md_array, num_md, GRPC_CREDENTIALS_OK, NULL);
- if (md_array != NULL) {
+ r->cb(&exec_ctx, r->user_data, md_array, num_md, GRPC_CREDENTIALS_OK,
+ NULL);
for (i = 0; i < num_md; i++) {
gpr_slice_unref(md_array[i].key);
gpr_slice_unref(md_array[i].value);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index c35aaf680f..09790120d1 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@@ -149,8 +150,7 @@ namespace Grpc.Core.Internal.Tests
var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
- // TODO(jtattermusch): should we throw a different exception type instead?
- Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+ Assert.ThrowsAsync(typeof(IOException), async () => await writeTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 98e27a17a1..616bc06d76 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -180,21 +180,74 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteCompletionFailure()
+ public void ClientStreaming_WriteFailureThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
- // TODO: maybe IOException or waiting for RPCException is more appropriate here.
- Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+
+ // The write will wait for call to finish to receive the status code.
+ Assert.IsFalse(writeTask.IsCompleted);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
null,
new Metadata());
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteFailureThrowsRpcException2()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Internal),
+ null,
+ new Metadata());
+
+ fakeCall.SendCompletionHandler(false);
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteFailureThrowsRpcException3()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+
+ // Until the delayed write completion has been triggered,
+ // we still act as if there was an active write.
+ Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Internal),
+ null,
+ new Metadata());
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
+ Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
+
+ // Following attempts to write keep delivering the same status
+ var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
+ Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);
+
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
@@ -416,6 +469,49 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
+ public void DuplexStreaming_WriteFailureThrowsRpcException()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+
+ // The write will wait for call to finish to receive the status code.
+ Assert.IsFalse(writeTask.IsCompleted);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
+ Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
+ }
+
+ [Test]
+ public void DuplexStreaming_WriteFailureThrowsRpcException2()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.PermissionDenied));
+ fakeCall.SendCompletionHandler(false);
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
+ Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
+ }
+
+ [Test]
public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
asyncCall.StartDuplexStreamingCall();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f549c52876..9abaf1120f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -341,6 +341,11 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Exception GetRpcExceptionClientOnly()
+ {
+ return new RpcException(finishedStatus.Value.Status);
+ }
+
protected override Task CheckSendAllowedOrEarlyResult()
{
var earlyResult = CheckSendPreconditionsClientSide();
@@ -452,6 +457,7 @@ namespace Grpc.Core.Internal
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
+ TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
@@ -465,13 +471,23 @@ namespace Grpc.Core.Internal
}
finishedStatus = receivedStatus;
+ if (isStreamingWriteCompletionDelayed)
+ {
+ delayedStreamingWriteTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
+
ReleaseResourcesIfPossible();
}
responseHeadersTcs.SetResult(responseHeaders);
- var status = receivedStatus.Status;
+ if (delayedStreamingWriteTcs != null)
+ {
+ delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
@@ -490,16 +506,27 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
+ TaskCompletionSource<object> delayedStreamingWriteTcs = null;
+
lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
+ if (isStreamingWriteCompletionDelayed)
+ {
+ delayedStreamingWriteTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
ReleaseResourcesIfPossible();
}
- var status = receivedStatus.Status;
+ if (delayedStreamingWriteTcs != null)
+ {
+ delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index eb9c3ea62d..9f9d260e7e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -69,6 +69,7 @@ namespace Grpc.Core.Internal
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
+ protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -200,6 +201,12 @@ namespace Grpc.Core.Internal
get;
}
+ /// <summary>
+ /// Returns an exception to throw for a failed send operation.
+ /// It is only allowed to call this method for a call that has already finished.
+ /// </summary>
+ protected abstract Exception GetRpcExceptionClientOnly();
+
private void ReleaseResources()
{
if (call != null)
@@ -252,18 +259,43 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendFinished(bool success)
{
+ bool delayCompletion = false;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origTcs = streamingWriteTcs;
- streamingWriteTcs = null;
+ if (!success && !finished && IsClient) {
+ // We should be setting this only once per call, following writes will be short circuited
+ // because they cannot start until the entire call finishes.
+ GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
+
+ // leave streamingWriteTcs set, it will be completed once call finished.
+ isStreamingWriteCompletionDelayed = true;
+ delayCompletion = true;
+ }
+ else
+ {
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
ReleaseResourcesIfPossible();
}
if (!success)
{
- origTcs.SetException(new InvalidOperationException("Send failed"));
+ if (!delayCompletion)
+ {
+ if (IsClient)
+ {
+ GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient
+ origTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ else
+ {
+ origTcs.SetException (new IOException("Error sending from server."));
+ }
+ }
+ // if delayCompletion == true, postpone SetException until call finishes.
}
else
{
@@ -283,7 +315,7 @@ namespace Grpc.Core.Internal
if (!success)
{
- sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
}
else
{
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 56c23ba3ef..50fdfa9006 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -33,6 +33,7 @@
using System;
using System.Diagnostics;
+using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
@@ -193,6 +194,11 @@ namespace Grpc.Core.Internal
get { return false; }
}
+ protected override Exception GetRpcExceptionClientOnly()
+ {
+ throw new InvalidOperationException("Can only be called for client calls");
+ }
+
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 66614922f1..b1d3ce92f6 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -80,11 +80,14 @@ const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
public:
- static const char kMetadataKey[];
+ static const char kGoodMetadataKey[];
+ static const char kBadMetadataKey[];
- TestMetadataCredentialsPlugin(grpc::string_ref metadata_value,
+ TestMetadataCredentialsPlugin(grpc::string_ref metadata_key,
+ grpc::string_ref metadata_value,
bool is_blocking, bool is_successful)
- : metadata_value_(metadata_value.data(), metadata_value.length()),
+ : metadata_key_(metadata_key.data(), metadata_key.length()),
+ metadata_value_(metadata_value.data(), metadata_value.length()),
is_blocking_(is_blocking),
is_successful_(is_successful) {}
@@ -99,7 +102,7 @@ class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
EXPECT_TRUE(metadata != nullptr);
if (is_successful_) {
- metadata->insert(std::make_pair(kMetadataKey, metadata_value_));
+ metadata->insert(std::make_pair(metadata_key_, metadata_value_));
return Status::OK;
} else {
return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
@@ -107,12 +110,16 @@ class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
}
private:
+ grpc::string metadata_key_;
grpc::string metadata_value_;
bool is_blocking_;
bool is_successful_;
};
-const char TestMetadataCredentialsPlugin::kMetadataKey[] = "TestPluginMetadata";
+const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
+ "TestPluginMetadata";
+const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
+ "test-plugin-metadata";
class TestAuthMetadataProcessor : public AuthMetadataProcessor {
public:
@@ -123,13 +130,17 @@ class TestAuthMetadataProcessor : public AuthMetadataProcessor {
std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
return MetadataCredentialsFromPlugin(
std::unique_ptr<MetadataCredentialsPlugin>(
- new TestMetadataCredentialsPlugin(kGoodGuy, is_blocking_, true)));
+ new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
+ is_blocking_, true)));
}
std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
return MetadataCredentialsFromPlugin(
std::unique_ptr<MetadataCredentialsPlugin>(
- new TestMetadataCredentialsPlugin("Mr Hyde", is_blocking_, true)));
+ new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
+ is_blocking_, true)));
}
// Interface implementation
@@ -142,7 +153,7 @@ class TestAuthMetadataProcessor : public AuthMetadataProcessor {
EXPECT_TRUE(context != nullptr);
EXPECT_TRUE(response_metadata != nullptr);
auto auth_md =
- auth_metadata.find(TestMetadataCredentialsPlugin::kMetadataKey);
+ auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
EXPECT_NE(auth_md, auth_metadata.end());
string_ref auth_md_value = auth_md->second;
if (auth_md_value == kGoodGuy) {
@@ -1322,6 +1333,40 @@ TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
EXPECT_TRUE(s.ok());
}
+TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ context.set_credentials(
+ MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
+ new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kBadMetadataKey,
+ "Does not matter, will fail the key is invalid.", false, true))));
+ request.set_message("Hello");
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
+}
+
+TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
+ ResetStub();
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ context.set_credentials(
+ MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
+ new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kGoodMetadataKey,
+ "With illegal \n value.", false, true))));
+ request.set_message("Hello");
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
+}
+
TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
ResetStub();
EchoRequest request;
@@ -1330,6 +1375,7 @@ TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
context.set_credentials(
MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kGoodMetadataKey,
"Does not matter, will fail anyway (see 3rd param)", false,
false))));
request.set_message("Hello");
@@ -1388,6 +1434,7 @@ TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
context.set_credentials(
MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
new TestMetadataCredentialsPlugin(
+ TestMetadataCredentialsPlugin::kGoodMetadataKey,
"Does not matter, will fail anyway (see 3rd param)", true,
false))));
request.set_message("Hello");