aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_channel/README.md21
-rw-r--r--src/core/ext/client_channel/client_channel.c61
-rw-r--r--src/core/ext/client_channel/client_channel_factory.c4
-rw-r--r--src/core/ext/client_channel/client_channel_factory.h9
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.c78
-rw-r--r--src/core/ext/client_channel/lb_policy_factory.h40
-rw-r--r--src/core/ext/client_channel/resolver.c3
-rw-r--r--src/core/ext/client_channel/resolver.h18
-rw-r--r--src/core/ext/client_channel/resolver_factory.h7
-rw-r--r--src/core/ext/client_channel/resolver_registry.c12
-rw-r--r--src/core/ext/client_channel/resolver_registry.h7
-rw-r--r--src/core/ext/client_channel/resolver_result.c94
-rw-r--r--src/core/ext/client_channel/resolver_result.h68
-rw-r--r--src/core/ext/client_channel/subchannel.c2
-rw-r--r--src/core/ext/client_channel/subchannel.h2
-rw-r--r--src/core/ext/client_channel/subchannel_index.c6
-rw-r--r--src/core/ext/client_channel/subchannel_index.h4
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c106
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c30
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c29
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c50
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c35
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c63
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c73
-rw-r--r--src/core/lib/channel/channel_args.c55
-rw-r--r--src/core/lib/channel/channel_args.h11
-rw-r--r--src/core/lib/channel/message_size_filter.c2
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c6
-rw-r--r--src/core/lib/iomgr/tcp_uv.c3
-rw-r--r--src/core/lib/transport/method_config.c (renamed from src/core/ext/client_channel/method_config.c)2
-rw-r--r--src/core/lib/transport/method_config.h (renamed from src/core/ext/client_channel/method_config.h)6
-rw-r--r--src/node/performance/benchmark_client_express.js291
-rw-r--r--src/node/performance/benchmark_server.js5
-rw-r--r--src/node/performance/benchmark_server_express.js109
-rw-r--r--src/node/performance/worker.js10
-rw-r--r--src/node/performance/worker_service_impl.js228
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py3
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb146
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb12
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb43
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb22
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb123
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb24
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb1
-rw-r--r--src/ruby/spec/pb/health/checker_spec.rb38
45 files changed, 1267 insertions, 695 deletions
diff --git a/src/core/ext/client_channel/README.md b/src/core/ext/client_channel/README.md
index eda01e3e71..7c209db12e 100644
--- a/src/core/ext/client_channel/README.md
+++ b/src/core/ext/client_channel/README.md
@@ -5,28 +5,27 @@ This library provides high level configuration machinery to construct client
channels and load balance between them.
Each grpc_channel is created with a grpc_resolver. It is the resolver's duty
-to resolve a name into configuration data for the channel. Such configuration
-data might include:
+to resolve a name into a set of arguments for the channel. Such arguments
+might include:
- a list of (ip, port) addresses to connect to
- a load balancing policy to decide which server to send a request to
- a set of filters to mutate outgoing requests (say, by adding metadata)
-The resolver provides this data as a stream of grpc_resolver_result objects to
-the channel. We represent configuration as a stream so that it can be changed
-by the resolver during execution, by reacting to external events (such as a
-new configuration file being pushed to some store).
+The resolver provides this data as a stream of grpc_channel_args objects to
+the channel. We represent arguments as a stream so that they can be changed
+by the resolver during execution, by reacting to external events (such as
+new service configuration data being pushed to some store).
Load Balancing
--------------
-Load balancing configuration is provided by a grpc_lb_policy object, stored as
-part of grpc_resolver_result.
+Load balancing configuration is provided by a grpc_lb_policy object.
-The primary job of the load balancing policies is to pick a target server given only the
-initial metadata for a request. It does this by providing a grpc_subchannel
-object to the owning channel.
+The primary job of the load balancing policies is to pick a target server
+given only the initial metadata for a request. It does this by providing
+a grpc_subchannel object to the owning channel.
Sub-Channels
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 55bb877576..80b4f048c2 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -43,7 +43,6 @@
#include <grpc/support/useful.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
-#include "src/core/ext/client_channel/method_config.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -56,6 +55,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/method_config.h"
#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -127,7 +127,7 @@ typedef struct client_channel_channel_data {
/** maps method names to method_parameters structs */
grpc_mdstr_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
- grpc_resolver_result *resolver_result;
+ grpc_channel_args *resolver_result;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
/** resolver callback */
@@ -232,35 +232,42 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
if (chand->resolver_result != NULL) {
grpc_lb_policy_args lb_policy_args;
- lb_policy_args.server_name =
- grpc_resolver_result_get_server_name(chand->resolver_result);
- lb_policy_args.addresses =
- grpc_resolver_result_get_addresses(chand->resolver_result);
- lb_policy_args.additional_args =
- grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ // Find LB policy name.
+ const char *lb_policy_name = NULL;
+ const grpc_arg *channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ lb_policy_name = channel_arg->value.string;
+ }
// Special case: If all of the addresses are balancer addresses,
// assume that we should use the grpclb policy, regardless of what the
// resolver actually specified.
- const char *lb_policy_name =
- grpc_resolver_result_get_lb_policy_name(chand->resolver_result);
- bool found_backend_address = false;
- for (size_t i = 0; i < lb_policy_args.addresses->num_addresses; ++i) {
- if (!lb_policy_args.addresses->addresses[i].is_balancer) {
- found_backend_address = true;
- break;
+ channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
+ bool found_backend_address = false;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (!addresses->addresses[i].is_balancer) {
+ found_backend_address = true;
+ break;
+ }
}
- }
- if (!found_backend_address) {
- if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
- gpr_log(GPR_INFO,
- "resolver requested LB policy %s but provided only balancer "
- "addresses, no backend addresses -- forcing use of grpclb LB "
- "policy",
- (lb_policy_name == NULL ? "(none)" : lb_policy_name));
+ if (!found_backend_address) {
+ if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) {
+ gpr_log(GPR_INFO,
+ "resolver requested LB policy %s but provided only balancer "
+ "addresses, no backend addresses -- forcing use of grpclb LB "
+ "policy",
+ lb_policy_name);
+ }
+ lb_policy_name = "grpclb";
}
- lb_policy_name = "grpclb";
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
@@ -274,15 +281,15 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
- const grpc_arg *channel_arg = grpc_channel_args_find(
- lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG);
+ channel_arg =
+ grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
method_params_table = grpc_method_config_table_convert(
(grpc_method_config_table *)channel_arg->value.pointer.p,
method_config_convert_value, &method_parameters_vtable);
}
- grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
+ grpc_channel_args_destroy(chand->resolver_result);
chand->resolver_result = NULL;
}
diff --git a/src/core/ext/client_channel/client_channel_factory.c b/src/core/ext/client_channel/client_channel_factory.c
index db1cc9093c..4900832d57 100644
--- a/src/core/ext/client_channel/client_channel_factory.c
+++ b/src/core/ext/client_channel/client_channel_factory.c
@@ -44,14 +44,14 @@ void grpc_client_channel_factory_unref(grpc_exec_ctx* exec_ctx,
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
- grpc_subchannel_args* args) {
+ const grpc_subchannel_args* args) {
return factory->vtable->create_subchannel(exec_ctx, factory, args);
}
grpc_channel* grpc_client_channel_factory_create_channel(
grpc_exec_ctx* exec_ctx, grpc_client_channel_factory* factory,
const char* target, grpc_client_channel_type type,
- grpc_channel_args* args) {
+ const grpc_channel_args* args) {
return factory->vtable->create_client_channel(exec_ctx, factory, target, type,
args);
}
diff --git a/src/core/ext/client_channel/client_channel_factory.h b/src/core/ext/client_channel/client_channel_factory.h
index 28828c2eb6..2b8fc577b3 100644
--- a/src/core/ext/client_channel/client_channel_factory.h
+++ b/src/core/ext/client_channel/client_channel_factory.h
@@ -60,12 +60,12 @@ struct grpc_client_channel_factory_vtable {
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory);
grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *factory,
- grpc_subchannel_args *args);
+ const grpc_subchannel_args *args);
grpc_channel *(*create_client_channel)(grpc_exec_ctx *exec_ctx,
grpc_client_channel_factory *factory,
const char *target,
grpc_client_channel_type type,
- grpc_channel_args *args);
+ const grpc_channel_args *args);
};
void grpc_client_channel_factory_ref(grpc_client_channel_factory *factory);
@@ -75,11 +75,12 @@ void grpc_client_channel_factory_unref(grpc_exec_ctx *exec_ctx,
/** Create a new grpc_subchannel */
grpc_subchannel *grpc_client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
- grpc_subchannel_args *args);
+ const grpc_subchannel_args *args);
/** Create a new grpc_channel */
grpc_channel *grpc_client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *factory,
- const char *target, grpc_client_channel_type type, grpc_channel_args *args);
+ const char *target, grpc_client_channel_type type,
+ const grpc_channel_args *args);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_CLIENT_CHANNEL_FACTORY_H */
diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c
index 45753e6942..8a474c8818 100644
--- a/src/core/ext/client_channel/lb_policy_factory.c
+++ b/src/core/ext/client_channel/lb_policy_factory.c
@@ -38,19 +38,20 @@
#include "src/core/ext/client_channel/lb_policy_factory.h"
-grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
+grpc_lb_addresses* grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable* user_data_vtable) {
grpc_lb_addresses* addresses = gpr_malloc(sizeof(grpc_lb_addresses));
addresses->num_addresses = num_addresses;
+ addresses->user_data_vtable = user_data_vtable;
const size_t addresses_size = sizeof(grpc_lb_address) * num_addresses;
addresses->addresses = gpr_malloc(addresses_size);
memset(addresses->addresses, 0, addresses_size);
return addresses;
}
-grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
- void* (*user_data_copy)(void*)) {
- grpc_lb_addresses* new_addresses =
- grpc_lb_addresses_create(addresses->num_addresses);
+grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
+ grpc_lb_addresses* new_addresses = grpc_lb_addresses_create(
+ addresses->num_addresses, addresses->user_data_vtable);
memcpy(new_addresses->addresses, addresses->addresses,
sizeof(grpc_lb_address) * addresses->num_addresses);
for (size_t i = 0; i < addresses->num_addresses; ++i) {
@@ -58,9 +59,9 @@ grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
new_addresses->addresses[i].balancer_name =
gpr_strdup(new_addresses->addresses[i].balancer_name);
}
- if (user_data_copy != NULL) {
- new_addresses->addresses[i].user_data =
- user_data_copy(new_addresses->addresses[i].user_data);
+ if (new_addresses->addresses[i].user_data != NULL) {
+ new_addresses->addresses[i].user_data = addresses->user_data_vtable->copy(
+ new_addresses->addresses[i].user_data);
}
}
return new_addresses;
@@ -71,6 +72,7 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
bool is_balancer, char* balancer_name,
void* user_data) {
GPR_ASSERT(index < addresses->num_addresses);
+ if (user_data != NULL) GPR_ASSERT(addresses->user_data_vtable != NULL);
grpc_lb_address* target = &addresses->addresses[index];
memcpy(target->address.addr, address, address_len);
target->address.len = address_len;
@@ -79,18 +81,70 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
target->user_data = user_data;
}
-void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses,
- void (*user_data_destroy)(void*)) {
+int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1,
+ const grpc_lb_addresses* addresses2) {
+ if (addresses1->num_addresses > addresses2->num_addresses) return 1;
+ if (addresses1->num_addresses < addresses2->num_addresses) return -1;
+ if (addresses1->user_data_vtable > addresses2->user_data_vtable) return 1;
+ if (addresses1->user_data_vtable < addresses2->user_data_vtable) return -1;
+ for (size_t i = 0; i < addresses1->num_addresses; ++i) {
+ const grpc_lb_address* target1 = &addresses1->addresses[i];
+ const grpc_lb_address* target2 = &addresses2->addresses[i];
+ if (target1->address.len > target2->address.len) return 1;
+ if (target1->address.len < target2->address.len) return -1;
+ int retval = memcmp(target1->address.addr, target2->address.addr,
+ target1->address.len);
+ if (retval != 0) return retval;
+ if (target1->is_balancer > target2->is_balancer) return 1;
+ if (target1->is_balancer < target2->is_balancer) return -1;
+ const char* balancer_name1 =
+ target1->balancer_name != NULL ? target1->balancer_name : "";
+ const char* balancer_name2 =
+ target2->balancer_name != NULL ? target2->balancer_name : "";
+ retval = strcmp(balancer_name1, balancer_name2);
+ if (retval != 0) return retval;
+ if (addresses1->user_data_vtable != NULL) {
+ retval = addresses1->user_data_vtable->cmp(target1->user_data,
+ target2->user_data);
+ if (retval != 0) return retval;
+ }
+ }
+ return 0;
+}
+
+void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) {
for (size_t i = 0; i < addresses->num_addresses; ++i) {
gpr_free(addresses->addresses[i].balancer_name);
- if (user_data_destroy != NULL) {
- user_data_destroy(addresses->addresses[i].user_data);
+ if (addresses->addresses[i].user_data != NULL) {
+ addresses->user_data_vtable->destroy(addresses->addresses[i].user_data);
}
}
gpr_free(addresses->addresses);
gpr_free(addresses);
}
+static void* lb_addresses_copy(void* addresses) {
+ return grpc_lb_addresses_copy(addresses);
+}
+static void lb_addresses_destroy(void* addresses) {
+ grpc_lb_addresses_destroy(addresses);
+}
+static int lb_addresses_cmp(void* addresses1, void* addresses2) {
+ return grpc_lb_addresses_cmp(addresses1, addresses2);
+}
+static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
+ lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
+
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses* addresses) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_LB_ADDRESSES;
+ arg.value.pointer.p = (void*)addresses;
+ arg.value.pointer.vtable = &lb_addresses_arg_vtable;
+ return arg;
+}
+
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {
factory->vtable->ref(factory);
}
diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h
index 3e53972445..e2b8080a32 100644
--- a/src/core/ext/client_channel/lb_policy_factory.h
+++ b/src/core/ext/client_channel/lb_policy_factory.h
@@ -59,19 +59,26 @@ typedef struct grpc_lb_address {
void *user_data;
} grpc_lb_address;
+typedef struct grpc_lb_user_data_vtable {
+ void *(*copy)(void *);
+ void (*destroy)(void *);
+ int (*cmp)(void *, void *);
+} grpc_lb_user_data_vtable;
+
typedef struct grpc_lb_addresses {
size_t num_addresses;
grpc_lb_address *addresses;
+ const grpc_lb_user_data_vtable *user_data_vtable;
} grpc_lb_addresses;
/** Returns a grpc_addresses struct with enough space for
- * \a num_addresses addresses. */
-grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
+ \a num_addresses addresses. The \a user_data_vtable argument may be
+ NULL if no user data will be added. */
+grpc_lb_addresses *grpc_lb_addresses_create(
+ size_t num_addresses, const grpc_lb_user_data_vtable *user_data_vtable);
-/** Creates a copy of \a addresses. If \a user_data_copy is not NULL,
- * it will be invoked to copy the \a user_data field of each address. */
-grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses,
- void *(*user_data_copy)(void *));
+/** Creates a copy of \a addresses. */
+grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
/** Sets the value of the address at index \a index of \a addresses.
* \a address is a socket address of length \a address_len.
@@ -81,20 +88,21 @@ void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
bool is_balancer, char *balancer_name,
void *user_data);
-/** Destroys \a addresses. If \a user_data_destroy is not NULL, it will
- * be invoked to destroy the \a user_data field of each address. */
-void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
- void (*user_data_destroy)(void *));
+/** Compares \a addresses1 and \a addresses2. */
+int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1,
+ const grpc_lb_addresses *addresses2);
+
+/** Destroys \a addresses. */
+void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses);
+
+/** Returns a channel arg containing \a addresses. */
+grpc_arg grpc_lb_addresses_create_channel_arg(
+ const grpc_lb_addresses *addresses);
/** Arguments passed to LB policies. */
-/* TODO(roth, ctiller): Consider replacing this struct with
- grpc_channel_args. See comment in resolver_result.h for details. */
typedef struct grpc_lb_policy_args {
- const char *server_name;
- grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
- /* Can be used to pass implementation-specific parameters to the LB policy. */
- grpc_channel_args *additional_args;
+ grpc_channel_args *args;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c
index 0c26d95f2f..2ae4fe862e 100644
--- a/src/core/ext/client_channel/resolver.c
+++ b/src/core/ext/client_channel/resolver.c
@@ -76,7 +76,6 @@ void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
}
void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_resolver_result **result,
- grpc_closure *on_complete) {
+ grpc_channel_args **result, grpc_closure *on_complete) {
resolver->vtable->next(exec_ctx, resolver, result, on_complete);
}
diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h
index 0951613175..96ece92b9d 100644
--- a/src/core/ext/client_channel/resolver.h
+++ b/src/core/ext/client_channel/resolver.h
@@ -34,15 +34,13 @@
#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H
-#include "src/core/ext/client_channel/resolver_result.h"
#include "src/core/ext/client_channel/subchannel.h"
#include "src/core/lib/iomgr/iomgr.h"
typedef struct grpc_resolver grpc_resolver;
typedef struct grpc_resolver_vtable grpc_resolver_vtable;
-/** grpc_resolver provides grpc_resolver_result objects to grpc_channel
- objects */
+/** \a grpc_resolver provides \a grpc_channel_args objects to its caller */
struct grpc_resolver {
const grpc_resolver_vtable *vtable;
gpr_refcount refs;
@@ -53,7 +51,7 @@ struct grpc_resolver_vtable {
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_resolver_result **result, grpc_closure *on_complete);
+ grpc_channel_args **result, grpc_closure *on_complete);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
@@ -81,14 +79,12 @@ void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver);
void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *resolver);
-/** Get the next client config. Called by the channel to fetch a new
- configuration. Expected to set *result with a new configuration,
- and then schedule on_complete for execution.
+/** Get the next result from the resolver. Expected to set \a *result with
+ new channel args and then schedule \a on_complete for execution.
- If resolution is fatally broken, set *result to NULL and
- schedule on_complete. */
+ If resolution is fatally broken, set \a *result to NULL and
+ schedule \a on_complete. */
void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_resolver_result **result,
- grpc_closure *on_complete);
+ grpc_channel_args **result, grpc_closure *on_complete);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */
diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h
index 1ad018ce1f..4da42e84d2 100644
--- a/src/core/ext/client_channel/resolver_factory.h
+++ b/src/core/ext/client_channel/resolver_factory.h
@@ -41,13 +41,14 @@
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
-/** grpc_resolver provides grpc_resolver_result objects to grpc_channel
- objects */
struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
-typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args;
+typedef struct grpc_resolver_args {
+ grpc_uri *uri;
+ const grpc_channel_args *args;
+} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
index ee43329874..d0f0fc3f33 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -131,14 +131,16 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
-grpc_resolver *grpc_resolver_create(const char *target) {
+grpc_resolver *grpc_resolver_create(const char *target,
+ const grpc_channel_args *args) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
- grpc_resolver_args args;
- memset(&args, 0, sizeof(args));
- args.uri = uri;
- resolver = grpc_resolver_factory_create_resolver(factory, &args);
+ grpc_resolver_args resolver_args;
+ memset(&resolver_args, 0, sizeof(resolver_args));
+ resolver_args.uri = uri;
+ resolver_args.args = args;
+ resolver = grpc_resolver_factory_create_resolver(factory, &resolver_args);
grpc_uri_destroy(uri);
return resolver;
}
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index 0a39d4a187..2a95a669f0 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -57,8 +57,11 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
was not NULL).
If a resolver factory was found, use it to instantiate a resolver and
return it.
- If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(const char *target);
+ If a resolver factory was not found, return NULL.
+ \a args is a set of channel arguments to be included in the result
+ (typically the set of arguments passed in from the client API). */
+grpc_resolver *grpc_resolver_create(const char *target,
+ const grpc_channel_args *args);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/client_channel/resolver_result.c b/src/core/ext/client_channel/resolver_result.c
deleted file mode 100644
index ab1c215955..0000000000
--- a/src/core/ext/client_channel/resolver_result.c
+++ /dev/null
@@ -1,94 +0,0 @@
-//
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#include "src/core/ext/client_channel/resolver_result.h"
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/lib/channel/channel_args.h"
-
-struct grpc_resolver_result {
- gpr_refcount refs;
- char* server_name;
- grpc_lb_addresses* addresses;
- char* lb_policy_name;
- grpc_channel_args* lb_policy_args;
-};
-
-grpc_resolver_result* grpc_resolver_result_create(
- const char* server_name, grpc_lb_addresses* addresses,
- const char* lb_policy_name, grpc_channel_args* lb_policy_args) {
- grpc_resolver_result* result = gpr_malloc(sizeof(*result));
- memset(result, 0, sizeof(*result));
- gpr_ref_init(&result->refs, 1);
- result->server_name = gpr_strdup(server_name);
- result->addresses = addresses;
- result->lb_policy_name = gpr_strdup(lb_policy_name);
- result->lb_policy_args = lb_policy_args;
- return result;
-}
-
-void grpc_resolver_result_ref(grpc_resolver_result* result) {
- gpr_ref(&result->refs);
-}
-
-void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
- grpc_resolver_result* result) {
- if (gpr_unref(&result->refs)) {
- gpr_free(result->server_name);
- grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
- gpr_free(result->lb_policy_name);
- grpc_channel_args_destroy(result->lb_policy_args);
- gpr_free(result);
- }
-}
-
-const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) {
- return result->server_name;
-}
-
-grpc_lb_addresses* grpc_resolver_result_get_addresses(
- grpc_resolver_result* result) {
- return result->addresses;
-}
-
-const char* grpc_resolver_result_get_lb_policy_name(
- grpc_resolver_result* result) {
- return result->lb_policy_name;
-}
-
-grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
- grpc_resolver_result* result) {
- return result->lb_policy_args;
-}
diff --git a/src/core/ext/client_channel/resolver_result.h b/src/core/ext/client_channel/resolver_result.h
deleted file mode 100644
index 5421fd9470..0000000000
--- a/src/core/ext/client_channel/resolver_result.h
+++ /dev/null
@@ -1,68 +0,0 @@
-//
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H
-
-#include "src/core/ext/client_channel/lb_policy_factory.h"
-
-// TODO(roth, ctiller): In the long term, we are considering replacing
-// the resolver_result data structure with grpc_channel_args. The idea is
-// that the resolver will return a set of channel args that contains the
-// information that is currently in the resolver_result struct. For
-// example, there will be specific args indicating the set of addresses
-// and the name of the LB policy to instantiate. Note that if we did
-// this, we would probably want to change the data structure of
-// grpc_channel_args such to a hash table or AVL or some other data
-// structure that does not require linear search to find keys.
-
-/// Results reported from a grpc_resolver.
-typedef struct grpc_resolver_result grpc_resolver_result;
-
-/// Takes ownership of \a addresses and \a lb_policy_args.
-grpc_resolver_result* grpc_resolver_result_create(
- const char* server_name, grpc_lb_addresses* addresses,
- const char* lb_policy_name, grpc_channel_args* lb_policy_args);
-
-void grpc_resolver_result_ref(grpc_resolver_result* result);
-void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
- grpc_resolver_result* result);
-
-/// Accessors. Caller does NOT take ownership of results.
-const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result);
-grpc_lb_addresses* grpc_resolver_result_get_addresses(
- grpc_resolver_result* result);
-const char* grpc_resolver_result_get_lb_policy_name(
- grpc_resolver_result* result);
-grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
- grpc_resolver_result* result);
-
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_RESULT_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index 157f78877f..ab6aede23a 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -297,7 +297,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
- grpc_subchannel_args *args) {
+ const grpc_subchannel_args *args) {
grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args);
grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
if (c) {
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 0350d5574c..5652074074 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -173,6 +173,6 @@ struct grpc_subchannel_args {
/** create a subchannel given a connector */
grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
grpc_connector *connector,
- grpc_subchannel_args *args);
+ const grpc_subchannel_args *args);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */
diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c
index 7933760e0e..227013a7d7 100644
--- a/src/core/ext/client_channel/subchannel_index.c
+++ b/src/core/ext/client_channel/subchannel_index.c
@@ -73,7 +73,7 @@ static grpc_exec_ctx *current_ctx() {
}
static grpc_subchannel_key *create_key(
- grpc_connector *connector, grpc_subchannel_args *args,
+ grpc_connector *connector, const grpc_subchannel_args *args,
grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) {
grpc_subchannel_key *k = gpr_malloc(sizeof(*k));
k->connector = grpc_connector_ref(connector);
@@ -96,8 +96,8 @@ static grpc_subchannel_key *create_key(
return k;
}
-grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *connector,
- grpc_subchannel_args *args) {
+grpc_subchannel_key *grpc_subchannel_key_create(
+ grpc_connector *connector, const grpc_subchannel_args *args) {
return create_key(connector, args, grpc_channel_args_normalize);
}
diff --git a/src/core/ext/client_channel/subchannel_index.h b/src/core/ext/client_channel/subchannel_index.h
index 5b3af6b054..a67bd5e219 100644
--- a/src/core/ext/client_channel/subchannel_index.h
+++ b/src/core/ext/client_channel/subchannel_index.h
@@ -43,8 +43,8 @@
typedef struct grpc_subchannel_key grpc_subchannel_key;
/** Create a key that can be used to uniquely identify a subchannel */
-grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *con,
- grpc_subchannel_args *args);
+grpc_subchannel_key *grpc_subchannel_key_create(
+ grpc_connector *con, const grpc_subchannel_args *args);
/** Destroy a subchannel key */
void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 36f0a90bb8..6da4febf26 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -343,6 +343,21 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
return true;
}
+/* vtable for LB tokens in grpc_lb_addresses. */
+static void *lb_token_copy(void *token) {
+ return token == NULL ? NULL : GRPC_MDELEM_REF(token);
+}
+static void lb_token_destroy(void *token) {
+ if (token != NULL) GRPC_MDELEM_UNREF(token);
+}
+static int lb_token_cmp(void *token1, void *token2) {
+ if (token1 > token2) return 1;
+ if (token1 < token2) return -1;
+ return 0;
+}
+static const grpc_lb_user_data_vtable lb_token_vtable = {
+ lb_token_copy, lb_token_destroy, lb_token_cmp};
+
/* Returns addresses extracted from \a serverlist. */
static grpc_lb_addresses *process_serverlist(
const grpc_grpclb_serverlist *serverlist) {
@@ -354,7 +369,8 @@ static grpc_lb_addresses *process_serverlist(
}
if (num_valid == 0) return NULL;
- grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid);
+ grpc_lb_addresses *lb_addresses =
+ grpc_lb_addresses_create(num_valid, &lb_token_vtable);
/* second pass: actually populate the addresses and LB tokens (aka user data
* to the outside world) to be read by the RR policy during its creation.
@@ -415,11 +431,6 @@ static grpc_lb_addresses *process_serverlist(
return lb_addresses;
}
-/* A plugin for grpc_lb_addresses_destroy that unrefs the LB token metadata. */
-static void lb_token_destroy(void *token) {
- if (token != NULL) GRPC_MDELEM_UNREF(token);
-}
-
/* perform a pick over \a rr_policy. Given that a pick can return immediately
* (ignoring its completion callback) we need to perform the cleanups this
* callback would be otherwise resposible for */
@@ -459,20 +470,27 @@ static grpc_lb_policy *create_rr_locked(
glb_lb_policy *glb_policy) {
GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0);
+ if (glb_policy->addresses != NULL) {
+ /* dispose of the previous version */
+ grpc_lb_addresses_destroy(glb_policy->addresses);
+ }
+ glb_policy->addresses = process_serverlist(serverlist);
+
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
- args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory;
- args.addresses = process_serverlist(serverlist);
- args.additional_args = glb_policy->args;
- grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+ // Replace the LB addresses in the channel args that we pass down to
+ // the subchannel.
+ static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES};
+ const grpc_arg arg =
+ grpc_lb_addresses_create_channel_arg(glb_policy->addresses);
+ args.args = grpc_channel_args_copy_and_add_and_remove(
+ glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg,
+ 1);
- if (glb_policy->addresses != NULL) {
- /* dispose of the previous version */
- grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
- }
- glb_policy->addresses = args.addresses;
+ grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
+ grpc_channel_args_destroy(args.args);
return rr;
}
@@ -568,6 +586,12 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
+ /* Get server name. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
+ const char *server_name =
+ arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
+
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
@@ -575,24 +599,27 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* time, this should be changed to allow a list with no balancer addresses,
* since the resolver might fail to return a balancer address even when
* this is the right LB policy to use. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; ++i) {
- if (args->addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy));
memset(glb_policy, 0, sizeof(*glb_policy));
- /* All input addresses in args->addresses come from a resolver that claims
+ /* All input addresses in addresses come from a resolver that claims
* they are LB services. It's the resolver's responsibility to make sure
* this
* policy is only instantiated and used in that case.
*
* Create a client channel over them to communicate with a LB service */
- glb_policy->server_name = gpr_strdup(args->server_name);
+ glb_policy->server_name = gpr_strdup(server_name);
glb_policy->cc_factory = args->client_channel_factory;
- glb_policy->args = grpc_channel_args_copy(args->additional_args);
+ glb_policy->args = grpc_channel_args_copy(args->args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
/* construct a target from the addresses in args, given in the form
@@ -600,19 +627,19 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* TODO(dgq): support mixed ip version */
char **addr_strs = gpr_malloc(sizeof(char *) * num_grpclb_addrs);
size_t addr_index = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; i++) {
- if (args->addresses->addresses[i].user_data != NULL) {
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
- if (args->addresses->addresses[i].is_balancer) {
+ if (addresses->addresses[i].is_balancer) {
if (addr_index == 0) {
addr_strs[addr_index++] =
- grpc_sockaddr_to_uri(&args->addresses->addresses[i].address);
+ grpc_sockaddr_to_uri(&addresses->addresses[i].address);
} else {
- GPR_ASSERT(grpc_sockaddr_to_string(
- &addr_strs[addr_index++],
- &args->addresses->addresses[i].address, true) > 0);
+ GPR_ASSERT(grpc_sockaddr_to_string(&addr_strs[addr_index++],
+ &addresses->addresses[i].address,
+ true) > 0);
}
}
}
@@ -620,10 +647,29 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
char *target_uri_str = gpr_strjoin_sep((const char **)addr_strs,
num_grpclb_addrs, ",", &uri_path_len);
- /* will pick using pick_first */
+ /* Create a channel to talk to the LBs.
+ *
+ * We strip out the channel arg for the LB policy name, since we want
+ * to use the default (pick_first) in this case.
+ *
+ * We also strip out the channel arg for the resolved addresses, since
+ * that will be generated by the name resolver used in the LB channel.
+ * Note that the LB channel will use the sockaddr resolver, so this
+ * won't actually generate a query to DNS (or some other name service).
+ * However, the addresses returned by the sockaddr resolver will have
+ * is_balancer=false, whereas our own addresses have is_balancer=true.
+ * We need the LB channel to return addresses with is_balancer=false
+ * so that it does not wind up recursively using the grpclb LB policy,
+ * as per the special case logic in client_channel.c.
+ */
+ static const char *keys_to_remove[] = {GRPC_ARG_LB_POLICY_NAME,
+ GRPC_ARG_LB_ADDRESSES};
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_remove(
+ args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove));
glb_policy->lb_channel = grpc_client_channel_factory_create_channel(
exec_ctx, glb_policy->cc_factory, target_uri_str,
- GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL);
+ GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args);
+ grpc_channel_args_destroy(new_args);
gpr_free(target_uri_str);
for (size_t i = 0; i < num_grpclb_addrs; i++) {
@@ -664,7 +710,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
gpr_mu_destroy(&glb_policy->mu);
- grpc_lb_addresses_destroy(glb_policy->addresses, lb_token_destroy);
+ grpc_lb_addresses_destroy(glb_policy->addresses);
gpr_free(glb_policy);
}
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 7b53f8813f..5d3433df74 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -34,7 +34,9 @@
#include <string.h>
#include <grpc/support/alloc.h>
+
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
@@ -432,14 +434,22 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ /* Get server name. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
+ const char *server_name =
+ arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
+
/* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; i++) {
- if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
if (num_addrs == 0) return NULL;
@@ -450,21 +460,21 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs);
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
- if (args->addresses->addresses[i].is_balancer) continue;
+ if (addresses->addresses[i].is_balancer) continue;
- if (args->addresses->addresses[i].user_data != NULL) {
+ if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
- * the copying of args->server_name (a borrowed pointer) OK. */
- sc_args.server_name = args->server_name;
- sc_args.addr = &args->addresses->addresses[i].address;
- sc_args.args = args->additional_args;
+ * the copying of server_name (a borrowed pointer) OK. */
+ sc_args.server_name = server_name;
+ sc_args.addr = &addresses->addresses[i].address;
+ sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3073c13e67..c0743b00e8 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -64,6 +64,7 @@
#include <grpc/support/alloc.h>
#include "src/core/ext/client_channel/lb_policy_registry.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/static_metadata.h"
@@ -598,14 +599,22 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- GPR_ASSERT(args->addresses != NULL);
GPR_ASSERT(args->client_channel_factory != NULL);
+ /* Get server name. */
+ const grpc_arg *arg =
+ grpc_channel_args_find(args->args, GRPC_ARG_SERVER_NAME);
+ const char *server_name =
+ arg != NULL && arg->type == GRPC_ARG_STRING ? arg->value.string : NULL;
+
/* Find the number of backend addresses. We ignore balancer
* addresses, since we don't know how to handle them. */
+ arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
+ GPR_ASSERT(arg != NULL && arg->type == GRPC_ARG_POINTER);
+ grpc_lb_addresses *addresses = arg->value.pointer.p;
size_t num_addrs = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; i++) {
- if (!args->addresses->addresses[i].is_balancer) ++num_addrs;
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
+ if (!addresses->addresses[i].is_balancer) ++num_addrs;
}
if (num_addrs == 0) return NULL;
@@ -618,16 +627,16 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
grpc_subchannel_args sc_args;
size_t subchannel_idx = 0;
- for (size_t i = 0; i < args->addresses->num_addresses; i++) {
+ for (size_t i = 0; i < addresses->num_addresses; i++) {
/* Skip balancer addresses, since we only know how to handle backends. */
- if (args->addresses->addresses[i].is_balancer) continue;
+ if (addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
- * the copying of args->server_name (a borrowed pointer) OK. */
- sc_args.server_name = args->server_name;
- sc_args.addr = &args->addresses->addresses[i].address;
- sc_args.args = args->additional_args;
+ * the copying of server_name (a borrowed pointer) OK. */
+ sc_args.server_name = server_name;
+ sc_args.addr = &addresses->addresses[i].address;
+ sc_args.args = args->args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
@@ -639,7 +648,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sd->policy = p;
sd->index = subchannel_idx;
sd->subchannel = subchannel;
- sd->user_data = args->addresses->addresses[i].user_data;
+ sd->user_data = addresses->addresses[i].user_data;
++subchannel_idx;
grpc_closure_init(&sd->connectivity_changed_closure,
rr_connectivity_changed, sd);
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index b4ceee340e..958b8af8b2 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -40,6 +40,7 @@
#include "src/core/ext/client_channel/http_connect_handshaker.h"
#include "src/core/ext/client_channel/lb_policy_registry.h"
#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/backoff.h"
@@ -53,12 +54,12 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** target name */
- char *target_name;
- /** name to resolve (usually the same as target_name) */
+ /** name to resolve */
char *name_to_resolve;
/** default port to use */
char *default_port;
+ /** channel args. */
+ grpc_channel_args *channel_args;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -71,9 +72,9 @@ typedef struct {
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
- grpc_resolver_result **target_result;
+ grpc_channel_args **target_result;
/** current (fully resolved) result */
- grpc_resolver_result *resolved_result;
+ grpc_channel_args *resolved_result;
/** retry timer */
bool have_retry_timer;
grpc_timer retry_timer;
@@ -94,7 +95,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_resolver_result **target_result,
+ grpc_channel_args **target_result,
grpc_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
@@ -127,7 +128,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx,
}
static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_resolver_result **target_result,
+ grpc_channel_args **target_result,
grpc_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
gpr_mu_lock(&r->mu);
@@ -162,22 +163,23 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg,
static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
dns_resolver *r = arg;
- grpc_resolver_result *result = NULL;
+ grpc_channel_args *result = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = false;
if (r->addresses != NULL) {
- grpc_lb_addresses *addresses =
- grpc_lb_addresses_create(r->addresses->naddrs);
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(
+ r->addresses->naddrs, NULL /* user_data_vtable */);
for (size_t i = 0; i < r->addresses->naddrs; ++i) {
grpc_lb_addresses_set_address(
addresses, i, &r->addresses->addrs[i].addr,
r->addresses->addrs[i].len, false /* is_balancer */,
NULL /* balancer_name */, NULL /* user_data */);
}
+ grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses);
+ result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1);
grpc_resolved_addresses_destroy(r->addresses);
- result = grpc_resolver_result_create(r->target_name, addresses,
- NULL /* lb_policy_name */, NULL);
+ grpc_lb_addresses_destroy(addresses);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -197,8 +199,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r,
now);
}
- if (r->resolved_result) {
- grpc_resolver_result_unref(exec_ctx, r->resolved_result);
+ if (r->resolved_result != NULL) {
+ grpc_channel_args_destroy(r->resolved_result);
}
r->resolved_result = result;
r->resolved_version++;
@@ -222,10 +224,9 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
dns_resolver *r) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
- *r->target_result = r->resolved_result;
- if (r->resolved_result) {
- grpc_resolver_result_ref(r->resolved_result);
- }
+ *r->target_result = r->resolved_result == NULL
+ ? NULL
+ : grpc_channel_args_copy(r->resolved_result);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
@@ -235,12 +236,12 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
dns_resolver *r = (dns_resolver *)gr;
gpr_mu_destroy(&r->mu);
- if (r->resolved_result) {
- grpc_resolver_result_unref(exec_ctx, r->resolved_result);
+ if (r->resolved_result != NULL) {
+ grpc_channel_args_destroy(r->resolved_result);
}
- gpr_free(r->target_name);
gpr_free(r->name_to_resolve);
gpr_free(r->default_port);
+ grpc_channel_args_destroy(r->channel_args);
gpr_free(r);
}
@@ -260,9 +261,14 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
memset(r, 0, sizeof(*r));
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &dns_resolver_vtable);
- r->target_name = gpr_strdup(path);
r->name_to_resolve = proxy_name == NULL ? gpr_strdup(path) : proxy_name;
r->default_port = gpr_strdup(default_port);
+ grpc_arg server_name_arg;
+ server_name_arg.type = GRPC_ARG_STRING;
+ server_name_arg.key = GRPC_ARG_SERVER_NAME;
+ server_name_arg.value.string = (char *)path;
+ r->channel_args =
+ grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
return &r->base;
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 6148cbded9..5fec03a8e4 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -41,8 +41,10 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/client_channel/lb_policy_factory.h"
#include "src/core/ext/client_channel/parse_address.h"
#include "src/core/ext/client_channel/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
@@ -50,10 +52,10 @@
typedef struct {
/** base class: must be first */
grpc_resolver base;
- /** the path component of the uri passed in */
- char *target_name;
/** the addresses that we've 'resolved' */
grpc_lb_addresses *addresses;
+ /** channel args */
+ grpc_channel_args *channel_args;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
@@ -61,7 +63,7 @@ typedef struct {
/** pending next completion, or NULL */
grpc_closure *next_completion;
/** target result address for next completion */
- grpc_resolver_result **target_result;
+ grpc_channel_args **target_result;
} sockaddr_resolver;
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
@@ -73,7 +75,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
grpc_resolver *r);
static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
- grpc_resolver_result **target_result,
+ grpc_channel_args **target_result,
grpc_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
@@ -102,7 +104,7 @@ static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx,
}
static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
- grpc_resolver_result **target_result,
+ grpc_channel_args **target_result,
grpc_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
@@ -117,10 +119,9 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r) {
if (r->next_completion != NULL && !r->published) {
r->published = true;
- *r->target_result = grpc_resolver_result_create(
- r->target_name,
- grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
- NULL /* lb_policy_name */, NULL /* lb_policy_args */);
+ grpc_arg arg = grpc_lb_addresses_create_channel_arg(r->addresses);
+ *r->target_result =
+ grpc_channel_args_copy_and_add(r->channel_args, &arg, 1);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -129,8 +130,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- gpr_free(r->target_name);
- grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
+ grpc_lb_addresses_destroy(r->addresses);
+ grpc_channel_args_destroy(r->channel_args);
gpr_free(r);
}
@@ -173,7 +174,8 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
gpr_slice_buffer path_parts;
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- grpc_lb_addresses *addresses = grpc_lb_addresses_create(path_parts.count);
+ grpc_lb_addresses *addresses =
+ grpc_lb_addresses_create(path_parts.count, NULL /* user_data_vtable */);
bool errors_found = false;
for (size_t i = 0; i < addresses->num_addresses; i++) {
grpc_uri ith_uri = *args->uri;
@@ -188,14 +190,19 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args,
gpr_slice_buffer_destroy(&path_parts);
gpr_slice_unref(path_slice);
if (errors_found) {
- grpc_lb_addresses_destroy(addresses, NULL /* user_data_destroy */);
+ grpc_lb_addresses_destroy(addresses);
return NULL;
}
/* Instantiate resolver. */
sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
- r->target_name = gpr_strdup(args->uri->path);
r->addresses = addresses;
+ grpc_arg server_name_arg;
+ server_name_arg.type = GRPC_ARG_STRING;
+ server_name_arg.key = GRPC_ARG_SERVER_NAME;
+ server_name_arg.value.string = args->uri->path;
+ r->channel_args =
+ grpc_channel_args_copy_and_add(args->args, &server_name_arg, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
return &r->base;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index bd9eab8ed1..372fb7bf4c 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -52,6 +52,10 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
+//
+// connector
+//
+
typedef struct {
grpc_connector base;
gpr_refcount refs;
@@ -156,35 +160,20 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
-typedef struct {
- grpc_client_channel_factory base;
- gpr_refcount refs;
- grpc_channel_args *merge_args;
-} client_channel_factory;
+//
+// client_channel_factory
+//
static void client_channel_factory_ref(
- grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- gpr_ref(&f->refs);
-}
+ grpc_client_channel_factory *cc_factory) {}
static void client_channel_factory_unref(
- grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- if (gpr_unref(&f->refs)) {
- grpc_channel_args_destroy(f->merge_args);
- gpr_free(f);
- }
-}
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
- grpc_subchannel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ const grpc_subchannel_args *args) {
connector *c = gpr_malloc(sizeof(*c));
- grpc_channel_args *final_args =
- grpc_channel_args_merge(args->args, f->merge_args);
- grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
@@ -196,23 +185,18 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_http_connect_handshaker_create(proxy_name, args->server_name));
gpr_free(proxy_name);
}
- args->args = final_args;
- s = grpc_subchannel_create(exec_ctx, &c->base, args);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
- grpc_channel_args_destroy(final_args);
return s;
}
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
- grpc_channel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
- grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
- grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
- GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target);
+ const grpc_channel_args *args) {
+ grpc_channel *channel =
+ grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
+ grpc_resolver *resolver = grpc_resolver_create(target, args);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
@@ -220,7 +204,7 @@ static grpc_channel *client_channel_factory_create_channel(
}
grpc_client_channel_finish_initialization(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel;
@@ -231,6 +215,9 @@ static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
client_channel_factory_create_subchannel,
client_channel_factory_create_channel};
+static grpc_client_channel_factory client_channel_factory = {
+ &client_channel_factory_vtable};
+
/* Create a client channel:
Asynchronously: - resolve target
- connect to it (trying alternatives as presented)
@@ -244,16 +231,12 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
(target, args, reserved));
GPR_ASSERT(!reserved);
- client_channel_factory *f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
- f->merge_args = grpc_channel_args_copy(args);
-
+ grpc_client_channel_factory *factory =
+ (grpc_client_channel_factory *)&client_channel_factory;
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args);
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_client_channel_factory_unref(&exec_ctx, factory);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index c5f4d49754..29095af207 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -54,6 +54,10 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/tsi/transport_security_interface.h"
+//
+// connector
+//
+
typedef struct {
grpc_connector base;
gpr_refcount refs;
@@ -216,10 +220,13 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
static const grpc_connector_vtable connector_vtable = {
connector_ref, connector_unref, connector_shutdown, connector_connect};
+//
+// client_channel_factory
+//
+
typedef struct {
grpc_client_channel_factory base;
gpr_refcount refs;
- grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
} client_channel_factory;
@@ -235,19 +242,15 @@ static void client_channel_factory_unref(
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory");
- grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
- grpc_subchannel_args *args) {
+ const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
- grpc_channel_args *final_args =
- grpc_channel_args_merge(args->args, f->merge_args);
- grpc_subchannel *s;
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
@@ -261,25 +264,19 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
}
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
- args->args = final_args;
- s = grpc_subchannel_create(exec_ctx, &c->base, args);
+ grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);
- grpc_channel_args_destroy(final_args);
return s;
}
static grpc_channel *client_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
- grpc_channel_args *args) {
+ const grpc_channel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
-
- grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
- grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
- GRPC_CLIENT_CHANNEL, NULL);
- grpc_channel_args_destroy(final_args);
-
- grpc_resolver *resolver = grpc_resolver_create(target);
+ grpc_channel *channel =
+ grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL);
+ grpc_resolver *resolver = grpc_resolver_create(target, args);
if (resolver != NULL) {
grpc_client_channel_finish_initialization(
exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
@@ -289,7 +286,6 @@ static grpc_channel *client_channel_factory_create_channel(
"client_channel_factory_create_channel");
channel = NULL;
}
-
return channel;
}
@@ -306,19 +302,13 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
const char *target,
const grpc_channel_args *args,
void *reserved) {
- grpc_arg connector_arg;
- grpc_channel_args *args_copy;
- grpc_channel_args *new_args_from_connector;
- grpc_channel_security_connector *security_connector;
- client_channel_factory *f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-
GRPC_API_TRACE(
"grpc_secure_channel_create(creds=%p, target=%s, args=%p, "
"reserved=%p)",
4, (creds, target, args, reserved));
GPR_ASSERT(reserved == NULL);
-
+ // Make sure security connector does not already exist in args.
if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
grpc_exec_ctx_finish(&exec_ctx);
@@ -326,7 +316,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
target, GRPC_STATUS_INTERNAL,
"Security connector exists in channel args.");
}
-
+ // Create security connector and construct new channel args.
+ grpc_channel_security_connector *security_connector;
+ grpc_channel_args *new_args_from_connector;
if (grpc_channel_credentials_create_security_connector(
creds, target, args, &security_connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) {
@@ -334,35 +326,30 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
return grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL, "Failed to create security connector.");
}
-
- connector_arg = grpc_security_connector_to_arg(&security_connector->base);
- args_copy = grpc_channel_args_copy_and_add(
+ grpc_arg connector_arg =
+ grpc_security_connector_to_arg(&security_connector->base);
+ grpc_channel_args *new_args = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1);
-
- f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
- gpr_ref_init(&f->refs, 1);
-
- f->merge_args = grpc_channel_args_copy(args_copy);
- grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) {
grpc_channel_args_destroy(new_args_from_connector);
}
-
+ // Create client channel factory.
+ client_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_channel_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
GRPC_SECURITY_CONNECTOR_REF(&security_connector->base,
"grpc_secure_channel_create");
f->security_connector = security_connector;
-
+ // Create channel.
grpc_channel *channel = client_channel_factory_create_channel(
- &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
-
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args);
+ // Clean up.
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory_create_channel");
-
+ grpc_channel_args_destroy(new_args);
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
-
return channel; /* may be NULL */
}
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 2957d2c818..cfc072c0b5 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -66,22 +66,59 @@ static grpc_arg copy_arg(const grpc_arg *src) {
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add) {
+ return grpc_channel_args_copy_and_add_and_remove(src, NULL, 0, to_add,
+ num_to_add);
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_remove(
+ const grpc_channel_args *src, const char **to_remove,
+ size_t num_to_remove) {
+ return grpc_channel_args_copy_and_add_and_remove(src, to_remove,
+ num_to_remove, NULL, 0);
+}
+
+static bool should_remove_arg(const grpc_arg *arg, const char **to_remove,
+ size_t num_to_remove) {
+ for (size_t i = 0; i < num_to_remove; ++i) {
+ if (strcmp(arg->key, to_remove[i]) == 0) return true;
+ }
+ return false;
+}
+
+grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
+ const grpc_arg *to_add, size_t num_to_add) {
+ // Figure out how many args we'll be copying.
+ size_t num_args_to_copy = 0;
+ if (src != NULL) {
+ for (size_t i = 0; i < src->num_args; ++i) {
+ if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
+ ++num_args_to_copy;
+ }
+ }
+ }
+ // Create result.
grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args));
- size_t i;
- size_t src_num_args = (src == NULL) ? 0 : src->num_args;
- if (!src && !to_add) {
- dst->num_args = 0;
+ dst->num_args = num_args_to_copy + num_to_add;
+ if (dst->num_args == 0) {
dst->args = NULL;
return dst;
}
- dst->num_args = src_num_args + num_to_add;
dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args);
- for (i = 0; i < src_num_args; i++) {
- dst->args[i] = copy_arg(&src->args[i]);
+ // Copy args from src that are not being removed.
+ size_t dst_idx = 0;
+ if (src != NULL) {
+ for (size_t i = 0; i < src->num_args; ++i) {
+ if (!should_remove_arg(&src->args[i], to_remove, num_to_remove)) {
+ dst->args[dst_idx++] = copy_arg(&src->args[i]);
+ }
+ }
}
- for (i = 0; i < num_to_add; i++) {
- dst->args[i + src_num_args] = copy_arg(&to_add[i]);
+ // Add args from to_add.
+ for (size_t i = 0; i < num_to_add; ++i) {
+ dst->args[dst_idx++] = copy_arg(&to_add[i]);
}
+ GPR_ASSERT(dst_idx == dst->num_args);
return dst;
}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index a80340c0fa..1e05303471 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -51,6 +51,17 @@ grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add);
+/** Copies the arguments in \a src except for those whose keys are in
+ \a to_remove. */
+grpc_channel_args *grpc_channel_args_copy_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove);
+
+/** Copies the arguments from \a src except for those whose keys are in
+ \a to_remove and appends the arguments in \a to_add. */
+grpc_channel_args *grpc_channel_args_copy_and_add_and_remove(
+ const grpc_channel_args *src, const char **to_remove, size_t num_to_remove,
+ const grpc_arg *to_add, size_t num_to_add);
+
/** Concatenate args from \a a and \a b into a new instance */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b);
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index b8b2546035..7dc5ae0df1 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -38,8 +38,8 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_channel/method_config.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/transport/method_config.h"
#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
// The protobuf library will (by default) start warning at 100 megs.
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 8381f4a63a..db51ec4939 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1711,6 +1711,12 @@ retry:
"pollset_add_fd: Raced creating new polling island. pi_new: %p "
"(fd: %d, pollset: %p)",
(void *)pi_new, fd->fd, (void *)pollset);
+
+ /* No need to lock 'pi_new' here since this is a new polling island and
+ * no one has a reference to it yet */
+ polling_island_remove_all_fds_locked(pi_new, true, &error);
+
+ /* Ref and unref so that the polling island gets deleted during unref */
PI_ADD_REF(pi_new, "dance_of_destruction");
PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
goto retry;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 05d6eeb240..3860fe3e9b 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -315,6 +315,9 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) {
gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
}
+ /* Disable Nagle's Algorithm */
+ uv_tcp_nodelay(handle, 1);
+
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->handle = handle;
diff --git a/src/core/ext/client_channel/method_config.c b/src/core/lib/transport/method_config.c
index 4313ad5e0e..57d97700bf 100644
--- a/src/core/ext/client_channel/method_config.c
+++ b/src/core/lib/transport/method_config.c
@@ -29,7 +29,7 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-#include "src/core/ext/client_channel/method_config.h"
+#include "src/core/lib/transport/method_config.h"
#include <string.h>
diff --git a/src/core/ext/client_channel/method_config.h b/src/core/lib/transport/method_config.h
index 4cbeee5625..58fedd9436 100644
--- a/src/core/ext/client_channel/method_config.h
+++ b/src/core/lib/transport/method_config.h
@@ -29,8 +29,8 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
-#ifndef GRPC_CORE_EXT_CLIENT_CHANNEL_METHOD_CONFIG_H
-#define GRPC_CORE_EXT_CLIENT_CHANNEL_METHOD_CONFIG_H
+#ifndef GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H
+#define GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H
#include <stdbool.h>
@@ -133,4 +133,4 @@ grpc_mdstr_hash_table* grpc_method_config_table_convert(
void* (*convert_value)(const grpc_method_config* method_config),
const grpc_mdstr_hash_table_vtable* vtable);
-#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_METHOD_CONFIG_H */
+#endif /* GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H */
diff --git a/src/node/performance/benchmark_client_express.js b/src/node/performance/benchmark_client_express.js
new file mode 100644
index 0000000000..675eb5f288
--- /dev/null
+++ b/src/node/performance/benchmark_client_express.js
@@ -0,0 +1,291 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark client module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var util = require('util');
+var EventEmitter = require('events');
+var http = require('http');
+var https = require('https');
+
+var async = require('async');
+var _ = require('lodash');
+var PoissonProcess = require('poisson-process');
+var Histogram = require('./histogram');
+
+/**
+ * Convert a time difference, as returned by process.hrtime, to a number of
+ * nanoseconds.
+ * @param {Array.<number>} time_diff The time diff, represented as
+ * [seconds, nanoseconds]
+ * @return {number} The total number of nanoseconds
+ */
+function timeDiffToNanos(time_diff) {
+ return time_diff[0] * 1e9 + time_diff[1];
+}
+
+function BenchmarkClient(server_targets, channels, histogram_params,
+ security_params) {
+ var options = {
+ method: 'PUT',
+ headers: {
+ 'Content-Type': 'application/json'
+ }
+ };
+ var protocol;
+ if (security_params) {
+ var ca_path;
+ protocol = https;
+ this.request = _.bind(https.request, https);
+ if (security_params.use_test_ca) {
+ ca_path = path.join(__dirname, '../test/data/ca.pem');
+ var ca_data = fs.readFileSync(ca_path);
+ options.ca = ca_data;
+ }
+ if (security_params.server_host_override) {
+ var host_override = security_params.server_host_override;
+ options.servername = host_override;
+ }
+ } else {
+ protocol = http;
+ }
+
+ this.request = _.bind(protocol.request, protocol);
+
+ this.client_options = [];
+
+ for (var i = 0; i < channels; i++) {
+ var host_port;
+ host_port = server_targets[i % server_targets.length].split(':')
+ var new_options = _.assign({hostname: host_port[0], port: +host_port[1]}, options);
+ new_options.agent = new protocol.Agent(new_options);
+ this.client_options[i] = new_options;
+ }
+
+ this.histogram = new Histogram(histogram_params.resolution,
+ histogram_params.max_possible);
+
+ this.running = false;
+
+ this.pending_calls = 0;
+}
+
+util.inherits(BenchmarkClient, EventEmitter);
+
+function startAllClients(client_options_list, outstanding_rpcs_per_channel,
+ makeCall, emitter) {
+ _.each(client_options_list, function(client_options) {
+ _.times(outstanding_rpcs_per_channel, function() {
+ makeCall(client_options);
+ });
+ });
+}
+
+BenchmarkClient.prototype.startClosedLoop = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ makeCall(client_options);
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ }
+ }
+
+ startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+ outstanding_rpcs_per_channel, makeCall, self);
+};
+
+BenchmarkClient.prototype.startPoisson = function(
+ outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
+ generic) {
+ var self = this;
+
+ var options = {};
+
+ self.running = true;
+
+ if (rpc_type == 'UNARY') {
+ options.path = '/serviceProto.BenchmarkService.service/unaryCall';
+ } else {
+ self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
+ }
+
+ if (generic) {
+ self.emit('error', new Error('Generic client not supported'));
+ }
+
+ self.last_wall_time = process.hrtime();
+
+ var argument = {
+ response_size: resp_size,
+ payload: {
+ body: '0'.repeat(req_size)
+ }
+ };
+
+ function makeCall(client_options, poisson) {
+ if (self.running) {
+ self.pending_calls++;
+ var start_time = process.hrtime();
+ var req = self.request(client_options, function(res) {
+ var res_data = '';
+ res.on('data', function(data) {
+ res_data += data;
+ });
+ res.on('end', function() {
+ JSON.parse(res_data);
+ var time_diff = process.hrtime(start_time);
+ self.histogram.add(timeDiffToNanos(time_diff));
+ self.pending_calls--;
+ if ((!self.running) && self.pending_calls == 0) {
+ self.emit('finished');
+ }
+ });
+ });
+ req.write(JSON.stringify(argument));
+ req.end();
+ req.on('error', function(error) {
+ self.emit('error', new Error('Client error: ' + error.message));
+ self.running = false;
+ });
+ } else {
+ poisson.stop();
+ }
+ }
+
+ var averageIntervalMs = (1 / offered_load) * 1000;
+
+ startAllClients(_.map(self.client_options, _.partial(_.assign, options)),
+ outstanding_rpcs_per_channel, function(opts){
+ var p = PoissonProcess.create(averageIntervalMs, function() {
+ makeCall(opts, p);
+ });
+ p.start();
+ }, self);
+};
+
+/**
+ * Return curent statistics for the client. If reset is set, restart
+ * statistic collection.
+ * @param {boolean} reset Indicates that statistics should be reset
+ * @return {object} Client statistics
+ */
+BenchmarkClient.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ var histogram = this.histogram;
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ this.histogram = new Histogram(histogram.resolution,
+ histogram.max_possible);
+ }
+
+ return {
+ latencies: {
+ bucket: histogram.getContents(),
+ min_seen: histogram.minimum(),
+ max_seen: histogram.maximum(),
+ sum: histogram.getSum(),
+ sum_of_squares: histogram.sumOfSquares(),
+ count: histogram.getCount()
+ },
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+/**
+ * Stop the clients.
+ * @param {function} callback Called when the clients have finished shutting
+ * down
+ */
+BenchmarkClient.prototype.stop = function(callback) {
+ this.running = false;
+ this.on('finished', callback);
+};
+
+module.exports = BenchmarkClient;
diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js
index 70cee9979b..6abde2e17a 100644
--- a/src/node/performance/benchmark_server.js
+++ b/src/node/performance/benchmark_server.js
@@ -40,6 +40,8 @@
var fs = require('fs');
var path = require('path');
+var EventEmitter = require('events');
+var util = require('util');
var genericService = require('./generic_service');
@@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) {
this.server = server;
}
+util.inherits(BenchmarkServer, EventEmitter);
+
/**
* Start the benchmark server.
*/
BenchmarkServer.prototype.start = function() {
this.server.start();
this.last_wall_time = process.hrtime();
+ this.emit('started');
};
/**
diff --git a/src/node/performance/benchmark_server_express.js b/src/node/performance/benchmark_server_express.js
new file mode 100644
index 0000000000..065bcf660b
--- /dev/null
+++ b/src/node/performance/benchmark_server_express.js
@@ -0,0 +1,109 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/**
+ * Benchmark server module
+ * @module
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+var http = require('http');
+var https = require('https');
+var EventEmitter = require('events');
+var util = require('util');
+
+var express = require('express');
+var bodyParser = require('body-parser')
+
+function unaryCall(req, res) {
+ var reqObj = req.body;
+ var payload = {body: '0'.repeat(reqObj.response_size)};
+ res.json(payload);
+}
+
+function BenchmarkServer(host, port, tls, generic, response_size) {
+ var app = express();
+ app.use(bodyParser.json())
+ app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall);
+ this.input_host = host;
+ this.input_port = port;
+ if (tls) {
+ var credentials = {};
+ var key_path = path.join(__dirname, '../test/data/server1.key');
+ var pem_path = path.join(__dirname, '../test/data/server1.pem');
+
+ var key_data = fs.readFileSync(key_path);
+ var pem_data = fs.readFileSync(pem_path);
+ credentials['key'] = key_data;
+ credentials['cert'] = pem_data;
+ this.server = https.createServer(credentials, app);
+ } else {
+ this.server = http.createServer(app);
+ }
+}
+
+util.inherits(BenchmarkServer, EventEmitter);
+
+BenchmarkServer.prototype.start = function() {
+ var self = this;
+ this.server.listen(this.input_port, this.input_hostname, function() {
+ self.last_wall_time = process.hrtime();
+ self.emit('started');
+ });
+};
+
+BenchmarkServer.prototype.getPort = function() {
+ return this.server.address().port;
+};
+
+BenchmarkServer.prototype.mark = function(reset) {
+ var wall_time_diff = process.hrtime(this.last_wall_time);
+ if (reset) {
+ this.last_wall_time = process.hrtime();
+ }
+ return {
+ time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9,
+ // Not sure how to measure these values
+ time_user: 0,
+ time_system: 0
+ };
+};
+
+BenchmarkServer.prototype.stop = function(callback) {
+ this.server.close(callback);
+};
+
+module.exports = BenchmarkServer;
diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js
index 7ef9b84fe7..030bf7d7ba 100644
--- a/src/node/performance/worker.js
+++ b/src/node/performance/worker.js
@@ -34,18 +34,18 @@
'use strict';
var console = require('console');
-var worker_service_impl = require('./worker_service_impl');
+var WorkerServiceImpl = require('./worker_service_impl');
var grpc = require('../../../');
var serviceProto = grpc.load({
root: __dirname + '/../../..',
file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
-function runServer(port) {
+function runServer(port, benchmark_impl) {
var server_creds = grpc.ServerCredentials.createInsecure();
var server = new grpc.Server();
server.addProtoService(serviceProto.WorkerService.service,
- worker_service_impl);
+ new WorkerServiceImpl(benchmark_impl, server));
var address = '0.0.0.0:' + port;
server.bind(address, server_creds);
server.start();
@@ -57,9 +57,9 @@ if (require.main === module) {
Error.stackTraceLimit = Infinity;
var parseArgs = require('minimist');
var argv = parseArgs(process.argv, {
- string: ['driver_port']
+ string: ['driver_port', 'benchmark_impl']
});
- runServer(argv.driver_port);
+ runServer(argv.driver_port, argv.benchmark_impl);
}
exports.runServer = runServer;
diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js
index 4b5cb8f9c2..3f317f6429 100644
--- a/src/node/performance/worker_service_impl.js
+++ b/src/node/performance/worker_service_impl.js
@@ -38,121 +38,141 @@ var console = require('console');
var BenchmarkClient = require('./benchmark_client');
var BenchmarkServer = require('./benchmark_server');
-exports.quitWorker = function quitWorker(call, callback) {
- callback(null, {});
- process.exit(0);
-}
+module.exports = function WorkerServiceImpl(benchmark_impl, server) {
+ var BenchmarkClient;
+ var BenchmarkServer;
+ switch (benchmark_impl) {
+ case 'grpc':
+ BenchmarkClient = require('./benchmark_client');
+ BenchmarkServer = require('./benchmark_server');
+ break;
+ case 'express':
+ BenchmarkClient = require('./benchmark_client_express');
+ BenchmarkServer = require('./benchmark_server_express');
+ break;
+ default:
+ throw new Error('Unrecognized benchmark impl: ' + benchmark_impl);
+ }
-exports.runClient = function runClient(call) {
- var client;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- var setup = request.setup;
- console.log('ClientConfig %j', setup);
- client = new BenchmarkClient(setup.server_targets,
- setup.client_channels,
- setup.histogram_params,
- setup.security_params);
- client.on('error', function(error) {
- call.emit('error', error);
- });
- var req_size, resp_size, generic;
- switch (setup.payload_config.payload) {
- case 'bytebuf_params':
- req_size = setup.payload_config.bytebuf_params.req_size;
- resp_size = setup.payload_config.bytebuf_params.resp_size;
- generic = true;
+ this.quitWorker = function quitWorker(call, callback) {
+ server.tryShutdown(function() {
+ callback(null, {});
+ });
+ };
+
+ this.runClient = function runClient(call) {
+ var client;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ var setup = request.setup;
+ console.log('ClientConfig %j', setup);
+ client = new BenchmarkClient(setup.server_targets,
+ setup.client_channels,
+ setup.histogram_params,
+ setup.security_params);
+ client.on('error', function(error) {
+ call.emit('error', error);
+ });
+ var req_size, resp_size, generic;
+ switch (setup.payload_config.payload) {
+ case 'bytebuf_params':
+ req_size = setup.payload_config.bytebuf_params.req_size;
+ resp_size = setup.payload_config.bytebuf_params.resp_size;
+ generic = true;
+ break;
+ case 'simple_params':
+ req_size = setup.payload_config.simple_params.req_size;
+ resp_size = setup.payload_config.simple_params.resp_size;
+ generic = false;
+ break;
+ default:
+ call.emit('error', new Error('Unsupported PayloadConfig type' +
+ setup.payload_config.payload));
+ }
+ switch (setup.load_params.load) {
+ case 'closed_loop':
+ client.startClosedLoop(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size, generic);
+ break;
+ case 'poisson':
+ client.startPoisson(setup.outstanding_rpcs_per_channel,
+ setup.rpc_type, req_size, resp_size,
+ setup.load_params.poisson.offered_load, generic);
+ break;
+ default:
+ call.emit('error', new Error('Unsupported LoadParams type' +
+ setup.load_params.load));
+ }
+ stats = client.mark();
+ call.write({
+ stats: stats
+ });
break;
- case 'simple_params':
- req_size = setup.payload_config.simple_params.req_size;
- resp_size = setup.payload_config.simple_params.resp_size;
- generic = false;
+ case 'mark':
+ if (client) {
+ stats = client.mark(request.mark.reset);
+ call.write({
+ stats: stats
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ClientConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported PayloadConfig type' +
- setup.payload_config.payload));
+ throw new Error('Nonexistent client argtype option: ' + request.argtype);
}
- switch (setup.load_params.load) {
- case 'closed_loop':
- client.startClosedLoop(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size, generic);
+ });
+ call.on('end', function() {
+ client.stop(function() {
+ call.end();
+ });
+ });
+ };
+
+ this.runServer = function runServer(call) {
+ var server;
+ call.on('data', function(request) {
+ var stats;
+ switch (request.argtype) {
+ case 'setup':
+ console.log('ServerConfig %j', request.setup);
+ server = new BenchmarkServer('[::]', request.setup.port,
+ request.setup.security_params);
+ server.on('started', function() {
+ stats = server.mark();
+ call.write({
+ stats: stats,
+ port: server.getPort()
+ });
+ });
+ server.start();
break;
- case 'poisson':
- client.startPoisson(setup.outstanding_rpcs_per_channel,
- setup.rpc_type, req_size, resp_size,
- setup.load_params.poisson.offered_load, generic);
+ case 'mark':
+ if (server) {
+ stats = server.mark(request.mark.reset);
+ call.write({
+ stats: stats,
+ port: server.getPort(),
+ cores: 1
+ });
+ } else {
+ call.emit('error', new Error('Got Mark before ServerConfig'));
+ }
break;
default:
- call.emit('error', new Error('Unsupported LoadParams type' +
- setup.load_params.load));
+ throw new Error('Nonexistent server argtype option');
}
- stats = client.mark();
- call.write({
- stats: stats
- });
- break;
- case 'mark':
- if (client) {
- stats = client.mark(request.mark.reset);
- call.write({
- stats: stats
- });
- } else {
- call.emit('error', new Error('Got Mark before ClientConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent client argtype option: ' + request.argtype);
- }
- });
- call.on('end', function() {
- client.stop(function() {
- call.end();
});
- });
-};
-
-exports.runServer = function runServer(call) {
- var server;
- call.on('data', function(request) {
- var stats;
- switch (request.argtype) {
- case 'setup':
- console.log('ServerConfig %j', request.setup);
- server = new BenchmarkServer('[::]', request.setup.port,
- request.setup.security_params);
- server.start();
- stats = server.mark();
- call.write({
- stats: stats,
- port: server.getPort()
+ call.on('end', function() {
+ server.stop(function() {
+ call.end();
});
- break;
- case 'mark':
- if (server) {
- stats = server.mark(request.mark.reset);
- call.write({
- stats: stats,
- port: server.getPort(),
- cores: 1
- });
- } else {
- call.emit('error', new Error('Got Mark before ServerConfig'));
- }
- break;
- default:
- throw new Error('Nonexistent server argtype option');
- }
- });
- call.on('end', function() {
- server.stop(function() {
- call.end();
});
- });
-};
+ };
-exports.coreCount = function coreCount(call, callback) {
- callback(null, {cores: os.cpus().length});
+ this.coreCount = function coreCount(call, callback) {
+ callback(null, {cores: os.cpus().length});
+ };
};
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 0ffb0e351c..7ffe0f558a 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -179,6 +179,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/transport/mdstr_hash_table.c',
'src/core/lib/transport/metadata.c',
'src/core/lib/transport/metadata_batch.c',
+ 'src/core/lib/transport/method_config.c',
'src/core/lib/transport/static_metadata.c',
'src/core/lib/transport/timeout_encoding.c',
'src/core/lib/transport/transport.c',
@@ -245,12 +246,10 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_channel/lb_policy.c',
'src/core/ext/client_channel/lb_policy_factory.c',
'src/core/ext/client_channel/lb_policy_registry.c',
- 'src/core/ext/client_channel/method_config.c',
'src/core/ext/client_channel/parse_address.c',
'src/core/ext/client_channel/resolver.c',
'src/core/ext/client_channel/resolver_factory.c',
'src/core/ext/client_channel/resolver_registry.c',
- 'src/core/ext/client_channel/resolver_result.c',
'src/core/ext/client_channel/subchannel.c',
'src/core/ext/client_channel/subchannel_index.c',
'src/core/ext/client_channel/uri_parser.c',
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index dfc2644c46..f5c426ebfc 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -43,7 +43,8 @@ class Struct
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
- fail GRPC::BadStatus.new(status.code, status.details, md)
+ fail GRPC::BadStatus.new(status.code, status.details, md),
+ "status code: #{status.code}, details: #{status.details}"
end
status
end
@@ -156,41 +157,25 @@ module GRPC
Operation.new(self)
end
- # writes_done indicates that all writes are completed.
- #
- # It blocks until the remote endpoint acknowledges with at status unless
- # assert_finished is set to false. Any calls to #remote_send after this
- # call will fail.
- #
- # @param assert_finished [true, false] when true(default), waits for
- # FINISHED.
- def writes_done(assert_finished = true)
- ops = {
- SEND_CLOSE_FROM_CLIENT => nil
- }
- ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- batch_result = @call.run_batch(ops)
- return unless assert_finished
- unless batch_result.status.nil?
- @call.trailing_metadata = batch_result.status.metadata
- end
- @call.status = batch_result.status
- op_is_done
- batch_result.check_status
- end
-
# finished waits until a client call is completed.
#
# It blocks until the remote endpoint acknowledges by sending a status.
def finished
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
- unless batch_result.status.nil?
- @call.trailing_metadata = batch_result.status.metadata
+ attach_status_results_and_complete_call(batch_result)
+ end
+
+ def attach_status_results_and_complete_call(recv_status_batch_result)
+ unless recv_status_batch_result.status.nil?
+ @call.trailing_metadata = recv_status_batch_result.status.metadata
end
- @call.status = batch_result.status
- op_is_done
- batch_result.check_status
+ @call.status = recv_status_batch_result.status
@call.close
+ op_is_done
+
+ # The RECV_STATUS in run_batch always succeeds
+ # Check the status for a bad status or failed run batch
+ recv_status_batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
@@ -226,6 +211,23 @@ module GRPC
nil
end
+ def server_unary_response(req, trailing_metadata: {},
+ code: Core::StatusCodes::OK, details: 'OK')
+ ops = {}
+ @send_initial_md_mutex.synchronize do
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
+ @metadata_sent = true
+ end
+
+ payload = @marshal.call(req)
+ ops[SEND_MESSAGE] = payload
+ ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
+ code, details, trailing_metadata)
+ ops[RECV_CLOSE_ON_SERVER] = nil
+
+ @call.run_batch(ops)
+ end
+
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint replies with a message or status.
@@ -240,9 +242,13 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_received = true
end
- unless batch_result.nil? || batch_result.message.nil?
- res = @unmarshal.call(batch_result.message)
- return res
+ get_message_from_batch_result(batch_result)
+ end
+
+ def get_message_from_batch_result(recv_message_batch_result)
+ unless recv_message_batch_result.nil? ||
+ recv_message_batch_result.message.nil?
+ return @unmarshal.call(recv_message_batch_result.message)
end
GRPC.logger.debug('found nil; the final response has been sent')
nil
@@ -298,7 +304,6 @@ module GRPC
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
@@ -315,15 +320,25 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- remote_send(req)
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
+ ops = {
+ SEND_MESSAGE => @marshal.call(req),
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_INITIAL_METADATA => nil,
+ RECV_MESSAGE => nil,
+ RECV_STATUS_ON_CLIENT => nil
+ }
+ @send_initial_md_mutex.synchronize do
+ # Metadata might have already been sent if this is an operation view
+ unless @metadata_sent
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
+ end
+ @metadata_sent = true
+ end
+ batch_result = @call.run_batch(ops)
+
+ @call.metadata = batch_result.metadata
+ attach_status_results_and_complete_call(batch_result)
+ get_message_from_batch_result(batch_result)
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -339,12 +354,20 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- requests.each { |r| remote_send(r) }
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
+ # Metadata might have already been sent if this is an operation view
+ merge_metadata_and_send_if_not_already_sent(metadata)
+
+ requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+ batch_result = @call.run_batch(
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_INITIAL_METADATA => nil,
+ RECV_MESSAGE => nil,
+ RECV_STATUS_ON_CLIENT => nil
+ )
+
+ @call.metadata = batch_result.metadata
+ attach_status_results_and_complete_call(batch_result)
+ get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
@@ -365,9 +388,18 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- remote_send(req)
- writes_done(false)
+ ops = {
+ SEND_MESSAGE => @marshal.call(req),
+ SEND_CLOSE_FROM_CLIENT => nil
+ }
+ @send_initial_md_mutex.synchronize do
+ # Metadata might have already been sent if this is an operation view
+ unless @metadata_sent
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
+ end
+ @metadata_sent = true
+ end
+ @call.run_batch(ops)
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
@@ -404,7 +436,8 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
- merge_metadata_to_send(metadata) && send_initial_metadata
+ # Metadata might have already been sent if this is an operation view
+ merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
@@ -457,6 +490,15 @@ module GRPC
end
end
+ def merge_metadata_and_send_if_not_already_sent(new_metadata = {})
+ @send_initial_md_mutex.synchronize do
+ return if @metadata_sent
+ @metadata_to_send.merge!(new_metadata)
+ @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send)
+ @metadata_sent = true
+ end
+ end
+
private
# Starts the call if not already started
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 0d7c1f7805..6934257cbc 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -168,6 +168,7 @@ module GRPC
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, metadata: metadata)
@@ -231,9 +232,10 @@ module GRPC
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.client_streamer(requests, metadata: metadata)
+ c.client_streamer(requests)
end
op
end
@@ -309,9 +311,10 @@ module GRPC
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.server_streamer(req, metadata: metadata, &blk)
+ c.server_streamer(req, &blk)
end
op
end
@@ -417,15 +420,15 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
-
return c.bidi_streamer(requests, metadata: metadata,
&blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.bidi_streamer(requests, metadata: metadata, &blk)
+ c.bidi_streamer(requests, &blk)
end
op
end
@@ -445,7 +448,6 @@ module GRPC
deadline: nil,
parent: nil,
credentials: nil)
-
deadline = from_relative_time(@timeout) if deadline.nil?
# Provide each new client call with its own completion queue
call = @ch.create_call(parent, # parent call
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 584fe78169..cd17aed8e7 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -62,25 +62,44 @@ module GRPC
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
+ def handle_request_response(active_call, mth)
+ req = active_call.remote_read
+ resp = mth.call(req, active_call.single_req_view)
+ active_call.server_unary_response(
+ resp, trailing_metadata: active_call.output_metadata)
+ end
+
+ def handle_client_streamer(active_call, mth)
+ resp = mth.call(active_call.multi_req_view)
+ active_call.server_unary_response(
+ resp, trailing_metadata: active_call.output_metadata)
+ end
+
+ def handle_server_streamer(active_call, mth)
+ req = active_call.remote_read
+ replys = mth.call(req, active_call.single_req_view)
+ replys.each { |r| active_call.remote_send(r) }
+ send_status(active_call, OK, 'OK', active_call.output_metadata)
+ end
+
+ def handle_bidi_streamer(active_call, mth)
+ active_call.run_server_bidi(mth)
+ send_status(active_call, OK, 'OK', active_call.output_metadata)
+ end
+
def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
- req = active_call.remote_read
- resp = mth.call(req, active_call.single_req_view)
- active_call.remote_send(resp)
+ handle_request_response(active_call, mth)
elsif client_streamer?
- resp = mth.call(active_call.multi_req_view)
- active_call.remote_send(resp)
+ handle_client_streamer(active_call, mth)
elsif server_streamer?
- req = active_call.remote_read
- replys = mth.call(req, active_call.single_req_view)
- replys.each { |r| active_call.remote_send(r) }
+ handle_server_streamer(active_call, mth)
else # is a bidi_stream
- active_call.run_server_bidi(mth)
+ handle_bidi_streamer(active_call, mth)
end
- send_status(active_call, OK, 'OK', active_call.output_metadata)
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
@@ -91,7 +110,7 @@ module GRPC
# Log it, but don't notify the other endpoint..
GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
- # This is raised when active_call#method.call exceeeds the deadline
+ # This is raised when active_call#method.call exceeds the deadline
# event. Send a status of deadline exceeded
GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
@@ -100,7 +119,7 @@ module GRPC
# Send back a UNKNOWN status to the client
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e)
- send_status(active_call, UNKNOWN, 'no reason given')
+ send_status(active_call, UNKNOWN, 'unkown error handling call on server')
end
def assert_arity_matches(mth)
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 5ae4f25537..aa51d9d7b1 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -402,7 +402,7 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
- client_call.writes_done(false)
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(OK, 'OK')
@@ -460,7 +460,7 @@ describe GRPC::ActiveCall do
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
- client_call.writes_done(false)
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
n = 3 # arbitrary value > 1
@@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
end
end
- describe '#writes_done' do
+ describe '#closing the call from the client' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
ActiveCall.client_invoke(call)
@@ -481,7 +481,9 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
- expect { client_call.writes_done(false) }.to_not raise_error
+ expect do
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
+ end.to_not raise_error
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
@@ -500,11 +502,13 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
- expect { client_call.writes_done(false) }.to_not raise_error
+ expect do
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
+ end.to_not raise_error
expect { client_call.finished }.to_not raise_error
end
- it 'finishes ok if writes_done is true' do
+ it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
@@ -515,7 +519,11 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
- expect { client_call.writes_done(true) }.to_not raise_error
+ expect do
+ call.run_batch(
+ CallOps::SEND_CLOSE_FROM_CLIENT => nil,
+ CallOps::RECV_STATUS_ON_CLIENT => nil)
+ end.to_not raise_error
end
end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 6034b5419c..e68b8db7ab 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -180,30 +180,44 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_response(stub)
+ def get_response(stub, run_start_call_first: false)
op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true,
metadata: { k1: 'v1', k2: 'v2' },
deadline: from_relative_time(2))
expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.execute
+ op.start_call if run_start_call_first
+ result = op.execute
+ op.wait # make sure wait doesn't hang
+ result
end
it_behaves_like 'request response'
- end
- end
- describe '#client_streamer' do
- shared_examples 'client streaming' do
- before(:each) do
+ it 'sends metadata to the server ok when running start_call first' do
server_port = create_test_server
host = "localhost:#{server_port}"
- @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- @metadata = { k1: 'v1', k2: 'v2' }
- @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
- @resp = 'a_reply'
+ th = run_request_response(@sent_msg, @resp, @pass,
+ k1: 'v1', k2: 'v2')
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect(get_response(stub)).to eq(@resp)
+ th.join
end
+ end
+ end
+
+ describe '#client_streamer' do
+ before(:each) do
+ Thread.abort_on_exception = true
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ @metadata = { k1: 'v1', k2: 'v2' }
+ @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
+ @resp = 'a_reply'
+ end
+ shared_examples 'client streaming' do
it 'should send requests to/receive a reply from a server' do
th = run_client_streamer(@sent_msgs, @resp, @pass)
expect(get_response(@stub)).to eq(@resp)
@@ -242,24 +256,33 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_response(stub)
+ def get_response(stub, run_start_call_first: false)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.execute
+ op.start_call if run_start_call_first
+ result = op.execute
+ op.wait # make sure wait doesn't hang
+ result
end
it_behaves_like 'client streaming'
+
+ it 'sends metadata to the server ok when running start_call first' do
+ th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+ expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+ th.join
+ end
end
end
describe '#server_streamer' do
- shared_examples 'server streaming' do
- before(:each) do
- @sent_msg = 'a_msg'
- @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
- end
+ before(:each) do
+ @sent_msg = 'a_msg'
+ @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ end
+ shared_examples 'server streaming' do
it 'should send a request to/receive replies from a server' do
server_port = create_test_server
host = "localhost:#{server_port}"
@@ -303,29 +326,44 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_responses(stub)
- op = stub.server_streamer(@method, @sent_msg, noop, noop,
- return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- e = op.execute
+ after(:each) do
+ @op.wait # make sure wait doesn't hang
+ end
+ def get_responses(stub, run_start_call_first: false)
+ @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+ return_op: true,
+ metadata: { k1: 'v1', k2: 'v2' })
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ e = @op.execute
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'server streaming'
+
+ it 'should send metadata to the server ok when start_call is run first' do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ th = run_server_streamer(@sent_msg, @replys, @fail,
+ k1: 'v1', k2: 'v2')
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ e = get_responses(stub, run_start_call_first: true)
+ expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ th.join
+ end
end
end
describe '#bidi_streamer' do
- shared_examples 'bidi streaming' do
- before(:each) do
- @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
- @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
- server_port = create_test_server
- @host = "localhost:#{server_port}"
- end
+ before(:each) do
+ @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
+ @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ server_port = create_test_server
+ @host = "localhost:#{server_port}"
+ end
+ shared_examples 'bidi streaming' do
it 'supports sending all the requests first', bidi: true do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
@@ -363,16 +401,29 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_responses(stub)
- op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- return_op: true)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- e = op.execute
+ after(:each) do
+ @op.wait # make sure wait doesn't hang
+ end
+ def get_responses(stub, run_start_call_first: false)
+ @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+ return_op: true)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ e = @op.execute
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'bidi streaming'
+
+ it 'can run start_call before executing the call' do
+ th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
+ @pass)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub, run_start_call_first: true)
+ expect(e.collect { |r| r }).to eq(@replys)
+ th.join
+ end
end
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 1a895005bc..a3f0efa603 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -48,7 +48,7 @@ describe GRPC::RpcDesc do
@bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new),
Stream.new(Object.new), 'encode', 'decode')
@bs_code = INTERNAL
- @no_reason = 'no reason given'
+ @no_reason = 'unkown error handling call on server'
@ok_response = Object.new
end
@@ -83,6 +83,7 @@ describe GRPC::RpcDesc do
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
+ allow(@call).to receive(:output_metadata).and_return(@call)
end
it_behaves_like 'it handles errors'
@@ -90,10 +91,10 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
- expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:output_metadata).and_return(fake_md)
- expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
- metadata: fake_md)
+ expect(@call).to receive(:output_metadata).once.and_return(fake_md)
+ expect(@call).to receive(:server_unary_response).once
+ .with(@ok_response, trailing_metadata: fake_md)
+
this_desc.run_server_method(@call, method(:fake_reqresp))
end
end
@@ -117,7 +118,9 @@ describe GRPC::RpcDesc do
end
it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_send).once.and_raise(CallError)
+ expect(@call).to receive(:server_unary_response).once.and_raise(
+ CallError)
+ allow(@call).to receive(:output_metadata).and_return({})
blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
@@ -125,10 +128,11 @@ describe GRPC::RpcDesc do
end
it 'sends a response and closes the stream if there no errors' do
- expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:output_metadata).and_return(fake_md)
- expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
- metadata: fake_md)
+ expect(@call).to receive(:output_metadata).and_return(
+ fake_md)
+ expect(@call).to receive(:server_unary_response).once
+ .with(@ok_response, trailing_metadata: fake_md)
+
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index d362e48dee..c5694790fd 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -462,6 +462,7 @@ describe GRPC::RpcServer do
'connect_k1' => 'connect_v1'
}
wanted_md.each do |key, value|
+ puts "key: #{key}"
expect(op.metadata[key]).to eq(value)
end
@srv.stop
diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb
index 1b2fa96827..4711e09e88 100644
--- a/src/ruby/spec/pb/health/checker_spec.rb
+++ b/src/ruby/spec/pb/health/checker_spec.rb
@@ -97,15 +97,17 @@ describe Grpc::Health::Checker do
context 'initialization' do
it 'can be constructed with no args' do
- expect(subject).to_not be(nil)
+ checker = Grpc::Health::Checker.new
+ expect(checker).to_not be(nil)
end
end
context 'method `add_status` and `check`' do
success_tests.each do |t|
it "should succeed when #{t[:desc]}" do
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker = Grpc::Health::Checker.new
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
end
@@ -115,8 +117,9 @@ describe Grpc::Health::Checker do
context 'method `check`' do
success_tests.each do |t|
it "should fail with NOT_FOUND when #{t[:desc]}" do
+ checker = Grpc::Health::Checker.new
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -127,14 +130,15 @@ describe Grpc::Health::Checker do
context 'method `clear_status`' do
success_tests.each do |t|
it "should fail after clearing status when #{t[:desc]}" do
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker = Grpc::Health::Checker.new
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
- subject.clear_status(t[:service])
+ checker.clear_status(t[:service])
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -144,18 +148,19 @@ describe Grpc::Health::Checker do
context 'method `clear_all`' do
it 'should return NOT_FOUND after being invoked' do
+ checker = Grpc::Health::Checker.new
success_tests.each do |t|
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
end
- subject.clear_all
+ checker.clear_all
success_tests.each do |t|
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -184,8 +189,10 @@ describe Grpc::Health::Checker do
end
it 'should receive the correct status', server: true do
- @srv.handle(subject)
- subject.add_status('', ServingStatus::NOT_SERVING)
+ Thread.abort_on_exception = true
+ checker = Grpc::Health::Checker.new
+ @srv.handle(checker)
+ checker.add_status('', ServingStatus::NOT_SERVING)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -198,7 +205,8 @@ describe Grpc::Health::Checker do
end
it 'should fail on unknown services', server: true do
- @srv.handle(subject)
+ checker = Grpc::Health::Checker.new
+ @srv.handle(checker)
t = Thread.new { @srv.run }
@srv.wait_till_running
blk = proc do