aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-23 09:32:49 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-23 09:32:49 -0700
commit65bcbd386ed4bb9c1f81023f69c63dd666b83ea9 (patch)
tree21ac6b511ae60961f72669596d45aa471cdcbde8
parent82f9bd84e5303c78b4b766655653ddb3e1e669f4 (diff)
parent6ecd8ad519c360d18b32c8b6a2dbb3f6e9e6b2ef (diff)
Merge branch 'shindig' of https://github.com/ctiller/grpc into shindig
Conflicts: src/core/iomgr/tcp_client_windows.c
-rw-r--r--include/grpc++/security/auth_metadata_processor.h2
-rw-r--r--include/grpc++/security/credentials.h2
-rw-r--r--src/core/channel/channel_args.h2
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c66
-rw-r--r--src/core/iomgr/exec_ctx.h26
-rw-r--r--src/core/iomgr/tcp_client_windows.c2
-rw-r--r--src/core/iomgr/tcp_server_windows.c6
-rw-r--r--src/cpp/client/secure_credentials.cc2
-rw-r--r--test/cpp/end2end/end2end_test.cc8
9 files changed, 71 insertions, 45 deletions
diff --git a/include/grpc++/security/auth_metadata_processor.h b/include/grpc++/security/auth_metadata_processor.h
index 18ad922321..9b9c06e3b6 100644
--- a/include/grpc++/security/auth_metadata_processor.h
+++ b/include/grpc++/security/auth_metadata_processor.h
@@ -45,7 +45,7 @@ namespace grpc {
class AuthMetadataProcessor {
public:
typedef std::multimap<grpc::string_ref, grpc::string_ref> InputMetadata;
- typedef std::multimap<grpc::string, grpc::string_ref> OutputMetadata;
+ typedef std::multimap<grpc::string, grpc::string> OutputMetadata;
virtual ~AuthMetadataProcessor() {}
diff --git a/include/grpc++/security/credentials.h b/include/grpc++/security/credentials.h
index fafcfdc906..ff41bc597e 100644
--- a/include/grpc++/security/credentials.h
+++ b/include/grpc++/security/credentials.h
@@ -180,7 +180,7 @@ class MetadataCredentialsPlugin {
// Gets the auth metatada produced by this plugin.
virtual Status GetMetadata(
grpc::string_ref service_url,
- std::multimap<grpc::string, grpc::string_ref>* metadata) = 0;
+ std::multimap<grpc::string, grpc::string>* metadata) = 0;
};
std::shared_ptr<Credentials> MetadataCredentialsFromPlugin(
diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h
index f9e7b05860..480cc9aec2 100644
--- a/src/core/channel/channel_args.h
+++ b/src/core/channel/channel_args.h
@@ -71,7 +71,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
* compression algorithms are enabled. It's an error to disable an algorithm set
* by grpc_channel_args_set_compression_algorithm.
*
- * Returns an instance will the updated algorithm states. The \a a pointer is
+ * Returns an instance with the updated algorithm states. The \a a pointer is
* modified to point to the returned instance (which may be different from the
* input value of \a a). */
grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 213d5a172f..f640a0084a 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -87,24 +87,27 @@ typedef struct {
int resolved_num;
} zookeeper_resolver;
-static void zookeeper_destroy(grpc_resolver *r);
+static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
-static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r)
- GRPC_MUST_USE_RESULT;
+static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ zookeeper_resolver *r);
-static void zookeeper_shutdown(grpc_resolver *r);
-static void zookeeper_channel_saw_error(grpc_resolver *r,
+static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r);
+static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
-static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
+static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r,
+ grpc_client_config **target_config,
grpc_closure *on_complete);
static const grpc_resolver_vtable zookeeper_resolver_vtable = {
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
zookeeper_next};
-static void zookeeper_shutdown(grpc_resolver *resolver) {
+static void zookeeper_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
@@ -116,11 +119,12 @@ static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_close(r->zookeeper_handle);
gpr_mu_unlock(&r->mu);
if (call != NULL) {
- call->cb(call->cb_arg, 1);
+ call->cb(exec_ctx, call->cb_arg, 1);
}
}
-static void zookeeper_channel_saw_error(grpc_resolver *resolver,
+static void zookeeper_channel_saw_error(grpc_exec_ctx *exec_ctx,
+ grpc_resolver *resolver,
struct sockaddr *sa, int len) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
gpr_mu_lock(&r->mu);
@@ -130,11 +134,10 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver,
gpr_mu_unlock(&r->mu);
}
-static void zookeeper_next(grpc_resolver *resolver,
+static void zookeeper_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- grpc_closure *call;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@@ -142,10 +145,9 @@ static void zookeeper_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && r->resolving == 0) {
zookeeper_start_resolving_locked(r);
} else {
- call = zookeeper_maybe_finish_next_locked(r);
+ zookeeper_maybe_finish_next_locked(exec_ctx, r);
}
gpr_mu_unlock(&r->mu);
- if (call) call->cb(call->cb_arg, 1);
}
/** Zookeeper global watcher for connection management
@@ -180,14 +182,13 @@ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
/** Callback function after getting all resolved addresses
Creates a subchannel for each address */
-static void zookeeper_on_resolved(void *arg,
+static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- grpc_closure *call;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
@@ -198,13 +199,13 @@ static void zookeeper_on_resolved(void *arg,
args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
args.addr_len = addresses->addrs[i].len;
subchannels[i] = grpc_subchannel_factory_create_subchannel(
- r->subchannel_factory, &args);
+ exec_ctx, r->subchannel_factory, &args);
}
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = addresses->naddrs;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(config, lb_policy);
- GRPC_LB_POLICY_UNREF(lb_policy, "construction");
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
}
@@ -212,20 +213,18 @@ static void zookeeper_on_resolved(void *arg,
GPR_ASSERT(r->resolving == 1);
r->resolving = 0;
if (r->resolved_config != NULL) {
- grpc_client_config_unref(r->resolved_config);
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
}
r->resolved_config = config;
r->resolved_version++;
- call = zookeeper_maybe_finish_next_locked(r);
+ zookeeper_maybe_finish_next_locked(exec_ctx, r);
gpr_mu_unlock(&r->mu);
- if (call) call->cb(call->cb_arg, 1);
-
- GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving");
+ GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "zookeeper-resolving");
}
/** Callback function for each DNS resolved address */
-static void zookeeper_dns_resolved(void *arg,
+static void zookeeper_dns_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
size_t i;
zookeeper_resolver *r = arg;
@@ -251,7 +250,7 @@ static void zookeeper_dns_resolved(void *arg,
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_done) {
- zookeeper_on_resolved(r, r->resolved_addrs);
+ zookeeper_on_resolved(exec_ctx, r, r->resolved_addrs);
}
}
@@ -300,9 +299,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
char *address = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
int resolve_done = 0;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (rc != 0) {
gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name);
+ grpc_exec_ctx_finish(&exec_ctx);
return;
}
@@ -318,9 +319,11 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
resolve_done = (r->resolved_num == r->resolved_total);
gpr_mu_unlock(&r->mu);
if (resolve_done) {
- zookeeper_on_resolved(r, r->resolved_addrs);
+ zookeeper_on_resolved(&exec_ctx, r, r->resolved_addrs);
}
}
+
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void zookeeper_get_children_completion(
@@ -411,28 +414,27 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
zookeeper_resolve_address(r);
}
-static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
- grpc_closure *call = NULL;
+static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
+ zookeeper_resolver *r) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- call = r->next_completion;
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
- return call;
}
-static void zookeeper_destroy(grpc_resolver *gr) {
+static void zookeeper_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
zookeeper_resolver *r = (zookeeper_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_config != NULL) {
- grpc_client_config_unref(r->resolved_config);
+ grpc_client_config_unref(exec_ctx, r->resolved_config);
}
- grpc_subchannel_factory_unref(r->subchannel_factory);
+ grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index 0cebc1665e..bfa08c41dd 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -36,6 +36,23 @@
#include "src/core/iomgr/closure.h"
+/** Execution context.
+ * A bag of data that collects information along a callstack.
+ * Generally created at public API entry points, and passed down as
+ * pointer to child functions that manipulate it.
+ *
+ * Specific responsibilities (this may grow in the future):
+ * - track a list of work that needs to be delayed until the top of the
+ * call stack (this provides a convenient mechanism to run callbacks
+ * without worrying about locking issues)
+ *
+ * CONVENTIONS:
+ * Instance of this must ALWAYS be constructed on the stack, never
+ * heap allocated. Instances and pointers to them must always be called
+ * exec_ctx. Instances are always passed as the first argument
+ * to a function that takes it, and always as a pointer (grpc_exec_ctx
+ * is never copied).
+ */
struct grpc_exec_ctx {
grpc_closure_list closure_list;
};
@@ -43,10 +60,17 @@ struct grpc_exec_ctx {
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
-void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
+/** Flush any work that has been enqueued onto this grpc_exec_ctx.
+ * Caller must guarantee that no interfering locks are held. */
void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
+/** Finish any pending work for a grpc_exec_ctx. Must be called before
+ * the instance is destroyed, or work may be lost. */
+void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
+/** Add a closure to be executed at the next flush/finish point */
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
int success);
+/** Add a list of closures to be executed at the next flush/finish point.
+ * Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list);
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index 4dfab3b954..15fe30fb72 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -179,7 +179,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
- success = ConnectEx(sock, addr, addr_len, NULL, 0, NULL, &info->overlapped);
+ success = ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
an IOCP notification, so let's ignore it. */
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 9881a41152..55cd1a5d6e 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -152,7 +152,7 @@ void grpc_tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
/* Prepare (bind) a recently-created socket for listening. */
static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
- int addr_len) {
+ size_t addr_len) {
struct sockaddr_storage sockname_temp;
socklen_t sockname_len;
@@ -165,7 +165,7 @@ static int prepare_socket(SOCKET sock, const struct sockaddr *addr,
goto error;
}
- if (bind(sock, addr, addr_len) == SOCKET_ERROR) {
+ if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) {
char *addr_str;
char *utf8_message = gpr_format_message(WSAGetLastError());
grpc_sockaddr_to_string(&addr_str, addr, 0);
@@ -349,7 +349,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
}
static int add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
- const struct sockaddr *addr, int addr_len) {
+ const struct sockaddr *addr, size_t addr_len) {
server_port *sp;
int port;
int status;
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 99b7468e86..1693cf740b 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -173,7 +173,7 @@ void MetadataCredentialsPluginWrapper::GetMetadata(
void MetadataCredentialsPluginWrapper::InvokePlugin(
const char* service_url, grpc_credentials_plugin_metadata_cb cb,
void* user_data) {
- std::multimap<grpc::string, grpc::string_ref> metadata;
+ std::multimap<grpc::string, grpc::string> metadata;
Status status = plugin_->GetMetadata(service_url, &metadata);
std::vector<grpc_metadata> md;
for (auto it = metadata.begin(); it != metadata.end(); ++it) {
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 7f047998c3..2659b0e213 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -121,7 +121,7 @@ class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
bool IsBlocking() const GRPC_OVERRIDE { return is_blocking_; }
Status GetMetadata(grpc::string_ref service_url,
- std::multimap<grpc::string, grpc::string_ref>* metadata)
+ std::multimap<grpc::string, grpc::string>* metadata)
GRPC_OVERRIDE {
EXPECT_GT(service_url.length(), 0UL);
EXPECT_TRUE(metadata != nullptr);
@@ -175,9 +175,9 @@ class TestAuthMetadataProcessor : public AuthMetadataProcessor {
if (auth_md_value == kGoodGuy) {
context->AddProperty(kIdentityPropName, kGoodGuy);
context->SetPeerIdentityPropertyName(kIdentityPropName);
- consumed_auth_metadata->insert(
- std::make_pair(string(auth_md->first.data(), auth_md->first.length()),
- auth_md->second));
+ consumed_auth_metadata->insert(std::make_pair(
+ string(auth_md->first.data(), auth_md->first.length()),
+ string(auth_md->second.data(), auth_md->second.length())));
return Status::OK;
} else {
return Status(StatusCode::UNAUTHENTICATED,