aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-09-21 08:01:05 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-09-21 08:01:05 -0700
commit3c09810ac95d7aac9eb13e09ae6c59a68fc1217e (patch)
tree2de8790d0609c3ea75d3d6c0e6af40bf83be6b26 /src
parent219db50d39c2b5754e86838a56d7c8b0bc7273df (diff)
parent5bedd48c0e8b9582841c511037bc20781a6c681c (diff)
Merge remote-tracking branch 'upstream/master' into deadline_filter
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/census/trace_context.c86
-rw-r--r--src/core/ext/census/trace_context.h68
-rw-r--r--src/core/ext/census/tracing.c16
-rw-r--r--src/core/ext/client_config/client_channel.c32
-rw-r--r--src/core/ext/client_config/client_channel.h13
-rw-r--r--src/core/ext/client_config/lb_policy_factory.c20
-rw-r--r--src/core/ext/client_config/lb_policy_factory.h11
-rw-r--r--src/core/ext/client_config/resolver_factory.h5
-rw-r--r--src/core/ext/client_config/resolver_registry.c4
-rw-r--r--src/core/ext/client_config/resolver_registry.h3
-rw-r--r--src/core/ext/client_config/resolver_result.c58
-rw-r--r--src/core/ext/client_config/resolver_result.h32
-rw-r--r--src/core/ext/resolver/dns/native/dns_resolver.c25
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c19
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c17
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c15
-rw-r--r--src/php/lib/Grpc/BaseStub.php1
-rwxr-xr-xsrc/php/tests/interop/interop_client.php102
-rw-r--r--src/php/tests/unit_tests/CallCredentials2Test.php3
-rw-r--r--src/python/grpcio/grpc/__init__.py12
-rw-r--r--src/python/grpcio/grpc/_channel.py17
-rw-r--r--src/python/grpcio/grpc/_common.py10
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi5
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi4
-rw-r--r--src/python/grpcio/grpc/_server.py5
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_args_test.py53
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py10
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py5
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py5
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py9
33 files changed, 463 insertions, 206 deletions
diff --git a/src/core/ext/census/trace_context.c b/src/core/ext/census/trace_context.c
new file mode 100644
index 0000000000..fbb20d3448
--- /dev/null
+++ b/src/core/ext/census/trace_context.c
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/trace_context.h"
+
+#include <grpc/census.h>
+#include <grpc/support/log.h>
+#include <stdbool.h>
+
+#include "third_party/nanopb/pb_decode.h"
+#include "third_party/nanopb/pb_encode.h"
+
+// This function assumes the TraceContext is valid.
+size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t buf_size) {
+ // Create a stream that will write to our buffer.
+ pb_ostream_t stream = pb_ostream_from_buffer(buffer, buf_size);
+
+ // encode message
+ bool status = pb_encode(&stream, google_trace_TraceContext_fields, ctxt);
+
+ if (!status) {
+ gpr_log(GPR_DEBUG, "TraceContext encoding failed: %s",
+ PB_GET_ERROR(&stream));
+ return 0;
+ }
+
+ return stream.bytes_written;
+}
+
+bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t nbytes) {
+ // Create a stream that reads nbytes from the buffer.
+ pb_istream_t stream = pb_istream_from_buffer(buffer, nbytes);
+
+ // decode message
+ bool status = pb_decode(&stream, google_trace_TraceContext_fields, ctxt);
+
+ if (!status) {
+ gpr_log(GPR_DEBUG, "TraceContext decoding failed: %s",
+ PB_GET_ERROR(&stream));
+ return false;
+ }
+
+ // check fields
+ if (!ctxt->has_trace_id) {
+ gpr_log(GPR_DEBUG, "Invalid TraceContext: missing trace_id");
+ return false;
+ }
+ if (!ctxt->has_span_id) {
+ gpr_log(GPR_DEBUG, "Invalid TraceContext: missing span_id");
+ return false;
+ }
+
+ return true;
+}
diff --git a/src/core/ext/census/trace_context.h b/src/core/ext/census/trace_context.h
new file mode 100644
index 0000000000..ee71fef460
--- /dev/null
+++ b/src/core/ext/census/trace_context.h
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Functions for manipulating trace contexts as defined in
+ src/proto/census/trace.proto */
+#ifndef GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H
+#define GRPC_CORE_EXT_CENSUS_TRACE_CONTEXT_H
+
+#include "src/core/ext/census/gen/trace_context.pb.h"
+
+/* Maximum number of bytes required to encode a TraceContext (31)
+1 byte for trace_id field
+1 byte for trace_id length
+1 byte for trace_id.hi field
+8 bytes for trace_id.hi (uint64_t)
+1 byte for trace_id.lo field
+8 bytes for trace_id.lo (uint64_t)
+1 byte for span_id field
+8 bytes for span_id (uint64_t)
+1 byte for is_sampled field
+1 byte for is_sampled (bool) */
+#define TRACE_MAX_CONTEXT_SIZE 31
+
+/* Encode a trace context (ctxt) into proto format to the buffer provided. The
+size of buffer must be at least TRACE_MAX_CONTEXT_SIZE. On success, returns the
+number of bytes successfully encoded into buffer. On failure, returns 0. */
+size_t encode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t buf_size);
+
+/* Decode a proto-encoded TraceContext from the provided buffer into the
+TraceContext structure (ctxt). The function expects to be supplied the number
+of bytes to be read from buffer (nbytes). This function will also validate that
+the TraceContext has a span_id and a trace_id, and will return false if either
+of these do not exist. On success, returns true and false otherwise. */
+bool decode_trace_context(google_trace_TraceContext *ctxt, uint8_t *buffer,
+ const size_t nbytes);
+
+#endif
diff --git a/src/core/ext/census/tracing.c b/src/core/ext/census/tracing.c
index 3b5d6dab2b..8f0e12296d 100644
--- a/src/core/ext/census/tracing.c
+++ b/src/core/ext/census/tracing.c
@@ -31,15 +31,19 @@
*
*/
+//#include "src/core/ext/census/tracing.h"
+
#include <grpc/census.h>
/* TODO(aveitch): These are all placeholder implementations. */
-int census_trace_mask(const census_context *context) {
- return CENSUS_TRACE_MASK_NONE;
-}
+// int census_trace_mask(const census_context *context) {
+// return CENSUS_TRACE_MASK_NONE;
+// }
+
+// void census_set_trace_mask(int trace_mask) {}
-void census_set_trace_mask(int trace_mask) {}
+// void census_trace_print(census_context *context, uint32_t type,
+// const char *buffer, size_t n) {}
-void census_trace_print(census_context *context, uint32_t type,
- const char *buffer, size_t n) {}
+// void SetTracerParams(const Params& params);
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 3ed66a3313..2ec37a1a4d 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -42,6 +42,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -64,6 +65,8 @@ typedef struct client_channel_channel_data {
grpc_resolver *resolver;
/** have we started resolving this channel */
bool started_resolving;
+ /** client channel factory */
+ grpc_client_channel_factory *client_channel_factory;
/** mutex protecting client configuration, including all
variables below in this data structure */
@@ -174,20 +177,26 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
if (chand->resolver_result != NULL) {
- lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result);
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.addresses =
+ grpc_resolver_result_get_addresses(chand->resolver_result);
+ lb_policy_args.additional_args =
+ grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
+ lb_policy = grpc_lb_policy_create(
+ exec_ctx,
+ grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
+ &lb_policy_args);
if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
GRPC_ERROR_UNREF(state_error);
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
-
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
+ chand->resolver_result = NULL;
}
- chand->resolver_result = NULL;
-
if (lb_policy != NULL) {
grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties,
chand->interested_parties);
@@ -347,6 +356,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_resolver_shutdown(exec_ctx, chand->resolver);
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
+ if (chand->client_channel_factory != NULL) {
+ grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory);
+ }
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
chand->lb_policy->interested_parties,
@@ -796,10 +808,12 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver) {
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory) {
/* post construction initialization: set the transport setup pointer */
+ GPR_ASSERT(client_channel_factory != NULL);
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
@@ -813,6 +827,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
&chand->on_resolver_result_changed);
}
+ chand->client_channel_factory = client_channel_factory;
+ grpc_client_channel_factory_ref(client_channel_factory);
gpr_mu_unlock(&chand->mu);
}
diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h
index 1e47ad34ad..abb5ac4d87 100644
--- a/src/core/ext/client_config/client_channel.h
+++ b/src/core/ext/client_config/client_channel.h
@@ -34,6 +34,7 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H
+#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/resolver.h"
#include "src/core/lib/channel/channel_stack.h"
@@ -46,12 +47,12 @@
extern const grpc_channel_filter grpc_client_channel_filter;
-/* post-construction initializer to let the client channel know which
- transport setup it should cancel upon destruction, or initiate when it needs
- a connection */
-void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack,
- grpc_resolver *resolver);
+/* Post-construction initializer to give the client channel its resolver
+ and factory. */
+void grpc_client_channel_finish_initialization(
+ grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+ grpc_resolver *resolver,
+ grpc_client_channel_factory *client_channel_factory);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect);
diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c
index f86117aa38..c17af91a09 100644
--- a/src/core/ext/client_config/lb_policy_factory.c
+++ b/src/core/ext/client_config/lb_policy_factory.c
@@ -34,6 +34,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
#include "src/core/ext/client_config/lb_policy_factory.h"
@@ -46,6 +47,25 @@ grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) {
return addresses;
}
+grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses,
+ void* (*user_data_copy)(void*)) {
+ grpc_lb_addresses* new_addresses =
+ grpc_lb_addresses_create(addresses->num_addresses);
+ memcpy(new_addresses->addresses, addresses->addresses,
+ sizeof(grpc_lb_address) * addresses->num_addresses);
+ for (size_t i = 0; i < addresses->num_addresses; ++i) {
+ if (new_addresses->addresses[i].balancer_name != NULL) {
+ new_addresses->addresses[i].balancer_name =
+ gpr_strdup(new_addresses->addresses[i].balancer_name);
+ }
+ if (user_data_copy != NULL) {
+ new_addresses->addresses[i].user_data =
+ user_data_copy(new_addresses->addresses[i].user_data);
+ }
+ }
+ return new_addresses;
+}
+
void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
void* address, size_t address_len,
bool is_balancer, char* balancer_name,
diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h
index b31179cf80..54408c6308 100644
--- a/src/core/ext/client_config/lb_policy_factory.h
+++ b/src/core/ext/client_config/lb_policy_factory.h
@@ -36,9 +36,9 @@
#include "src/core/ext/client_config/client_channel_factory.h"
#include "src/core/ext/client_config/lb_policy.h"
-#include "src/core/ext/client_config/resolver_result.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/resolve_address.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -68,6 +68,11 @@ typedef struct grpc_lb_addresses {
* \a num_addresses addresses. */
grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses);
+/** Creates a copy of \a addresses. If \a user_data_copy is not NULL,
+ * it will be invoked to copy the \a user_data field of each address. */
+grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses,
+ void *(*user_data_copy)(void *));
+
/** Sets the value of the address at index \a index of \a addresses.
* \a address is a socket address of length \a address_len.
* Takes ownership of \a balancer_name. */
@@ -82,9 +87,13 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses,
void (*user_data_destroy)(void *));
/** Arguments passed to LB policies. */
+/* TODO(roth, ctiller): Consider replacing this struct with
+ grpc_channel_args. See comment in resolver_result.h for details. */
typedef struct grpc_lb_policy_args {
grpc_lb_addresses *addresses;
grpc_client_channel_factory *client_channel_factory;
+ /* Can be used to pass implementation-specific parameters to the LB policy. */
+ grpc_channel_args *additional_args;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h
index f69bf79564..9ec5b9a70e 100644
--- a/src/core/ext/client_config/resolver_factory.h
+++ b/src/core/ext/client_config/resolver_factory.h
@@ -47,10 +47,7 @@ struct grpc_resolver_factory {
const grpc_resolver_factory_vtable *vtable;
};
-typedef struct grpc_resolver_args {
- grpc_uri *uri;
- grpc_client_channel_factory *client_channel_factory;
-} grpc_resolver_args;
+typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args;
struct grpc_resolver_factory_vtable {
void (*ref)(grpc_resolver_factory *factory);
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index e7a4abd568..b5308a140c 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -128,15 +128,13 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory) {
+grpc_resolver *grpc_resolver_create(const char *target) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
grpc_resolver_args args;
memset(&args, 0, sizeof(args));
args.uri = uri;
- args.client_channel_factory = client_channel_factory;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h
index 5ef1383cd3..92e248d548 100644
--- a/src/core/ext/client_config/resolver_registry.h
+++ b/src/core/ext/client_config/resolver_registry.h
@@ -55,8 +55,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(
- const char *target, grpc_client_channel_factory *client_channel_factory);
+grpc_resolver *grpc_resolver_create(const char *target);
/** Find a resolver factory given a name and return an (owned-by-the-caller)
* reference to it */
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index 242989a0da..59c9e7dc25 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -34,40 +34,54 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/channel/channel_args.h"
struct grpc_resolver_result {
gpr_refcount refs;
- grpc_lb_policy* lb_policy;
+ grpc_lb_addresses* addresses;
+ char* lb_policy_name;
+ grpc_channel_args* lb_policy_args;
};
-grpc_resolver_result* grpc_resolver_result_create() {
- grpc_resolver_result* c = gpr_malloc(sizeof(*c));
- memset(c, 0, sizeof(*c));
- gpr_ref_init(&c->refs, 1);
- return c;
+grpc_resolver_result* grpc_resolver_result_create(
+ grpc_lb_addresses* addresses, const char* lb_policy_name,
+ grpc_channel_args* lb_policy_args) {
+ grpc_resolver_result* result = gpr_malloc(sizeof(*result));
+ memset(result, 0, sizeof(*result));
+ gpr_ref_init(&result->refs, 1);
+ result->addresses = addresses;
+ result->lb_policy_name = gpr_strdup(lb_policy_name);
+ result->lb_policy_args = lb_policy_args;
+ return result;
}
-void grpc_resolver_result_ref(grpc_resolver_result* c) { gpr_ref(&c->refs); }
+void grpc_resolver_result_ref(grpc_resolver_result* result) {
+ gpr_ref(&result->refs);
+}
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
- grpc_resolver_result* c) {
- if (gpr_unref(&c->refs)) {
- if (c->lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result");
- }
- gpr_free(c);
+ grpc_resolver_result* result) {
+ if (gpr_unref(&result->refs)) {
+ grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
+ gpr_free(result->lb_policy_name);
+ grpc_channel_args_destroy(result->lb_policy_args);
+ gpr_free(result);
}
}
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result* c,
- grpc_lb_policy* lb_policy) {
- GPR_ASSERT(c->lb_policy == NULL);
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "resolver_result");
- }
- c->lb_policy = lb_policy;
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result) {
+ return result->addresses;
+}
+
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result) {
+ return result->lb_policy_name;
}
-grpc_lb_policy* grpc_resolver_result_get_lb_policy(grpc_resolver_result* c) {
- return c->lb_policy;
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
+ grpc_resolver_result* result) {
+ return result->lb_policy_args;
}
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 5a69d81990..d4118b90e8 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -32,22 +32,40 @@
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H
-#include <stdbool.h>
-
-#include "src/core/ext/client_config/lb_policy.h"
+#include "src/core/ext/client_config/lb_policy_factory.h"
#include "src/core/lib/iomgr/resolve_address.h"
+// TODO(roth, ctiller): In the long term, we are considering replacing
+// the resolver_result data structure with grpc_channel_args. The idea is
+// that the resolver will return a set of channel args that contains the
+// information that is currently in the resolver_result struct. For
+// example, there will be specific args indicating the set of addresses
+// and the name of the LB policy to instantiate. Note that if we did
+// this, we would probably want to change the data structure of
+// grpc_channel_args such to a hash table or AVL or some other data
+// structure that does not require linear search to find keys.
+
/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
-grpc_resolver_result* grpc_resolver_result_create();
+/// Takes ownership of \a addresses and \a lb_policy_args.
+grpc_resolver_result* grpc_resolver_result_create(
+ grpc_lb_addresses* addresses, const char* lb_policy_name,
+ grpc_channel_args* lb_policy_args);
void grpc_resolver_result_ref(grpc_resolver_result* result);
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
grpc_resolver_result* result);
-void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result,
- grpc_lb_policy* lb_policy);
-grpc_lb_policy* grpc_resolver_result_get_lb_policy(
+/// Caller does NOT take ownership of result.
+grpc_lb_addresses* grpc_resolver_result_get_addresses(
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+const char* grpc_resolver_result_get_lb_policy_name(
+ grpc_resolver_result* result);
+
+/// Caller does NOT take ownership of result.
+grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */
diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c
index 63682db7de..1b85d2d634 100644
--- a/src/core/ext/resolver/dns/native/dns_resolver.c
+++ b/src/core/ext/resolver/dns/native/dns_resolver.c
@@ -37,7 +37,6 @@
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
@@ -58,8 +57,6 @@ typedef struct {
char *name;
/** default port to use */
char *default_port;
- /** subchannel factory */
- grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -166,31 +163,20 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
dns_resolver *r = arg;
grpc_resolver_result *result = NULL;
- grpc_lb_policy *lb_policy;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (r->addresses != NULL) {
- grpc_lb_policy_args lb_policy_args;
- memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = grpc_lb_addresses_create(r->addresses->naddrs);
+ grpc_lb_addresses *addresses =
+ grpc_lb_addresses_create(r->addresses->naddrs);
for (size_t i = 0; i < r->addresses->naddrs; ++i) {
grpc_lb_addresses_set_address(
- lb_policy_args.addresses, i, &r->addresses->addrs[i].addr,
+ addresses, i, &r->addresses->addrs[i].addr,
r->addresses->addrs[i].len, false /* is_balancer */,
NULL /* balancer_name */, NULL /* user_data */);
}
grpc_resolved_addresses_destroy(r->addresses);
- lb_policy_args.client_channel_factory = r->client_channel_factory;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- grpc_lb_addresses_destroy(lb_policy_args.addresses,
- NULL /* user_data_destroy */);
- result = grpc_resolver_result_create();
- if (lb_policy != NULL) {
- grpc_resolver_result_set_lb_policy(result, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
- }
+ result = grpc_resolver_result_create(addresses, r->lb_policy_name, NULL);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
@@ -251,7 +237,6 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
if (r->resolved_result) {
grpc_resolver_result_unref(exec_ctx, r->resolved_result);
}
- grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@@ -278,10 +263,8 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
- r->client_channel_factory = args->client_channel_factory;
gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER,
BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000);
- grpc_client_channel_factory_ref(r->client_channel_factory);
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index fbfe5d774b..ced4fb024c 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -40,7 +40,6 @@
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
-#include "src/core/ext/client_config/lb_policy_registry.h"
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/lib/iomgr/resolve_address.h"
@@ -52,8 +51,6 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
- /** subchannel factory */
- grpc_client_channel_factory *client_channel_factory;
/** load balancing policy name */
char *lb_policy_name;
@@ -122,17 +119,10 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r) {
if (r->next_completion != NULL && !r->published) {
- grpc_resolver_result *result = grpc_resolver_result_create();
- grpc_lb_policy_args lb_policy_args;
- memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.addresses = r->addresses;
- lb_policy_args.client_channel_factory = r->client_channel_factory;
- grpc_lb_policy *lb_policy =
- grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
- grpc_resolver_result_set_lb_policy(result, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = true;
- *r->target_result = result;
+ *r->target_result = grpc_resolver_result_create(
+ grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
+ r->lb_policy_name, NULL);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -141,7 +131,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_client_channel_factory_unref(exec_ctx, r->client_channel_factory);
grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -243,8 +232,6 @@ static grpc_resolver *sockaddr_create(
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
- r->client_channel_factory = args->client_channel_factory;
- grpc_client_channel_factory_ref(r->client_channel_factory);
return &r->base;
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index cbaa75a90a..ddc00bd79f 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -160,7 +160,6 @@ typedef struct {
grpc_client_channel_factory base;
gpr_refcount refs;
grpc_channel_args *merge_args;
- grpc_channel *master;
} client_channel_factory;
static void client_channel_factory_ref(
@@ -173,10 +172,6 @@ static void client_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
client_channel_factory *f = (client_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
- if (f->master != NULL) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
- "client_channel_factory");
- }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@@ -210,15 +205,15 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ grpc_resolver *resolver = grpc_resolver_create(target);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
"client_channel_factory_create_channel");
return NULL;
}
- grpc_client_channel_set_resolver(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ grpc_client_channel_finish_initialization(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel");
return channel;
@@ -250,12 +245,8 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
- if (channel != NULL) {
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
- }
- grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
return channel != NULL ? channel : grpc_lame_client_channel_create(
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 9e2bdd758f..f36fbbfc57 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -220,7 +220,6 @@ typedef struct {
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel_security_connector *security_connector;
- grpc_channel *master;
} client_channel_factory;
static void client_channel_factory_ref(
@@ -235,10 +234,6 @@ static void client_channel_factory_unref(
if (gpr_unref(&f->refs)) {
GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base,
"client_channel_factory");
- if (f->master != NULL) {
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
- "client_channel_factory");
- }
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
@@ -276,10 +271,10 @@ static grpc_channel *client_channel_factory_create_channel(
GRPC_CLIENT_CHANNEL, NULL);
grpc_channel_args_destroy(final_args);
- grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
+ grpc_resolver *resolver = grpc_resolver_create(target);
if (resolver != NULL) {
- grpc_client_channel_set_resolver(
- exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
+ grpc_client_channel_finish_initialization(
+ exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base);
GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create");
} else {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
@@ -356,10 +351,6 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds,
grpc_channel *channel = client_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
- if (channel != NULL) {
- f->master = channel;
- GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_secure_channel_create");
- }
grpc_client_channel_factory_unref(&exec_ctx, &f->base);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php
index d850eebc78..8a7f6572a6 100644
--- a/src/php/lib/Grpc/BaseStub.php
+++ b/src/php/lib/Grpc/BaseStub.php
@@ -182,6 +182,7 @@ class BaseStub
} else {
$hostname = $this->hostname;
}
+
return 'https://'.$hostname.$service_name;
}
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index 94ceeda02c..4bc4589956 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -70,7 +70,8 @@ function hardAssertIfStatusOk($status)
*/
function emptyUnary($stub)
{
- list($result, $status) = $stub->EmptyCall(new grpc\testing\EmptyMessage())->wait();
+ list($result, $status) =
+ $stub->EmptyCall(new grpc\testing\EmptyMessage())->wait();
hardAssertIfStatusOk($status);
hardAssert($result !== null, 'Call completed with a null response');
}
@@ -92,8 +93,8 @@ function largeUnary($stub)
* @param $fillUsername boolean whether to fill result with username
* @param $fillOauthScope boolean whether to fill result with oauth scope
*/
-function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false,
- $callback = false)
+function performLargeUnary($stub, $fillUsername = false,
+ $fillOauthScope = false, $callback = false)
{
$request_len = 271828;
$response_len = 314159;
@@ -118,11 +119,11 @@ function performLargeUnary($stub, $fillUsername = false, $fillOauthScope = false
hardAssert($result !== null, 'Call returned a null response');
$payload = $result->getPayload();
hardAssert($payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
- 'Payload had the wrong type');
+ 'Payload had the wrong type');
hardAssert(strlen($payload->getBody()) === $response_len,
- 'Payload had the wrong length');
+ 'Payload had the wrong length');
hardAssert($payload->getBody() === str_repeat("\0", $response_len),
- 'Payload had the wrong content');
+ 'Payload had the wrong content');
return $result;
}
@@ -141,11 +142,12 @@ function serviceAccountCreds($stub, $args)
$jsonKey = json_decode(
file_get_contents(getenv(CredentialsLoader::ENV_VAR)),
true);
- $result = performLargeUnary($stub, $fillUsername = true, $fillOauthScope = true);
- hardAssert($result->getUsername() == $jsonKey['client_email'],
- 'invalid email returned');
+ $result = performLargeUnary($stub, $fillUsername = true,
+ $fillOauthScope = true);
+ hardAssert($result->getUsername() === $jsonKey['client_email'],
+ 'invalid email returned');
hardAssert(strpos($args['oauth_scope'], $result->getOauthScope()) !== false,
- 'invalid oauth scope returned');
+ 'invalid oauth scope returned');
}
/**
@@ -163,9 +165,10 @@ function computeEngineCreds($stub, $args)
if (!array_key_exists('default_service_account', $args)) {
throw new Exception('Missing default_service_account');
}
- $result = performLargeUnary($stub, $fillUsername = true, $fillOauthScope = true);
- hardAssert($args['default_service_account'] == $result->getUsername(),
- 'invalid email returned');
+ $result = performLargeUnary($stub, $fillUsername = true,
+ $fillOauthScope = true);
+ hardAssert($args['default_service_account'] === $result->getUsername(),
+ 'invalid email returned');
}
/**
@@ -179,9 +182,10 @@ function jwtTokenCreds($stub, $args)
$jsonKey = json_decode(
file_get_contents(getenv(CredentialsLoader::ENV_VAR)),
true);
- $result = performLargeUnary($stub, $fillUsername = true, $fillOauthScope = true);
- hardAssert($result->getUsername() == $jsonKey['client_email'],
- 'invalid email returned');
+ $result = performLargeUnary($stub, $fillUsername = true,
+ $fillOauthScope = true);
+ hardAssert($result->getUsername() === $jsonKey['client_email'],
+ 'invalid email returned');
}
/**
@@ -195,9 +199,10 @@ function oauth2AuthToken($stub, $args)
$jsonKey = json_decode(
file_get_contents(getenv(CredentialsLoader::ENV_VAR)),
true);
- $result = performLargeUnary($stub, $fillUsername = true, $fillOauthScope = true);
- hardAssert($result->getUsername() == $jsonKey['client_email'],
- 'invalid email returned');
+ $result = performLargeUnary($stub, $fillUsername = true,
+ $fillOauthScope = true);
+ hardAssert($result->getUsername() === $jsonKey['client_email'],
+ 'invalid email returned');
}
function updateAuthMetadataCallback($context)
@@ -209,8 +214,9 @@ function updateAuthMetadataCallback($context)
$metadata = [];
$result = $auth_credentials->updateMetadata([], $authUri);
foreach ($result as $key => $value) {
- $metadata[strtolower($key)] = $value;
+ $metadata[strtolower($key)] = $value;
}
+
return $metadata;
}
@@ -226,10 +232,11 @@ function perRpcCreds($stub, $args)
file_get_contents(getenv(CredentialsLoader::ENV_VAR)),
true);
- $result = performLargeUnary($stub, $fillUsername = true, $fillOauthScope = true,
+ $result = performLargeUnary($stub, $fillUsername = true,
+ $fillOauthScope = true,
'updateAuthMetadataCallback');
- hardAssert($result->getUsername() == $jsonKey['client_email'],
- 'invalid email returned');
+ hardAssert($result->getUsername() === $jsonKey['client_email'],
+ 'invalid email returned');
}
/**
@@ -258,7 +265,7 @@ function clientStreaming($stub)
list($result, $status) = $call->wait();
hardAssertIfStatusOk($status);
hardAssert($result->getAggregatedPayloadSize() === 74922,
- 'aggregated_payload_size was incorrect');
+ 'aggregated_payload_size was incorrect');
}
/**
@@ -283,10 +290,11 @@ function serverStreaming($stub)
foreach ($call->responses() as $value) {
hardAssert($i < 4, 'Too many responses');
$payload = $value->getPayload();
- hardAssert($payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
- 'Payload '.$i.' had the wrong type');
+ hardAssert(
+ $payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
+ 'Payload '.$i.' had the wrong type');
hardAssert(strlen($payload->getBody()) === $sizes[$i],
- 'Response '.$i.' had the wrong length');
+ 'Response '.$i.' had the wrong length');
$i += 1;
}
hardAssertIfStatusOk($call->getStatus());
@@ -318,10 +326,11 @@ function pingPong($stub)
hardAssert($response !== null, 'Server returned too few responses');
$payload = $response->getPayload();
- hardAssert($payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
- 'Payload '.$i.' had the wrong type');
+ hardAssert(
+ $payload->getType() === grpc\testing\PayloadType::COMPRESSABLE,
+ 'Payload '.$i.' had the wrong type');
hardAssert(strlen($payload->getBody()) === $response_lengths[$i],
- 'Payload '.$i.' had the wrong length');
+ 'Payload '.$i.' had the wrong length');
}
$call->writesDone();
hardAssert($call->read() === null, 'Server returned too many responses');
@@ -352,7 +361,7 @@ function cancelAfterBegin($stub)
$call->cancel();
list($result, $status) = $call->wait();
hardAssert($status->code === Grpc\STATUS_CANCELLED,
- 'Call status was not CANCELLED');
+ 'Call status was not CANCELLED');
}
/**
@@ -377,7 +386,7 @@ function cancelAfterFirstResponse($stub)
$call->cancel();
hardAssert($call->getStatus()->code === Grpc\STATUS_CANCELLED,
- 'Call status was not CANCELLED');
+ 'Call status was not CANCELLED');
}
function timeoutOnSleepingServer($stub)
@@ -396,7 +405,7 @@ function timeoutOnSleepingServer($stub)
$response = $call->read();
hardAssert($call->getStatus()->code === Grpc\STATUS_DEADLINE_EXCEEDED,
- 'Call status was not DEADLINE_EXCEEDED');
+ 'Call status was not DEADLINE_EXCEEDED');
}
function customMetadata($stub)
@@ -425,9 +434,9 @@ function customMetadata($stub)
$initial_metadata = $call->getMetadata();
hardAssert(array_key_exists($ECHO_INITIAL_KEY, $initial_metadata),
'Initial metadata does not contain expected key');
- hardAssert($initial_metadata[$ECHO_INITIAL_KEY][0] ==
- $ECHO_INITIAL_VALUE,
- 'Incorrect initial metadata value');
+ hardAssert(
+ $initial_metadata[$ECHO_INITIAL_KEY][0] === $ECHO_INITIAL_VALUE,
+ 'Incorrect initial metadata value');
list($result, $status) = $call->wait();
hardAssertIfStatusOk($status);
@@ -435,8 +444,9 @@ function customMetadata($stub)
$trailing_metadata = $call->getTrailingMetadata();
hardAssert(array_key_exists($ECHO_TRAILING_KEY, $trailing_metadata),
'Trailing metadata does not contain expected key');
- hardAssert($trailing_metadata[$ECHO_TRAILING_KEY][0] ==
- $ECHO_TRAILING_VALUE, 'Incorrect trailing metadata value');
+ hardAssert(
+ $trailing_metadata[$ECHO_TRAILING_KEY][0] === $ECHO_TRAILING_VALUE,
+ 'Incorrect trailing metadata value');
$streaming_call = $stub->FullDuplexCall($metadata);
@@ -451,7 +461,7 @@ function customMetadata($stub)
hardAssert(array_key_exists($ECHO_TRAILING_KEY,
$streaming_trailing_metadata),
'Trailing metadata does not contain expected key');
- hardAssert($streaming_trailing_metadata[$ECHO_TRAILING_KEY][0] ==
+ hardAssert($streaming_trailing_metadata[$ECHO_TRAILING_KEY][0] ===
$ECHO_TRAILING_VALUE, 'Incorrect trailing metadata value');
}
@@ -506,7 +516,7 @@ function _makeStub($args)
throw new Exception('Missing argument: --test_case is required');
}
- if ($args['server_port'] == 443) {
+ if ($args['server_port'] === 443) {
$server_address = $args['server_host'];
} else {
$server_address = $args['server_host'].':'.$args['server_port'];
@@ -548,7 +558,7 @@ function _makeStub($args)
if (in_array($test_case, ['service_account_creds',
'compute_engine_creds', 'jwt_token_creds', ])) {
- if ($test_case == 'jwt_token_creds') {
+ if ($test_case === 'jwt_token_creds') {
$auth_credentials = ApplicationDefaultCredentials::getCredentials();
} else {
$auth_credentials = ApplicationDefaultCredentials::getCredentials(
@@ -558,7 +568,7 @@ function _makeStub($args)
$opts['update_metadata'] = $auth_credentials->getUpdateMetadataFunc();
}
- if ($test_case == 'oauth2_auth_token') {
+ if ($test_case === 'oauth2_auth_token') {
$auth_credentials = ApplicationDefaultCredentials::getCredentials(
$args['oauth_scope']
);
@@ -578,8 +588,9 @@ function _makeStub($args)
$opts['update_metadata'] = $update_metadata;
}
- if ($test_case == 'unimplemented_method') {
- $stub = new grpc\testing\UnimplementedServiceClient($server_address, $opts);
+ if ($test_case === 'unimplemented_method') {
+ $stub = new grpc\testing\UnimplementedServiceClient($server_address,
+ $opts);
} else {
$stub = new grpc\testing\TestServiceClient($server_address, $opts);
}
@@ -656,7 +667,8 @@ function interop_main($args, $stub = false)
return $stub;
}
-if (isset($_SERVER['PHP_SELF']) && preg_match('/interop_client/', $_SERVER['PHP_SELF'])) {
+if (isset($_SERVER['PHP_SELF']) &&
+ preg_match('/interop_client/', $_SERVER['PHP_SELF'])) {
$args = getopt('', ['server_host:', 'server_port:', 'test_case:',
'use_tls::', 'use_test_ca::',
'server_host_override:', 'oauth_scope:',
diff --git a/src/php/tests/unit_tests/CallCredentials2Test.php b/src/php/tests/unit_tests/CallCredentials2Test.php
index b3b98a22ca..fa31439c54 100644
--- a/src/php/tests/unit_tests/CallCredentials2Test.php
+++ b/src/php/tests/unit_tests/CallCredentials2Test.php
@@ -170,7 +170,7 @@ class CallCredentials2Test extends PHPUnit_Framework_TestCase
$this->assertTrue(is_string($context->service_url));
$this->assertTrue(is_string($context->method_name));
- return "a string";
+ return 'a string';
}
public function testCallbackWithInvalidReturnValue()
@@ -196,5 +196,4 @@ class CallCredentials2Test extends PHPUnit_Framework_TestCase
$this->assertTrue($event->send_close);
$this->assertTrue($event->status->code == Grpc\STATUS_UNAUTHENTICATED);
}
-
}
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index faf3ab5f0d..b9b662c032 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1189,7 +1189,7 @@ def insecure_channel(target, options=None):
A Channel to the target through which RPCs may be conducted.
"""
from grpc import _channel
- return _channel.Channel(target, options, None)
+ return _channel.Channel(target, () if options is None else options, None)
def secure_channel(target, credentials, options=None):
@@ -1205,10 +1205,11 @@ def secure_channel(target, credentials, options=None):
A Channel to the target through which RPCs may be conducted.
"""
from grpc import _channel
- return _channel.Channel(target, options, credentials._credentials)
+ return _channel.Channel(target, () if options is None else options,
+ credentials._credentials)
-def server(thread_pool, handlers=None):
+def server(thread_pool, handlers=None, options=None):
"""Creates a Server with which RPCs can be serviced.
Args:
@@ -1219,12 +1220,15 @@ def server(thread_pool, handlers=None):
only handlers the server will use to service RPCs; other handlers may
later be added by calling add_generic_rpc_handlers any time before the
returned Server is started.
+ options: A sequence of string-value pairs according to which to configure
+ the created server.
Returns:
A Server with which RPCs can be serviced.
"""
from grpc import _server
- return _server.Server(thread_pool, () if handlers is None else handlers)
+ return _server.Server(thread_pool, () if handlers is None else handlers,
+ () if options is None else options)
################################### __all__ #################################
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 3117dd1cb3..53a26727ab 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -842,18 +842,8 @@ def _unsubscribe(state, callback):
def _options(options):
- if options is None:
- pairs = ((cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),)
- else:
- pairs = list(options) + [
- (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT)]
- encoded_pairs = [
- (_common.encode(arg_name), arg_value) if isinstance(arg_value, int)
- else (_common.encode(arg_name), _common.encode(arg_value))
- for arg_name, arg_value in pairs]
- return cygrpc.ChannelArgs([
- cygrpc.ChannelArg(arg_name, arg_value)
- for arg_name, arg_value in encoded_pairs])
+ return list(options) + [
+ (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT)]
class Channel(grpc.Channel):
@@ -867,7 +857,8 @@ class Channel(grpc.Channel):
credentials: A cygrpc.ChannelCredentials or None.
"""
self._channel = cygrpc.Channel(
- _common.encode(target), _options(options), credentials)
+ _common.encode(target), _common.channel_args(_options(options)),
+ credentials)
self._call_state = _ChannelCallState(self._channel)
self._connectivity_state = _ChannelConnectivityState(self._channel)
diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py
index 4d7d521419..cc0984c8c6 100644
--- a/src/python/grpcio/grpc/_common.py
+++ b/src/python/grpcio/grpc/_common.py
@@ -94,6 +94,16 @@ def decode(b):
return b.decode('latin1')
+def channel_args(options):
+ channel_args = []
+ for key, value in options:
+ if isinstance(value, six.string_types):
+ channel_args.append(cygrpc.ChannelArg(encode(key), encode(value)))
+ else:
+ channel_args.append(cygrpc.ChannelArg(encode(key), value))
+ return cygrpc.ChannelArgs(channel_args)
+
+
def cygrpc_metadata(application_metadata):
return _EMPTY_METADATA if application_metadata is None else cygrpc.Metadata(
cygrpc.Metadatum(encode(key), encode(value))
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 3df937eb14..73d1ff7b97 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -32,15 +32,16 @@ cimport cpython
cdef class Channel:
- def __cinit__(self, bytes target, ChannelArgs arguments=None,
+ def __cinit__(self, bytes target, ChannelArgs arguments,
ChannelCredentials channel_credentials=None):
grpc_init()
cdef grpc_channel_args *c_arguments = NULL
cdef char *c_target = NULL
self.c_channel = NULL
self.references = []
- if arguments is not None:
+ if len(arguments) > 0:
c_arguments = &arguments.c_args
+ self.references.append(arguments)
c_target = target
if channel_credentials is None:
with nogil:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index ca2b831114..18db38b686 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -34,12 +34,12 @@ import time
cdef class Server:
- def __cinit__(self, ChannelArgs arguments=None):
+ def __cinit__(self, ChannelArgs arguments):
grpc_init()
cdef grpc_channel_args *c_arguments = NULL
self.references = []
self.registered_completion_queues = []
- if arguments is not None:
+ if len(arguments) > 0:
c_arguments = &arguments.c_args
self.references.append(arguments)
with nogil:
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 94a13bfb2f..5f846ce773 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -728,12 +728,11 @@ def _start(state):
cleanup_server, target=_serve, args=(state,))
thread.start()
-
class Server(grpc.Server):
- def __init__(self, thread_pool, generic_handlers):
+ def __init__(self, thread_pool, generic_handlers, options):
completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server()
+ server = cygrpc.Server(_common.channel_args(options))
server.register_completion_queue(completion_queue)
self._state = _ServerState(
completion_queue, server, generic_handlers, thread_pool)
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 137177774b..ef9e85ccfb 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -268,6 +268,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/census/operation.c',
'src/core/ext/census/placeholders.c',
'src/core/ext/census/resource.c',
+ 'src/core/ext/census/trace_context.c',
'src/core/ext/census/tracing.c',
'src/core/plugin_registry/grpc_plugin_registry.c',
'src/boringssl/err_data.c',
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index dcaef0db1f..2071a33e13 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -7,6 +7,7 @@
"_beta_features_test.BetaFeaturesTest",
"_beta_features_test.ContextManagementAndLifecycleTest",
"_cancel_many_calls_test.CancelManyCallsTest",
+ "_channel_args_test.ChannelArgsTest",
"_channel_connectivity_test.ChannelConnectivityTest",
"_channel_ready_future_test.ChannelReadyFutureTest",
"_channel_test.ChannelTest",
diff --git a/src/python/grpcio_tests/tests/unit/_channel_args_test.py b/src/python/grpcio_tests/tests/unit/_channel_args_test.py
new file mode 100644
index 0000000000..6a636d7993
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_channel_args_test.py
@@ -0,0 +1,53 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of Channel Args on client/server side."""
+
+import unittest
+
+import grpc
+
+TEST_CHANNEL_ARGS = (
+ ('arg1', b'bytes_val'),
+ ('arg2', 'str_val'),
+ ('arg3', 1),
+ (b'arg4', 'str_val'),
+)
+
+
+class ChannelArgsTest(unittest.TestCase):
+
+ def test_client(self):
+ grpc.insecure_channel('localhost:8080', options=TEST_CHANNEL_ARGS)
+
+ def test_server(self):
+ grpc.server(None, options=TEST_CHANNEL_ARGS)
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
index 9cae96a00d..1324ad37b6 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
@@ -78,7 +78,7 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
callback = _Callback()
- channel = _channel.Channel('localhost:12345', None, None)
+ channel = _channel.Channel('localhost:12345', (), None)
channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
channel.subscribe(callback.update, try_to_connect=True)
@@ -105,13 +105,13 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = _server.Server(thread_pool, ())
+ server = _server.Server(thread_pool, (), ())
port = server.add_insecure_port('[::]:0')
server.start()
first_callback = _Callback()
second_callback = _Callback()
- channel = _channel.Channel('localhost:{}'.format(port), None, None)
+ channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@@ -146,12 +146,12 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_reachable_then_unreachable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = _server.Server(thread_pool, ())
+ server = _server.Server(thread_pool, (), ())
port = server.add_insecure_port('[::]:0')
server.start()
callback = _Callback()
- channel = _channel.Channel('localhost:{}'.format(port), None, None)
+ channel = _channel.Channel('localhost:{}'.format(port), (), None)
channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(_ready_in_connectivities)
# Now take down the server and confirm that channel readiness is repudiated.
diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
index 24f5b45b18..60d478bcd9 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
@@ -79,7 +79,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = _server.Server(thread_pool, ())
+ server = _server.Server(thread_pool, (), ())
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index cf212c5653..20115fb22c 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -157,11 +157,12 @@ class CancelManyCallsTest(unittest.TestCase):
server_thread_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
server_completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode())
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(),
+ cygrpc.ChannelArgs([]))
state = _State()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index 152d8edde3..2ae5285232 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -124,11 +124,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
def testReadSomeButNotAllResponses(self):
server_completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
- channel = cygrpc.Channel('localhost:{}'.format(port).encode())
+ channel = cygrpc.Channel('localhost:{}'.format(port).encode(),
+ cygrpc.ChannelArgs([]))
server_shutdown_tag = 'server_shutdown_tag'
server_driver = _ServerDriver(server_completion_queue, server_shutdown_tag)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 142387d810..8dedebfabe 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -121,7 +121,7 @@ class TypeSmokeTest(unittest.TestCase):
del call_credentials
def testServerStartNoExplicitShutdown(self):
- server = cygrpc.Server()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
completion_queue = cygrpc.CompletionQueue()
server.register_completion_queue(completion_queue)
port = server.add_http2_port(b'[::]:0')
@@ -131,7 +131,7 @@ class TypeSmokeTest(unittest.TestCase):
def testServerStartShutdown(self):
completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
server.add_http2_port(b'[::]:0')
server.register_completion_queue(completion_queue)
server.start()
@@ -148,7 +148,7 @@ class ServerClientMixin(object):
def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue()
- self.server = cygrpc.Server()
+ self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
self.server.register_completion_queue(self.server_completion_queue)
if server_credentials:
self.port = self.server.add_http2_port(b'[::]:0', server_credentials)
@@ -164,7 +164,8 @@ class ServerClientMixin(object):
'localhost:{}'.format(self.port).encode(), client_channel_arguments,
client_credentials)
else:
- self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port).encode())
+ self.client_channel = cygrpc.Channel(
+ 'localhost:{}'.format(self.port).encode(), cygrpc.ChannelArgs([]))
if host_override:
self.host_argument = None # default host
self.expected_host = host_override