diff options
author | 2016-03-23 14:57:52 -0700 | |
---|---|---|
committer | 2016-03-23 14:57:52 -0700 | |
commit | 995bc78441cbd58fe026644265c204d2a2a7551b (patch) | |
tree | dfa81e4a49ab5b77c0313c9acc6b09a260aafb43 /src | |
parent | e5b19fe84f0c4b473872632553b3cf5b21e82e60 (diff) | |
parent | 634e67617b3b445185b5485f493b591b53b35d9d (diff) |
Merge github.com:grpc/grpc into fuzzy-bits
Diffstat (limited to 'src')
75 files changed, 1117 insertions, 1000 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 97295bed45..206a6e1fe5 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -83,6 +83,28 @@ grpc::string FilenameIdentifier(const grpc::string &filename) { } } // namespace +template<class T, size_t N> +T *array_end(T (&array)[N]) { return array + N; } + +void PrintIncludes(grpc::protobuf::io::Printer *printer, const std::vector<grpc::string>& headers, const Parameters ¶ms) { + std::map<grpc::string, grpc::string> vars; + + vars["l"] = params.use_system_headers ? '<' : '"'; + vars["r"] = params.use_system_headers ? '>' : '"'; + + if (!params.grpc_search_path.empty()) { + vars["l"] += params.grpc_search_path; + if (params.grpc_search_path.back() != '/') { + vars["l"] += '/'; + } + } + + for (auto i = headers.begin(); i != headers.end(); i++) { + vars["h"] = *i; + printer->Print(vars, "#include $l$$h$$r$\n"); + } +} + grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; @@ -111,36 +133,46 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { - grpc::string temp = - "#include <grpc++/impl/codegen/async_stream.h>\n" - "#include <grpc++/impl/codegen/async_unary_call.h>\n" - "#include <grpc++/impl/codegen/proto_utils.h>\n" - "#include <grpc++/impl/codegen/rpc_method.h>\n" - "#include <grpc++/impl/codegen/service_type.h>\n" - "#include <grpc++/impl/codegen/status.h>\n" - "#include <grpc++/impl/codegen/stub_options.h>\n" - "#include <grpc++/impl/codegen/sync_stream.h>\n" - "\n" - "namespace grpc {\n" - "class CompletionQueue;\n" - "class RpcService;\n" - "class ServerCompletionQueue;\n" - "class ServerContext;\n" - "} // namespace grpc\n\n"; + grpc::string output; + { + // Scope the output stream so it closes and finalizes output to the string. + grpc::protobuf::io::StringOutputStream output_stream(&output); + grpc::protobuf::io::Printer printer(&output_stream, '$'); + std::map<grpc::string, grpc::string> vars; - if (!file->package().empty()) { - std::vector<grpc::string> parts = - grpc_generator::tokenize(file->package(), "."); + static const char *headers_strs[] = { + "grpc++/impl/codegen/async_stream.h", + "grpc++/impl/codegen/async_unary_call.h", + "grpc++/impl/codegen/proto_utils.h", + "grpc++/impl/codegen/rpc_method.h", + "grpc++/impl/codegen/service_type.h", + "grpc++/impl/codegen/status.h", + "grpc++/impl/codegen/stub_options.h", + "grpc++/impl/codegen/sync_stream.h" + }; + std::vector<grpc::string> headers(headers_strs, array_end(headers_strs)); + PrintIncludes(&printer, headers, params); + printer.Print(vars, "\n"); + printer.Print(vars, "namespace grpc {\n"); + printer.Print(vars, "class CompletionQueue;\n"); + printer.Print(vars, "class Channel;\n"); + printer.Print(vars, "class RpcService;\n"); + printer.Print(vars, "class ServerCompletionQueue;\n"); + printer.Print(vars, "class ServerContext;\n"); + printer.Print(vars, "} // namespace grpc\n\n"); - for (auto part = parts.begin(); part != parts.end(); part++) { - temp.append("namespace "); - temp.append(*part); - temp.append(" {\n"); + if (!file->package().empty()) { + std::vector<grpc::string> parts = + grpc_generator::tokenize(file->package(), "."); + + for (auto part = parts.begin(); part != parts.end(); part++) { + vars["part"] = *part; + printer.Print(vars, "namespace $part$ {\n"); + } + printer.Print(vars, "\n"); } - temp.append("\n"); } - - return temp; + return output; } void PrintHeaderClientMethodInterfaces( @@ -852,7 +884,7 @@ grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file, } grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, - const Parameters ¶m) { + const Parameters ¶ms) { grpc::string output; { // Scope the output stream so it closes and finalizes output to the string. @@ -860,16 +892,18 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file, grpc::protobuf::io::Printer printer(&output_stream, '$'); std::map<grpc::string, grpc::string> vars; - printer.Print(vars, "#include <grpc++/impl/codegen/async_stream.h>\n"); - printer.Print(vars, "#include <grpc++/impl/codegen/async_unary_call.h>\n"); - printer.Print(vars, "#include <grpc++/impl/codegen/channel_interface.h>\n"); - printer.Print(vars, "#include <grpc++/impl/codegen/client_unary_call.h>\n"); - printer.Print(vars, - "#include <grpc++/impl/codegen/method_handler_impl.h>\n"); - printer.Print(vars, - "#include <grpc++/impl/codegen/rpc_service_method.h>\n"); - printer.Print(vars, "#include <grpc++/impl/codegen/service_type.h>\n"); - printer.Print(vars, "#include <grpc++/impl/codegen/sync_stream.h>\n"); + static const char *headers_strs[] = { + "grpc++/impl/codegen/async_stream.h", + "grpc++/impl/codegen/async_unary_call.h", + "grpc++/impl/codegen/channel_interface.h", + "grpc++/impl/codegen/client_unary_call.h", + "grpc++/impl/codegen/method_handler_impl.h", + "grpc++/impl/codegen/rpc_service_method.h", + "grpc++/impl/codegen/service_type.h", + "grpc++/impl/codegen/sync_stream.h" + }; + std::vector<grpc::string> headers(headers_strs, array_end(headers_strs)); + PrintIncludes(&printer, headers, params); if (!file->package().empty()) { std::vector<grpc::string> parts = diff --git a/src/compiler/cpp_generator.h b/src/compiler/cpp_generator.h index 70c2e985f6..4f9de9d11a 100644 --- a/src/compiler/cpp_generator.h +++ b/src/compiler/cpp_generator.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -42,6 +42,10 @@ namespace grpc_cpp_generator { struct Parameters { // Puts the service into a namespace grpc::string services_namespace; + // Use system includes (<>) or local includes ("") + bool use_system_headers; + // Prefix to any grpc include + grpc::string grpc_search_path; }; // Return the prologue of the generated header file. diff --git a/src/compiler/cpp_plugin.cc b/src/compiler/cpp_plugin.cc index 88c704948e..d8ada4835c 100644 --- a/src/compiler/cpp_plugin.cc +++ b/src/compiler/cpp_plugin.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -59,6 +59,7 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { } grpc_cpp_generator::Parameters generator_parameters; + generator_parameters.use_system_headers = true; if (!parameter.empty()) { std::vector<grpc::string> parameters_list = @@ -70,6 +71,17 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { grpc_generator::tokenize(*parameter_string, "="); if (param[0] == "services_namespace") { generator_parameters.services_namespace = param[1]; + } else if (param[0] == "use_system_headers") { + if (param[1] == "true") { + generator_parameters.use_system_headers = true; + } else if (param[1] == "false") { + generator_parameters.use_system_headers = false; + } else { + *error = grpc::string("Invalid parameter: ") + *parameter_string; + return false; + } + } else if (param[0] == "grpc_search_path") { + generator_parameters.grpc_search_path = param[1]; } else { *error = grpc::string("Unknown parameter: ") + *parameter_string; return false; diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 4fd5dfbecb..5a2ecce1d4 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -190,11 +190,10 @@ bool PrintBetaServicer(const ServiceDescriptor* service, "Documentation", doc, }); out->Print("\n"); - out->Print(dict, "class Beta$Service$Servicer(object):\n"); + out->Print(dict, "class Beta$Service$Servicer(six.with_metaclass(abc.ABCMeta, object)):\n"); { IndentScope raii_class_indent(out); out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); - out->Print("__metaclass__ = abc.ABCMeta\n"); for (int i = 0; i < service->method_count(); ++i) { auto meth = service->method(i); grpc::string arg_name = meth->client_streaming() ? @@ -219,11 +218,10 @@ bool PrintBetaStub(const ServiceDescriptor* service, "Documentation", doc, }); out->Print("\n"); - out->Print(dict, "class Beta$Service$Stub(object):\n"); + out->Print(dict, "class Beta$Service$Stub(six.with_metaclass(abc.ABCMeta, object)):\n"); { IndentScope raii_class_indent(out); out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); - out->Print("__metaclass__ = abc.ABCMeta\n"); for (int i = 0; i < service->method_count(); ++i) { const MethodDescriptor* meth = service->method(i); grpc::string arg_name = meth->client_streaming() ? @@ -449,6 +447,7 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name, bool PrintPreamble(const FileDescriptor* file, const GeneratorConfiguration& config, Printer* out) { out->Print("import abc\n"); + out->Print("import six\n"); out->Print("from $Package$ import implementations as beta_implementations\n", "Package", config.beta_package_root); out->Print("from grpc.framework.common import cardinality\n"); diff --git a/src/core/census/grpc_plugin.c b/src/core/census/grpc_plugin.c index 3be2a48eb8..8d60a5197e 100644 --- a/src/core/census/grpc_plugin.c +++ b/src/core/census/grpc_plugin.c @@ -63,8 +63,6 @@ void census_grpc_plugin_init(void) { } grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, maybe_add_census_filter, NULL); - grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX, - maybe_add_census_filter, NULL); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, maybe_add_census_filter, NULL); } diff --git a/src/core/channel/channel_stack_builder.c b/src/core/channel/channel_stack_builder.c index 80e2e393f9..1b1004e5f9 100644 --- a/src/core/channel/channel_stack_builder.c +++ b/src/core/channel/channel_stack_builder.c @@ -216,7 +216,6 @@ void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, // count the number of filters size_t num_filters = 0; for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { - gpr_log(GPR_DEBUG, "%d: %s", num_filters, p->filter->name); num_filters++; } diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c deleted file mode 100644 index d32327206e..0000000000 --- a/src/core/channel/client_uchannel.c +++ /dev/null @@ -1,233 +0,0 @@ -/* - * - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/client_uchannel.h" - -#include <string.h> - -#include "src/core/census/grpc_filter.h" -#include "src/core/channel/channel_args.h" -#include "src/core/channel/client_channel.h" -#include "src/core/channel/compress_filter.h" -#include "src/core/channel/subchannel_call_holder.h" -#include "src/core/iomgr/iomgr.h" -#include "src/core/support/string.h" -#include "src/core/surface/channel.h" -#include "src/core/transport/connectivity_state.h" - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> - -/** Microchannel (uchannel) implementation: a lightweight channel without any - * load-balancing mechanisms meant for communication from within the core. */ - -typedef struct client_uchannel_channel_data { - /** master channel - the grpc_channel instance that ultimately owns - this channel_data via its channel stack. - We occasionally use this to bump the refcount on the master channel - to keep ourselves alive through an asynchronous operation. */ - grpc_channel_stack *owning_stack; - - /** connectivity state being tracked */ - grpc_connectivity_state_tracker state_tracker; - - /** the subchannel wrapped by the microchannel */ - grpc_connected_subchannel *connected_subchannel; - - /** the callback used to stay subscribed to subchannel connectivity - * notifications */ - grpc_closure connectivity_cb; - - /** the current connectivity state of the wrapped subchannel */ - grpc_connectivity_state subchannel_connectivity; - - gpr_mu mu_state; -} channel_data; - -typedef grpc_subchannel_call_holder call_data; - -static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, - bool iomgr_success) { - channel_data *chand = arg; - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - chand->subchannel_connectivity, - "uchannel_monitor_subchannel"); - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, chand->connected_subchannel, NULL, - &chand->subchannel_connectivity, &chand->connectivity_cb); -} - -static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); -} - -static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); -} - -static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { - channel_data *chand = elem->channel_data; - - grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); - - GPR_ASSERT(op->set_accept_stream == false); - GPR_ASSERT(op->bind_pollset == NULL); - - if (op->on_connectivity_state_change != NULL) { - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change); - op->on_connectivity_state_change = NULL; - op->connectivity_state = NULL; - } - - if (op->disconnect) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - } -} - -static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, - grpc_metadata_batch *initial_metadata, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready) { - channel_data *chand = arg; - GPR_ASSERT(initial_metadata != NULL); - *connected_subchannel = chand->connected_subchannel; - return 1; -} - -/* Constructor for call_data */ -static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { - grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data, args->call_stack); -} - -/* Destructor for call_data */ -static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); -} - -/* Constructor for channel_data */ -static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); - grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); - GPR_ASSERT(args->is_last); - GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->owning_stack = args->channel_stack; - grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, - "client_uchannel"); - gpr_mu_init(&chand->mu_state); -} - -/* Destructor for channel_data */ -static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - /* cancel subscription */ - grpc_connected_subchannel_notify_on_state_change( - exec_ctx, chand->connected_subchannel, NULL, NULL, - &chand->connectivity_cb); - grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - gpr_mu_destroy(&chand->mu_state); - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel, - "uchannel"); -} - -static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_pollset *pollset) { - call_data *calld = elem->call_data; - calld->pollset = pollset; -} - -const grpc_channel_filter grpc_client_uchannel_filter = { - cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data), - cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem, - sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem, - cuc_get_peer, "client-uchannel", -}; - -grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { - channel_data *chand = elem->channel_data; - grpc_connectivity_state out; - gpr_mu_lock(&chand->mu_state); - out = grpc_connectivity_state_check(&chand->state_tracker); - gpr_mu_unlock(&chand->mu_state); - return out; -} - -void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete) { - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu_state); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, on_complete); - gpr_mu_unlock(&chand->mu_state); -} - -grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, - grpc_channel_args *args) { - grpc_channel *channel = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - channel = - grpc_channel_create(&exec_ctx, NULL, args, GRPC_CLIENT_UCHANNEL, NULL); - - return channel; -} - -void grpc_client_uchannel_set_connected_subchannel( - grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { - grpc_channel_element *elem = - grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); - channel_data *chand = elem->channel_data; - GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - gpr_mu_lock(&chand->mu_state); - chand->connected_subchannel = connected_subchannel; - GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel"); - gpr_mu_unlock(&chand->mu_state); -} diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h index 9086cdc882..84b4657db4 100644 --- a/src/core/channel/subchannel_call_holder.h +++ b/src/core/channel/subchannel_call_holder.h @@ -55,15 +55,14 @@ typedef enum { for initial metadata before trying to create a call object, and handling cancellation gracefully. - Both the channel and uchannel filter use this as their call_data. */ + The channel filter uses this as their call_data. */ typedef struct grpc_subchannel_call_holder { /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; /** Helper function to choose the subchannel on which to create the call object. Channel filter delegates to the load - balancing policy (once it's ready); uchannel returns - immediately */ + balancing policy (once it's ready). */ grpc_subchannel_call_holder_pick_subchannel pick_subchannel; void *pick_subchannel_arg; diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index e28e4757a1..2b2ee97e12 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -42,8 +42,14 @@ #include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/timer.h" +#include "src/core/support/backoff.h" #include "src/core/support/string.h" +#define BACKOFF_MULTIPLIER 1.6 +#define BACKOFF_JITTER 0.2 +#define BACKOFF_MIN_SECONDS 1 +#define BACKOFF_MAX_SECONDS 120 + typedef struct { /** base class: must be first */ grpc_resolver base; @@ -75,6 +81,8 @@ typedef struct { /** retry timer */ bool have_retry_timer; grpc_timer retry_timer; + /** retry backoff state */ + gpr_backoff backoff_state; } dns_resolver; static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); @@ -111,6 +119,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); if (!r->resolving) { + gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(r); } gpr_mu_unlock(&r->mu); @@ -125,6 +134,7 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, r->next_completion = on_complete; r->target_config = target_config; if (r->resolved_version == 0 && !r->resolving) { + gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(r); } else { dns_maybe_finish_next_locked(exec_ctx, r); @@ -185,17 +195,16 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); } else { - int retry_seconds = 15; - gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds", - retry_seconds); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); + gpr_timespec timeout = gpr_time_sub(next_try, now); + gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d.%09d seconds", + timeout.tv_sec, timeout.tv_nsec); GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; - gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); GRPC_RESOLVER_REF(&r->base, "retry-timer"); - grpc_timer_init( - exec_ctx, &r->retry_timer, - gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)), - dns_on_retry_timer, r, now); + grpc_timer_init(exec_ctx, &r->retry_timer, next_try, dns_on_retry_timer, r, + now); } if (r->resolved_config) { grpc_client_config_unref(exec_ctx, r->resolved_config); @@ -263,6 +272,8 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->subchannel_factory = args->subchannel_factory; + gpr_backoff_init(&r->backoff_state, BACKOFF_MULTIPLIER, BACKOFF_JITTER, + BACKOFF_MIN_SECONDS * 1000, BACKOFF_MAX_SECONDS * 1000); grpc_subchannel_factory_ref(r->subchannel_factory); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 68910ad975..3cb7d79b67 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -37,9 +37,6 @@ #include <stdio.h> #include <string.h> -#ifdef GPR_POSIX_SOCKET -#include <sys/un.h> -#endif #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -47,6 +44,7 @@ #include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/string.h" typedef struct { @@ -168,24 +166,6 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { gpr_free(r); } -#ifdef GPR_POSIX_SOCKET -static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { - struct sockaddr_un *un = (struct sockaddr_un *)addr; - - un->sun_family = AF_UNIX; - strcpy(un->sun_path, uri->path); - *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - - return 1; -} - -static char *unix_get_default_authority(grpc_resolver_factory *factory, - grpc_uri *uri) { - return gpr_strdup("localhost"); -} -#endif - static char *ip_get_default_authority(grpc_uri *uri) { const char *path = uri->path; if (path[0] == '/') ++path; @@ -371,21 +351,22 @@ static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} -#define DECL_FACTORY(name) \ +#define DECL_FACTORY(name, prefix) \ static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_resolver_args *args) { \ - return sockaddr_create(args, "pick_first", parse_##name); \ + return sockaddr_create(args, "pick_first", prefix##parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ - name##_factory_create_resolver, name##_get_default_authority, #name}; \ + name##_factory_create_resolver, prefix##name##_get_default_authority, \ + #name}; \ static grpc_resolver_factory name##_resolver_factory = { \ &name##_factory_vtable}; \ grpc_resolver_factory *grpc_##name##_resolver_factory_create() { \ return &name##_resolver_factory; \ } -#ifdef GPR_POSIX_SOCKET -DECL_FACTORY(unix) +#ifdef GPR_HAVE_UNIX_SOCKET +DECL_FACTORY(unix, grpc_) #endif -DECL_FACTORY(ipv4) DECL_FACTORY(ipv6) +DECL_FACTORY(ipv4, ) DECL_FACTORY(ipv6, ) diff --git a/src/core/httpcli/format_request.c b/src/core/http/format_request.c index 04f2a2d99a..60179297bf 100644 --- a/src/core/httpcli/format_request.c +++ b/src/core/http/format_request.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,7 +31,7 @@ * */ -#include "src/core/httpcli/format_request.h" +#include "src/core/http/format_request.h" #include <stdarg.h> #include <stdio.h> @@ -46,7 +46,7 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) { size_t i; - gpr_strvec_add(buf, gpr_strdup(request->path)); + gpr_strvec_add(buf, gpr_strdup(request->http.path)); gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n")); /* just in case some crazy server really expects HTTP/1.1 */ gpr_strvec_add(buf, gpr_strdup("Host: ")); @@ -56,10 +56,10 @@ static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec_add(buf, gpr_strdup("User-Agent: " GRPC_HTTPCLI_USER_AGENT "\r\n")); /* user supplied headers */ - for (i = 0; i < request->hdr_count; i++) { - gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key)); + for (i = 0; i < request->http.hdr_count; i++) { + gpr_strvec_add(buf, gpr_strdup(request->http.hdrs[i].key)); gpr_strvec_add(buf, gpr_strdup(": ")); - gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].value)); + gpr_strvec_add(buf, gpr_strdup(request->http.hdrs[i].value)); gpr_strvec_add(buf, gpr_strdup("\r\n")); } } @@ -94,8 +94,8 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, fill_common_header(request, &out); if (body_bytes) { uint8_t has_content_type = 0; - for (i = 0; i < request->hdr_count; i++) { - if (strcmp(request->hdrs[i].key, "Content-Type") == 0) { + for (i = 0; i < request->http.hdr_count; i++) { + if (strcmp(request->http.hdrs[i].key, "Content-Type") == 0) { has_content_type = 1; break; } diff --git a/src/core/httpcli/format_request.h b/src/core/http/format_request.h index eb47cc90ca..49593b695c 100644 --- a/src/core/httpcli/format_request.h +++ b/src/core/http/format_request.h @@ -31,10 +31,10 @@ * */ -#ifndef GRPC_CORE_HTTPCLI_FORMAT_REQUEST_H -#define GRPC_CORE_HTTPCLI_FORMAT_REQUEST_H +#ifndef GRPC_CORE_HTTP_FORMAT_REQUEST_H +#define GRPC_CORE_HTTP_FORMAT_REQUEST_H -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" #include <grpc/support/slice.h> gpr_slice grpc_httpcli_format_get_request(const grpc_httpcli_request *request); @@ -42,4 +42,4 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size); -#endif /* GRPC_CORE_HTTPCLI_FORMAT_REQUEST_H */ +#endif /* GRPC_CORE_HTTP_FORMAT_REQUEST_H */ diff --git a/src/core/httpcli/httpcli.c b/src/core/http/httpcli.c index 1219c444c7..1c0d3336ea 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/http/httpcli.c @@ -31,7 +31,7 @@ * */ -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" #include "src/core/iomgr/sockaddr.h" #include <string.h> @@ -40,8 +40,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include "src/core/httpcli/format_request.h" -#include "src/core/httpcli/parser.h" +#include "src/core/http/format_request.h" +#include "src/core/http/parser.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/resolve_address.h" @@ -50,7 +50,7 @@ typedef struct { gpr_slice request_text; - grpc_httpcli_parser parser; + grpc_http_parser parser; grpc_resolved_addresses *addresses; size_t next_address; grpc_endpoint *ep; @@ -99,8 +99,9 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, int success) { grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); - req->on_response(exec_ctx, req->user_data, success ? &req->parser.r : NULL); - grpc_httpcli_parser_destroy(&req->parser); + req->on_response(exec_ctx, req->user_data, + success ? &req->parser.http.response : NULL); + grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); } @@ -129,7 +130,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { for (i = 0; i < req->incoming.count; i++) { if (GPR_SLICE_LENGTH(req->incoming.slices[i])) { req->have_read_byte = 1; - if (!grpc_httpcli_parser_parse(&req->parser, req->incoming.slices[i])) { + if (!grpc_http_parser_parse(&req->parser, req->incoming.slices[i])) { finish(exec_ctx, req, 0); return; } @@ -141,7 +142,11 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { } else if (!req->have_read_byte) { next_address(exec_ctx, req); } else { - finish(exec_ctx, req, grpc_httpcli_parser_eof(&req->parser)); + int parse_success = grpc_http_parser_eof(&req->parser); + if (parse_success && (req->parser.type != GRPC_HTTP_RESPONSE)) { + parse_success = 0; + } + finish(exec_ctx, req, parse_success); } } @@ -223,7 +228,7 @@ static void internal_request_begin( internal_request *req = gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); req->request_text = request_text; - grpc_httpcli_parser_init(&req->parser); + grpc_http_parser_init(&req->parser); req->on_response = on_response; req->user_data = user_data; req->deadline = deadline; @@ -255,7 +260,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, g_get_override(exec_ctx, request, deadline, on_response, user_data)) { return; } - gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->path); + gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path); internal_request_begin(exec_ctx, context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_get_request(request)); @@ -274,7 +279,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context, on_response, user_data)) { return; } - gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->path); + gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path); internal_request_begin( exec_ctx, context, pollset, request, deadline, on_response, user_data, name, grpc_httpcli_format_post_request(request, body_bytes, body_size)); diff --git a/src/core/httpcli/httpcli.h b/src/core/http/httpcli.h index 1fe5782657..0bf4f2f445 100644 --- a/src/core/httpcli/httpcli.h +++ b/src/core/http/httpcli.h @@ -31,27 +31,20 @@ * */ -#ifndef GRPC_CORE_HTTPCLI_HTTPCLI_H -#define GRPC_CORE_HTTPCLI_HTTPCLI_H +#ifndef GRPC_CORE_HTTP_HTTPCLI_H +#define GRPC_CORE_HTTP_HTTPCLI_H #include <stddef.h> #include <grpc/support/time.h> +#include "src/core/http/parser.h" #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset_set.h" /* User agent this library reports */ #define GRPC_HTTPCLI_USER_AGENT "grpc-httpcli/0.0" -/* Maximum length of a header string of the form 'Key: Value\r\n' */ -#define GRPC_HTTPCLI_MAX_HEADER_LENGTH 4096 - -/* A single header to be passed in a request */ -typedef struct grpc_httpcli_header { - char *key; - char *value; -} grpc_httpcli_header; /* Tracks in-progress http requests TODO(ctiller): allow caching and capturing multiple requests for the @@ -77,33 +70,21 @@ typedef struct grpc_httpcli_request { char *host; /* The host to verify in the SSL handshake (or NULL) */ char *ssl_host_override; - /* The path of the resource to fetch */ - char *path; - /* Additional headers: count and key/values; the following are supplied - automatically and MUST NOT be set here: + /* The main part of the request + The following headers are supplied automatically and MUST NOT be set here: Host, Connection, User-Agent */ - size_t hdr_count; - grpc_httpcli_header *hdrs; + grpc_http_request http; /* handshaker to use ssl for the request */ const grpc_httpcli_handshaker *handshaker; } grpc_httpcli_request; -/* A response */ -typedef struct grpc_httpcli_response { - /* HTTP status code */ - int status; - /* Headers: count and key/values */ - size_t hdr_count; - grpc_httpcli_header *hdrs; - /* Body: length and contents; contents are NOT null-terminated */ - size_t body_length; - char *body; -} grpc_httpcli_response; +/* Expose the parser response type as a httpcli response too */ +typedef struct grpc_http_response grpc_httpcli_response; /* Callback for grpc_httpcli_get and grpc_httpcli_post. */ typedef void (*grpc_httpcli_response_cb)(grpc_exec_ctx *exec_ctx, void *user_data, - const grpc_httpcli_response *response); + const grpc_http_response *response); void grpc_httpcli_context_init(grpc_httpcli_context *context); void grpc_httpcli_context_destroy(grpc_httpcli_context *context); @@ -160,4 +141,4 @@ typedef int (*grpc_httpcli_post_override)( void grpc_httpcli_set_override(grpc_httpcli_get_override get, grpc_httpcli_post_override post); -#endif /* GRPC_CORE_HTTPCLI_HTTPCLI_H */ +#endif /* GRPC_CORE_HTTP_HTTPCLI_H */ diff --git a/src/core/httpcli/httpcli_security_connector.c b/src/core/http/httpcli_security_connector.c index 156961a377..ce82701089 100644 --- a/src/core/httpcli/httpcli_security_connector.c +++ b/src/core/http/httpcli_security_connector.c @@ -31,7 +31,7 @@ * */ -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" #include <string.h> diff --git a/src/core/http/parser.c b/src/core/http/parser.c new file mode 100644 index 0000000000..ebec8a5157 --- /dev/null +++ b/src/core/http/parser.c @@ -0,0 +1,313 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/http/parser.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +static char *buf2str(void *buffer, size_t length) { + char *out = gpr_malloc(length + 1); + memcpy(out, buffer, length); + out[length] = 0; + return out; +} + +static int handle_response_line(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + + if (cur == end || *cur++ != 'H') goto error; + if (cur == end || *cur++ != 'T') goto error; + if (cur == end || *cur++ != 'T') goto error; + if (cur == end || *cur++ != 'P') goto error; + if (cur == end || *cur++ != '/') goto error; + if (cur == end || *cur++ != '1') goto error; + if (cur == end || *cur++ != '.') goto error; + if (cur == end || *cur < '0' || *cur++ > '1') goto error; + if (cur == end || *cur++ != ' ') goto error; + if (cur == end || *cur < '1' || *cur++ > '9') goto error; + if (cur == end || *cur < '0' || *cur++ > '9') goto error; + if (cur == end || *cur < '0' || *cur++ > '9') goto error; + parser->http.response.status = + (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0'); + if (cur == end || *cur++ != ' ') goto error; + + /* we don't really care about the status code message */ + + return 1; + +error: + gpr_log(GPR_ERROR, "Failed parsing response line"); + return 0; +} + +static int handle_request_line(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + uint8_t vers_major = 0; + uint8_t vers_minor = 0; + + while (cur != end && *cur++ != ' ') + ; + if (cur == end) goto error; + parser->http.request.method = buf2str(beg, (size_t)(cur - beg - 1)); + + beg = cur; + while (cur != end && *cur++ != ' ') + ; + if (cur == end) goto error; + parser->http.request.path = buf2str(beg, (size_t)(cur - beg - 1)); + + if (cur == end || *cur++ != 'H') goto error; + if (cur == end || *cur++ != 'T') goto error; + if (cur == end || *cur++ != 'T') goto error; + if (cur == end || *cur++ != 'P') goto error; + if (cur == end || *cur++ != '/') goto error; + vers_major = (uint8_t)(*cur++ - '1' + 1); + ++cur; + if (cur == end) goto error; + vers_minor = (uint8_t)(*cur++ - '1' + 1); + + if (vers_major == 1) { + if (vers_minor == 0) { + parser->http.request.version = GRPC_HTTP_HTTP10; + } else if (vers_minor == 1) { + parser->http.request.version = GRPC_HTTP_HTTP11; + } else { + goto error; + } + } else if (vers_major == 2) { + if (vers_minor == 0) { + parser->http.request.version = GRPC_HTTP_HTTP20; + } else { + goto error; + } + } else { + goto error; + } + + return 1; + +error: + gpr_log(GPR_ERROR, "Failed parsing request line"); + return 0; +} + +static int handle_first_line(grpc_http_parser *parser) { + if (parser->cur_line[0] == 'H') { + parser->type = GRPC_HTTP_RESPONSE; + return handle_response_line(parser); + } else { + parser->type = GRPC_HTTP_REQUEST; + return handle_request_line(parser); + } +} + +static int add_header(grpc_http_parser *parser) { + uint8_t *beg = parser->cur_line; + uint8_t *cur = beg; + uint8_t *end = beg + parser->cur_line_length; + size_t *hdr_count = NULL; + grpc_http_header **hdrs = NULL; + grpc_http_header hdr = {NULL, NULL}; + + GPR_ASSERT(cur != end); + + if (*cur == ' ' || *cur == '\t') { + gpr_log(GPR_ERROR, "Continued header lines not supported yet"); + goto error; + } + + while (cur != end && *cur != ':') { + cur++; + } + if (cur == end) { + gpr_log(GPR_ERROR, "Didn't find ':' in header string"); + goto error; + } + GPR_ASSERT(cur >= beg); + hdr.key = buf2str(beg, (size_t)(cur - beg)); + cur++; /* skip : */ + + while (cur != end && (*cur == ' ' || *cur == '\t')) { + cur++; + } + GPR_ASSERT(end - cur >= 2); + hdr.value = buf2str(cur, (size_t)(end - cur) - 2); + + if (parser->type == GRPC_HTTP_RESPONSE) { + hdr_count = &parser->http.response.hdr_count; + hdrs = &parser->http.response.hdrs; + } else if (parser->type == GRPC_HTTP_REQUEST) { + hdr_count = &parser->http.request.hdr_count; + hdrs = &parser->http.request.hdrs; + } else { + return 0; + } + + if (*hdr_count == parser->hdr_capacity) { + parser->hdr_capacity = + GPR_MAX(parser->hdr_capacity + 1, parser->hdr_capacity * 3 / 2); + *hdrs = gpr_realloc(*hdrs, parser->hdr_capacity * sizeof(**hdrs)); + } + (*hdrs)[(*hdr_count)++] = hdr; + return 1; + +error: + gpr_free(hdr.key); + gpr_free(hdr.value); + return 0; +} + +static int finish_line(grpc_http_parser *parser) { + switch (parser->state) { + case GRPC_HTTP_FIRST_LINE: + if (!handle_first_line(parser)) { + return 0; + } + parser->state = GRPC_HTTP_HEADERS; + break; + case GRPC_HTTP_HEADERS: + if (parser->cur_line_length == 2) { + parser->state = GRPC_HTTP_BODY; + break; + } + if (!add_header(parser)) { + return 0; + } + break; + case GRPC_HTTP_BODY: + GPR_UNREACHABLE_CODE(return 0); + } + + parser->cur_line_length = 0; + return 1; +} + +static int addbyte_body(grpc_http_parser *parser, uint8_t byte) { + size_t *body_length = NULL; + char **body = NULL; + + if (parser->type == GRPC_HTTP_RESPONSE) { + body_length = &parser->http.response.body_length; + body = &parser->http.response.body; + } else if (parser->type == GRPC_HTTP_REQUEST) { + body_length = &parser->http.request.body_length; + body = &parser->http.request.body; + } else { + return 0; + } + + if (*body_length == parser->body_capacity) { + parser->body_capacity = GPR_MAX(8, parser->body_capacity * 3 / 2); + *body = gpr_realloc((void *)*body, parser->body_capacity); + } + (*body)[*body_length] = (char)byte; + (*body_length)++; + + return 1; +} + +static int addbyte(grpc_http_parser *parser, uint8_t byte) { + switch (parser->state) { + case GRPC_HTTP_FIRST_LINE: + case GRPC_HTTP_HEADERS: + if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) { + gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", + GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); + return 0; + } + parser->cur_line[parser->cur_line_length] = byte; + parser->cur_line_length++; + if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\r' && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + return finish_line(parser); + } else { + return 1; + } + GPR_UNREACHABLE_CODE(return 0); + case GRPC_HTTP_BODY: + return addbyte_body(parser, byte); + } + GPR_UNREACHABLE_CODE(return 0); +} + +void grpc_http_parser_init(grpc_http_parser *parser) { + memset(parser, 0, sizeof(*parser)); + parser->state = GRPC_HTTP_FIRST_LINE; + parser->type = GRPC_HTTP_UNKNOWN; +} + +void grpc_http_parser_destroy(grpc_http_parser *parser) { + size_t i; + if (parser->type == GRPC_HTTP_RESPONSE) { + gpr_free(parser->http.response.body); + for (i = 0; i < parser->http.response.hdr_count; i++) { + gpr_free(parser->http.response.hdrs[i].key); + gpr_free(parser->http.response.hdrs[i].value); + } + gpr_free(parser->http.response.hdrs); + } else if (parser->type == GRPC_HTTP_REQUEST) { + gpr_free(parser->http.request.body); + for (i = 0; i < parser->http.request.hdr_count; i++) { + gpr_free(parser->http.request.hdrs[i].key); + gpr_free(parser->http.request.hdrs[i].value); + } + gpr_free(parser->http.request.hdrs); + gpr_free(parser->http.request.method); + gpr_free(parser->http.request.path); + } +} + +int grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice) { + size_t i; + + for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) { + if (!addbyte(parser, GPR_SLICE_START_PTR(slice)[i])) { + return 0; + } + } + + return 1; +} + +int grpc_http_parser_eof(grpc_http_parser *parser) { + return parser->state == GRPC_HTTP_BODY; +} diff --git a/src/core/http/parser.h b/src/core/http/parser.h new file mode 100644 index 0000000000..39517e485a --- /dev/null +++ b/src/core/http/parser.h @@ -0,0 +1,116 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_HTTP_PARSER_H +#define GRPC_CORE_HTTP_PARSER_H + +#include <grpc/support/port_platform.h> +#include <grpc/support/slice.h> + +/* Maximum length of a header string of the form 'Key: Value\r\n' */ +#define GRPC_HTTP_PARSER_MAX_HEADER_LENGTH 4096 + +/* A single header to be passed in a request */ +typedef struct grpc_http_header { + char *key; + char *value; +} grpc_http_header; + +typedef enum { + GRPC_HTTP_FIRST_LINE, + GRPC_HTTP_HEADERS, + GRPC_HTTP_BODY +} grpc_http_parser_state; + +typedef enum { + GRPC_HTTP_HTTP10, + GRPC_HTTP_HTTP11, + GRPC_HTTP_HTTP20, +} grpc_http_version; + +typedef enum { + GRPC_HTTP_RESPONSE, + GRPC_HTTP_REQUEST, + GRPC_HTTP_UNKNOWN +} grpc_http_type; + +/* A request */ +typedef struct grpc_http_request { + /* Method of the request (e.g. GET, POST) */ + char *method; + /* The path of the resource to fetch */ + char *path; + /* HTTP version to use */ + grpc_http_version version; + /* Headers attached to the request */ + size_t hdr_count; + grpc_http_header *hdrs; + /* Body: length and contents; contents are NOT null-terminated */ + size_t body_length; + char *body; +} grpc_http_request; + +/* A response */ +typedef struct grpc_http_response { + /* HTTP status code */ + int status; + /* Headers: count and key/values */ + size_t hdr_count; + grpc_http_header *hdrs; + /* Body: length and contents; contents are NOT null-terminated */ + size_t body_length; + char *body; +} grpc_http_response; + +typedef struct { + grpc_http_parser_state state; + grpc_http_type type; + + union { + grpc_http_response response; + grpc_http_request request; + } http; + size_t body_capacity; + size_t hdr_capacity; + + uint8_t cur_line[GRPC_HTTP_PARSER_MAX_HEADER_LENGTH]; + size_t cur_line_length; +} grpc_http_parser; + +void grpc_http_parser_init(grpc_http_parser *parser); +void grpc_http_parser_destroy(grpc_http_parser *parser); + +int grpc_http_parser_parse(grpc_http_parser *parser, gpr_slice slice); +int grpc_http_parser_eof(grpc_http_parser *parser); + +#endif /* GRPC_CORE_HTTP_PARSER_H */ diff --git a/src/core/httpcli/parser.c b/src/core/httpcli/parser.c deleted file mode 100644 index eb059e7804..0000000000 --- a/src/core/httpcli/parser.c +++ /dev/null @@ -1,219 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/httpcli/parser.h" - -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/useful.h> - -extern int grpc_http_trace; - -static int handle_response_line(grpc_httpcli_parser *parser) { - uint8_t *beg = parser->cur_line; - uint8_t *cur = beg; - uint8_t *end = beg + parser->cur_line_length; - - if (cur == end || *cur++ != 'H') goto error; - if (cur == end || *cur++ != 'T') goto error; - if (cur == end || *cur++ != 'T') goto error; - if (cur == end || *cur++ != 'P') goto error; - if (cur == end || *cur++ != '/') goto error; - if (cur == end || *cur++ != '1') goto error; - if (cur == end || *cur++ != '.') goto error; - if (cur == end || *cur < '0' || *cur++ > '1') goto error; - if (cur == end || *cur++ != ' ') goto error; - if (cur == end || *cur < '1' || *cur++ > '9') goto error; - if (cur == end || *cur < '0' || *cur++ > '9') goto error; - if (cur == end || *cur < '0' || *cur++ > '9') goto error; - parser->r.status = - (cur[-3] - '0') * 100 + (cur[-2] - '0') * 10 + (cur[-1] - '0'); - if (cur == end || *cur++ != ' ') goto error; - - /* we don't really care about the status code message */ - - return 1; - -error: - if (grpc_http_trace) { - gpr_log(GPR_ERROR, "Failed parsing response line"); - } - return 0; -} - -static char *buf2str(void *buffer, size_t length) { - char *out = gpr_malloc(length + 1); - memcpy(out, buffer, length); - out[length] = 0; - return out; -} - -static int add_header(grpc_httpcli_parser *parser) { - uint8_t *beg = parser->cur_line; - uint8_t *cur = beg; - uint8_t *end = beg + parser->cur_line_length; - grpc_httpcli_header hdr = {NULL, NULL}; - - GPR_ASSERT(cur != end); - - if (*cur == ' ' || *cur == '\t') { - if (grpc_http_trace) { - gpr_log(GPR_ERROR, "Continued header lines not supported yet"); - } - goto error; - } - - while (cur != end && *cur != ':') { - cur++; - } - if (cur == end) { - if (grpc_http_trace) { - gpr_log(GPR_ERROR, "Didn't find ':' in header string"); - } - goto error; - } - GPR_ASSERT(cur >= beg); - hdr.key = buf2str(beg, (size_t)(cur - beg)); - cur++; /* skip : */ - - while (cur != end && (*cur == ' ' || *cur == '\t')) { - cur++; - } - GPR_ASSERT(end - cur >= 2); - hdr.value = buf2str(cur, (size_t)(end - cur) - 2); - - if (parser->r.hdr_count == parser->hdr_capacity) { - parser->hdr_capacity = - GPR_MAX(parser->hdr_capacity + 1, parser->hdr_capacity * 3 / 2); - parser->r.hdrs = gpr_realloc( - parser->r.hdrs, parser->hdr_capacity * sizeof(*parser->r.hdrs)); - } - parser->r.hdrs[parser->r.hdr_count++] = hdr; - return 1; - -error: - gpr_free(hdr.key); - gpr_free(hdr.value); - return 0; -} - -static int finish_line(grpc_httpcli_parser *parser) { - switch (parser->state) { - case GRPC_HTTPCLI_INITIAL_RESPONSE: - if (!handle_response_line(parser)) { - return 0; - } - parser->state = GRPC_HTTPCLI_HEADERS; - break; - case GRPC_HTTPCLI_HEADERS: - if (parser->cur_line_length == 2) { - parser->state = GRPC_HTTPCLI_BODY; - break; - } - if (!add_header(parser)) { - return 0; - } - break; - case GRPC_HTTPCLI_BODY: - GPR_UNREACHABLE_CODE(return 0); - } - - parser->cur_line_length = 0; - return 1; -} - -static int addbyte(grpc_httpcli_parser *parser, uint8_t byte) { - switch (parser->state) { - case GRPC_HTTPCLI_INITIAL_RESPONSE: - case GRPC_HTTPCLI_HEADERS: - if (parser->cur_line_length >= GRPC_HTTPCLI_MAX_HEADER_LENGTH) { - gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", - GRPC_HTTPCLI_MAX_HEADER_LENGTH); - return 0; - } - parser->cur_line[parser->cur_line_length] = byte; - parser->cur_line_length++; - if (parser->cur_line_length >= 2 && - parser->cur_line[parser->cur_line_length - 2] == '\r' && - parser->cur_line[parser->cur_line_length - 1] == '\n') { - return finish_line(parser); - } else { - return 1; - } - GPR_UNREACHABLE_CODE(return 0); - case GRPC_HTTPCLI_BODY: - if (parser->r.body_length == parser->body_capacity) { - parser->body_capacity = GPR_MAX(8, parser->body_capacity * 3 / 2); - parser->r.body = - gpr_realloc((void *)parser->r.body, parser->body_capacity); - } - parser->r.body[parser->r.body_length] = (char)byte; - parser->r.body_length++; - return 1; - } - GPR_UNREACHABLE_CODE(return 0); -} - -void grpc_httpcli_parser_init(grpc_httpcli_parser *parser) { - memset(parser, 0, sizeof(*parser)); - parser->state = GRPC_HTTPCLI_INITIAL_RESPONSE; - parser->r.status = 500; -} - -void grpc_httpcli_parser_destroy(grpc_httpcli_parser *parser) { - size_t i; - gpr_free(parser->r.body); - for (i = 0; i < parser->r.hdr_count; i++) { - gpr_free(parser->r.hdrs[i].key); - gpr_free(parser->r.hdrs[i].value); - } - gpr_free(parser->r.hdrs); -} - -int grpc_httpcli_parser_parse(grpc_httpcli_parser *parser, gpr_slice slice) { - size_t i; - - for (i = 0; i < GPR_SLICE_LENGTH(slice); i++) { - if (!addbyte(parser, GPR_SLICE_START_PTR(slice)[i])) { - return 0; - } - } - - return 1; -} - -int grpc_httpcli_parser_eof(grpc_httpcli_parser *parser) { - return parser->state == GRPC_HTTPCLI_BODY; -} diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index 56f6f146fd..f84b8441df 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,6 +37,7 @@ #include "src/core/iomgr/endpoint_pair.h" #include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include <errno.h> #include <fcntl.h> @@ -52,7 +53,7 @@ static void create_sockets(int sv[2]) { int flags; - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + grpc_create_socketpair_if_unix(sv); flags = fcntl(sv[0], F_GETFL, 0); GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); flags = fcntl(sv[1], F_GETFL, 0); diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 3edafa0b07..b4d038a3a1 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -72,9 +72,6 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { - // Note that this function must be called after a release store (or - // full-barrier operation) on refst so that prior actions on the fd are - // ordered before the fd becomes visible to the freelist gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -95,6 +92,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->mu); } + gpr_mu_lock(&r->mu); r->shutdown = 0; r->read_closure = CLOSURE_NOT_READY; r->write_closure = CLOSURE_NOT_READY; @@ -106,11 +104,9 @@ static grpc_fd *alloc_fd(int fd) { r->on_done_closure = NULL; r->closed = 0; r->released = 0; - // The last operation on r before returning it should be a release-store - // so that all the above fields are globally visible before the value of - // r could escape to another thread. Our refcount itself needs a release-store - // so use this gpr_atm_rel_store(&r->refst, 1); + gpr_mu_unlock(&r->mu); + return r; } diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index a6c9893f23..26b3aa8189 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -39,7 +39,6 @@ #include <string.h> #include <sys/types.h> -#include <sys/un.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -51,6 +50,7 @@ #include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/block_annotate.h" #include "src/core/support/string.h" @@ -71,18 +71,10 @@ static grpc_resolved_addresses *blocking_resolve_address_impl( int s; size_t i; grpc_resolved_addresses *addrs = NULL; - struct sockaddr_un *un; if (name[0] == 'u' && name[1] == 'n' && name[2] == 'i' && name[3] == 'x' && name[4] == ':' && name[5] != 0) { - addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); - addrs->naddrs = 1; - addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address)); - un = (struct sockaddr_un *)addrs->addrs->addr; - un->sun_family = AF_UNIX; - strcpy(un->sun_path, name + 5); - addrs->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - return addrs; + return grpc_resolve_unix_domain_address(name + 5); } /* parse name, splitting it into host and port parts */ diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 61006d7a7a..a3c3a874c1 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,16 +36,13 @@ #include <errno.h> #include <string.h> -#ifdef GPR_POSIX_SOCKET -#include <sys/un.h> -#endif - #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/string.h" static const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, @@ -191,14 +188,9 @@ char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { gpr_asprintf(&result, "ipv6:%s", temp); gpr_free(temp); return result; -#ifdef GPR_POSIX_SOCKET - case AF_UNIX: - gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); - return result; -#endif + default: + return grpc_sockaddr_to_uri_unix_if_possible(addr); } - - return NULL; } int grpc_sockaddr_get_port(const struct sockaddr *addr) { @@ -207,9 +199,10 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { return ntohs(((struct sockaddr_in *)addr)->sin_port); case AF_INET6: return ntohs(((struct sockaddr_in6 *)addr)->sin6_port); - case AF_UNIX: - return 1; default: + if (grpc_is_unix_socket(addr)) { + return 1; + } gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family); return 0; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 15727856ab..1d3f9b6555 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -54,6 +54,7 @@ #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" #include "src/core/iomgr/timer.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/string.h" extern int grpc_tcp_trace; @@ -77,13 +78,12 @@ static int prepare_socket(const struct sockaddr *addr, int fd) { } if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (addr->sa_family != AF_UNIX && !grpc_set_socket_low_latency(fd, 1)) || + (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) || !grpc_set_socket_no_sigpipe_if_possible(fd)) { gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, strerror(errno)); goto error; } - return 1; error: diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5e07f8261c..03dfddd925 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -52,7 +52,6 @@ #include <sys/socket.h> #include <sys/stat.h> #include <sys/types.h> -#include <sys/un.h> #include <unistd.h> #include "src/core/iomgr/pollset_posix.h" @@ -60,6 +59,7 @@ #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -81,7 +81,6 @@ struct grpc_tcp_listener { union { uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE]; struct sockaddr sockaddr; - struct sockaddr_un un; } addr; size_t addr_len; int port; @@ -98,14 +97,6 @@ struct grpc_tcp_listener { int is_sibling; }; -static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { - struct stat st; - - if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { - unlink(un->sun_path); - } -} - /* the overall server */ struct grpc_tcp_server { gpr_refcount refs; @@ -203,9 +194,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->head) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - if (sp->addr.sockaddr.sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(&sp->addr.un); - } + grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, @@ -281,7 +270,7 @@ static int prepare_socket(int fd, const struct sockaddr *addr, } if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (addr->sa_family != AF_UNIX && (!grpc_set_socket_low_latency(fd, 1) || + (!grpc_is_unix_socket(addr) && (!grpc_set_socket_low_latency(fd, 1) || !grpc_set_socket_reuse_addr(fd, 1))) || !grpc_set_socket_no_sigpipe_if_possible(fd)) { gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, @@ -451,9 +440,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, if (s->tail != NULL) { port_index = s->tail->port_index + 1; } - if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(addr); - } + grpc_unlink_if_unix_domain_socket((struct sockaddr *)addr); /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index ef548cfe4d..efedd9f32e 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -52,7 +52,6 @@ #include <sys/socket.h> #include <sys/stat.h> #include <sys/types.h> -#include <sys/un.h> #include <unistd.h> #include "src/core/iomgr/fd_posix.h" @@ -60,6 +59,7 @@ #include "src/core/iomgr/resolve_address.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/unix_sockets_posix.h" #include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -77,7 +77,6 @@ typedef struct { union { uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE]; struct sockaddr sockaddr; - struct sockaddr_un un; } addr; size_t addr_len; grpc_closure read_closure; @@ -85,14 +84,6 @@ typedef struct { grpc_udp_server_read_cb read_cb; } server_port; -static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { - struct stat st; - - if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { - unlink(un->sun_path); - } -} - /* the overall server */ struct grpc_udp_server { gpr_mu mu; @@ -176,9 +167,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->nports) { for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - if (sp->addr.sockaddr.sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(&sp->addr.un); - } + grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, @@ -336,9 +325,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, socklen_t sockname_len; int port; - if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(addr); - } + grpc_unlink_if_unix_domain_socket((struct sockaddr *)addr); /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 1e59a92392..148c04fa9b 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -37,15 +37,16 @@ #include "src/core/iomgr/endpoint.h" #include "src/core/iomgr/fd_posix.h" -/* Forward decl of grpc_server */ -typedef struct grpc_server grpc_server; +/* Forward decl of struct grpc_server */ +/* This is not typedef'ed to avoid a typedef-redefinition error */ +struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, - grpc_server *server); + struct grpc_server *server); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); @@ -53,7 +54,7 @@ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, - grpc_server *server); + struct grpc_server *server); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); diff --git a/src/core/iomgr/unix_sockets_posix.c b/src/core/iomgr/unix_sockets_posix.c new file mode 100644 index 0000000000..480ff613f6 --- /dev/null +++ b/src/core/iomgr/unix_sockets_posix.c @@ -0,0 +1,103 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/unix_sockets_posix.h" + +#ifdef GPR_HAVE_UNIX_SOCKET + +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/un.h> + +#include <grpc/support/alloc.h> + +void grpc_create_socketpair_if_unix(int sv[2]) { + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); +} + +grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) { + struct sockaddr_un *un; + + grpc_resolved_addresses *addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + addrs->naddrs = 1; + addrs->addrs = gpr_malloc(sizeof(grpc_resolved_address)); + un = (struct sockaddr_un *)addrs->addrs->addr; + un->sun_family = AF_UNIX; + strcpy(un->sun_path, name); + addrs->addrs->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + return addrs; +} + +int grpc_is_unix_socket(const struct sockaddr *addr) { + return addr->sa_family == AF_UNIX; +} + +void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) { + if (addr->sa_family != AF_UNIX) { + return; + } + struct sockaddr_un *un = (struct sockaddr_un *)addr; + struct stat st; + + if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { + unlink(un->sun_path); + } +} + +int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { + struct sockaddr_un *un = (struct sockaddr_un *)addr; + + un->sun_family = AF_UNIX; + strcpy(un->sun_path, uri->path); + *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + + return 1; +} + +char *grpc_unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return gpr_strdup("localhost"); +} + +char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) { + if (addr->sa_family != AF_UNIX) { + return NULL; + } + + char *result; + gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); + return result; +} + +#endif diff --git a/src/core/channel/client_uchannel.h b/src/core/iomgr/unix_sockets_posix.h index 8bb288e7d4..e842ba3770 100644 --- a/src/core/channel/client_uchannel.h +++ b/src/core/iomgr/unix_sockets_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,30 +31,31 @@ * */ -#ifndef GRPC_CORE_CHANNEL_CLIENT_UCHANNEL_H -#define GRPC_CORE_CHANNEL_CLIENT_UCHANNEL_H +#ifndef GRPC_CORE_IOMGR_UNIX_SOCKETS_POSIX_H +#define GRPC_CORE_IOMGR_UNIX_SOCKETS_POSIX_H -#include "src/core/channel/channel_stack.h" -#include "src/core/client_config/resolver.h" +#include <grpc/support/port_platform.h> -#define GRPC_MICROCHANNEL_SUBCHANNEL_ARG "grpc.microchannel_subchannel_key" +#include <grpc/support/string_util.h> -/* A client microchannel (aka uchannel) is a channel wrapping a subchannel, for - * the purposes of lightweight RPC communications from within the core.*/ +#include "src/core/client_config/resolver_factory.h" +#include "src/core/client_config/uri_parser.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" -extern const grpc_channel_filter grpc_client_uchannel_filter; +void grpc_create_socketpair_if_unix(int sv[2]); -grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); +grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name); -void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete); +int grpc_is_unix_socket(const struct sockaddr *addr); -grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, - grpc_channel_args *args); +void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr); -void grpc_client_uchannel_set_connected_subchannel( - grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel); +int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); -#endif /* GRPC_CORE_CHANNEL_CLIENT_UCHANNEL_H */ +char *grpc_unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri); + +char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr); + +#endif /* GRPC_CORE_IOMGR_UNIX_SOCKETS_POSIX_H */ diff --git a/src/core/httpcli/parser.h b/src/core/iomgr/unix_sockets_posix_noop.c index cd4a737245..045467bea4 100644 --- a/src/core/httpcli/parser.h +++ b/src/core/iomgr/unix_sockets_posix_noop.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,34 +31,31 @@ * */ -#ifndef GRPC_CORE_HTTPCLI_PARSER_H -#define GRPC_CORE_HTTPCLI_PARSER_H +#include "src/core/iomgr/unix_sockets_posix.h" -#include "src/core/httpcli/httpcli.h" -#include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> +#ifndef GPR_HAVE_UNIX_SOCKET -typedef enum { - GRPC_HTTPCLI_INITIAL_RESPONSE, - GRPC_HTTPCLI_HEADERS, - GRPC_HTTPCLI_BODY -} grpc_httpcli_parser_state; +void grpc_create_socketpair_if_unix(int sv[2]) {} -typedef struct { - grpc_httpcli_parser_state state; +grpc_resolved_addresses *grpc_resolve_unix_domain_address(const char *name) { + return NULL; +} - grpc_httpcli_response r; - size_t body_capacity; - size_t hdr_capacity; +int grpc_is_unix_socket(const struct sockaddr *addr) { return false; } - uint8_t cur_line[GRPC_HTTPCLI_MAX_HEADER_LENGTH]; - size_t cur_line_length; -} grpc_httpcli_parser; +void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {} -void grpc_httpcli_parser_init(grpc_httpcli_parser* parser); -void grpc_httpcli_parser_destroy(grpc_httpcli_parser* parser); +int grpc_parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { + return 0; +} -int grpc_httpcli_parser_parse(grpc_httpcli_parser* parser, gpr_slice slice); -int grpc_httpcli_parser_eof(grpc_httpcli_parser* parser); +char *grpc_unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return NULL; +} -#endif /* GRPC_CORE_HTTPCLI_PARSER_H */ +char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) { + return NULL; +} + +#endif diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index 80de181d9d..dd2fd1f057 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,13 +41,18 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/log.h> +#include "src/core/iomgr/socket_utils_posix.h" + static void pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ - GPR_ASSERT(0 == pipe(pipefd)); + int r = pipe(pipefd); + if (0 != r) { + gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); + abort(); + } GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); fd_info->read_fd = pipefd[0]; diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index b4fa616fa7..0ba6d5dd84 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -38,7 +38,8 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/parser.h" +#include "src/core/http/httpcli.h" #include "src/core/iomgr/executor.h" #include "src/core/json/json.h" #include "src/core/support/string.h" @@ -539,7 +540,7 @@ static void oauth2_token_fetcher_destruct(grpc_call_credentials *creds) { grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( - const grpc_httpcli_response *response, grpc_credentials_md_store **token_md, + const grpc_http_response *response, grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime) { char *null_terminated_body = NULL; char *new_access_token = NULL; @@ -629,7 +630,7 @@ end: static void on_oauth2_token_fetcher_http_response( grpc_exec_ctx *exec_ctx, void *user_data, - const grpc_httpcli_response *response) { + const grpc_http_response *response) { grpc_credentials_metadata_request *r = (grpc_credentials_metadata_request *)user_data; grpc_oauth2_token_fetcher_credentials *c = @@ -706,13 +707,13 @@ static void compute_engine_fetch_oauth2( grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req, grpc_httpcli_context *httpcli_context, grpc_pollset *pollset, grpc_httpcli_response_cb response_cb, gpr_timespec deadline) { - grpc_httpcli_header header = {"Metadata-Flavor", "Google"}; + grpc_http_header header = {"Metadata-Flavor", "Google"}; grpc_httpcli_request request; memset(&request, 0, sizeof(grpc_httpcli_request)); request.host = GRPC_COMPUTE_ENGINE_METADATA_HOST; - request.path = GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH; - request.hdr_count = 1; - request.hdrs = &header; + request.http.path = GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH; + request.http.hdr_count = 1; + request.http.hdrs = &header; grpc_httpcli_get(exec_ctx, httpcli_context, pollset, &request, deadline, response_cb, metadata_req); } @@ -747,8 +748,8 @@ static void refresh_token_fetch_oauth2( grpc_httpcli_response_cb response_cb, gpr_timespec deadline) { grpc_google_refresh_token_credentials *c = (grpc_google_refresh_token_credentials *)metadata_req->creds; - grpc_httpcli_header header = {"Content-Type", - "application/x-www-form-urlencoded"}; + grpc_http_header header = {"Content-Type", + "application/x-www-form-urlencoded"}; grpc_httpcli_request request; char *body = NULL; gpr_asprintf(&body, GRPC_REFRESH_TOKEN_POST_BODY_FORMAT_STRING, @@ -756,9 +757,9 @@ static void refresh_token_fetch_oauth2( c->refresh_token.refresh_token); memset(&request, 0, sizeof(grpc_httpcli_request)); request.host = GRPC_GOOGLE_OAUTH2_SERVICE_HOST; - request.path = GRPC_GOOGLE_OAUTH2_SERVICE_TOKEN_PATH; - request.hdr_count = 1; - request.hdrs = &header; + request.http.path = GRPC_GOOGLE_OAUTH2_SERVICE_TOKEN_PATH; + request.http.hdr_count = 1; + request.http.hdrs = &header; request.handshaker = &grpc_httpcli_ssl; grpc_httpcli_post(exec_ctx, httpcli_context, pollset, &request, body, strlen(body), deadline, response_cb, metadata_req); diff --git a/src/core/security/credentials.h b/src/core/security/credentials.h index 133aa9d8d9..afac7a5bf2 100644 --- a/src/core/security/credentials.h +++ b/src/core/security/credentials.h @@ -39,11 +39,12 @@ #include <grpc/grpc_security.h> #include <grpc/support/sync.h> -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" +#include "src/core/http/parser.h" #include "src/core/security/json_token.h" #include "src/core/security/security_connector.h" -struct grpc_httpcli_response; +struct grpc_http_response; /* --- Constants. --- */ @@ -207,7 +208,7 @@ grpc_call_credentials *grpc_credentials_contains_type( /* Exposed for testing only. */ grpc_credentials_status grpc_oauth2_token_fetcher_credentials_parse_server_response( - const struct grpc_httpcli_response *response, + const struct grpc_http_response *response, grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime); void grpc_flush_cached_google_default_credentials(void); diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index 1f4f3e4aa5..3872e86993 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -39,7 +39,8 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" +#include "src/core/http/parser.h" #include "src/core/support/env.h" #include "src/core/support/load_file.h" #include "src/core/surface/api_trace.h" @@ -66,14 +67,14 @@ typedef struct { static void on_compute_engine_detection_http_response( grpc_exec_ctx *exec_ctx, void *user_data, - const grpc_httpcli_response *response) { + const grpc_http_response *response) { compute_engine_detector *detector = (compute_engine_detector *)user_data; if (response != NULL && response->status == 200 && response->hdr_count > 0) { /* Internet providers can return a generic response to all requests, so it is necessary to check that metadata header is present also. */ size_t i; for (i = 0; i < response->hdr_count; i++) { - grpc_httpcli_header *header = &response->hdrs[i]; + grpc_http_header *header = &response->hdrs[i]; if (strcmp(header->key, "Metadata-Flavor") == 0 && strcmp(header->value, "Google") == 0) { detector->success = 1; @@ -109,7 +110,7 @@ static int is_stack_running_on_compute_engine(void) { memset(&request, 0, sizeof(grpc_httpcli_request)); request.host = GRPC_COMPUTE_ENGINE_DETECTION_HOST; - request.path = "/"; + request.http.path = "/"; grpc_httpcli_context_init(&context); diff --git a/src/core/security/jwt_verifier.c b/src/core/security/jwt_verifier.c index 928c6c148d..0bb8e05306 100644 --- a/src/core/security/jwt_verifier.c +++ b/src/core/security/jwt_verifier.c @@ -36,7 +36,7 @@ #include <limits.h> #include <string.h> -#include "src/core/httpcli/httpcli.h" +#include "src/core/http/httpcli.h" #include "src/core/security/b64.h" #include "src/core/tsi/ssl_types.h" @@ -635,11 +635,11 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, jwks_uri += 8; req.handshaker = &grpc_httpcli_ssl; req.host = gpr_strdup(jwks_uri); - req.path = strchr(jwks_uri, '/'); - if (req.path == NULL) { - req.path = ""; + req.http.path = strchr(jwks_uri, '/'); + if (req.http.path == NULL) { + req.http.path = ""; } else { - *(req.host + (req.path - jwks_uri)) = '\0'; + *(req.host + (req.http.path - jwks_uri)) = '\0'; } grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, ctx->pollset, &req, @@ -725,20 +725,20 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, req.host = gpr_strdup(mapping->key_url_prefix); path_prefix = strchr(req.host, '/'); if (path_prefix == NULL) { - gpr_asprintf(&req.path, "/%s", iss); + gpr_asprintf(&req.http.path, "/%s", iss); } else { *(path_prefix++) = '\0'; - gpr_asprintf(&req.path, "/%s/%s", path_prefix, iss); + gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss); } http_cb = on_keys_retrieved; } else { req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss); path_prefix = strchr(req.host, '/'); if (path_prefix == NULL) { - req.path = gpr_strdup(GRPC_OPENID_CONFIG_URL_SUFFIX); + req.http.path = gpr_strdup(GRPC_OPENID_CONFIG_URL_SUFFIX); } else { *(path_prefix++) = 0; - gpr_asprintf(&req.path, "/%s%s", path_prefix, + gpr_asprintf(&req.http.path, "/%s%s", path_prefix, GRPC_OPENID_CONFIG_URL_SUFFIX); } http_cb = on_openid_config_retrieved; @@ -749,7 +749,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), http_cb, ctx); gpr_free(req.host); - gpr_free(req.path); + gpr_free(req.http.path); return; error: diff --git a/src/core/support/backoff.c b/src/core/support/backoff.c index 7458219645..4ccfb774ed 100644 --- a/src/core/support/backoff.c +++ b/src/core/support/backoff.c @@ -69,3 +69,8 @@ gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { return gpr_time_add( now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); } + +void gpr_backoff_reset(gpr_backoff *backoff) { + // forces step() to return a timeout of min_timeout_millis + backoff->current_timeout_millis = 0; +} diff --git a/src/core/support/backoff.h b/src/core/support/backoff.h index f7730fde2a..0f933c3149 100644 --- a/src/core/support/backoff.h +++ b/src/core/support/backoff.h @@ -61,5 +61,8 @@ void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now); /// Step a retry loop: returns a timespec for the NEXT retry gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now); +/// Reset the backoff, so the next gpr_backoff_step will be a gpr_backoff_begin +/// instead +void gpr_backoff_reset(gpr_backoff *backoff); #endif /* GRPC_CORE_SUPPORT_BACKOFF_H */ diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c index 2dd4fce26b..18267939ed 100644 --- a/src/core/surface/channel_connectivity.c +++ b/src/core/surface/channel_connectivity.c @@ -37,7 +37,6 @@ #include <grpc/support/log.h> #include "src/core/channel/client_channel.h" -#include "src/core/channel/client_uchannel.h" #include "src/core/iomgr/timer.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/completion_queue.h" @@ -58,12 +57,6 @@ grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_exec_ctx_finish(&exec_ctx); return state; } - if (client_channel_elem->filter == &grpc_client_uchannel_filter) { - state = grpc_client_uchannel_check_connectivity_state( - &exec_ctx, client_channel_elem, try_to_connect); - grpc_exec_ctx_finish(&exec_ctx); - return state; - } gpr_log(GPR_ERROR, "grpc_channel_check_connectivity_state called on something that is " "not a (u)client channel, but '%s'", @@ -98,9 +91,6 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { if (client_channel_elem->filter == &grpc_client_channel_filter) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, "watch_channel_connectivity"); - } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel, - "watch_uchannel_connectivity"); } else { abort(); } @@ -209,11 +199,8 @@ void grpc_channel_watch_connectivity_state( grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, &w->on_complete); - } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) { - GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity"); - grpc_client_uchannel_watch_connectivity_state( - &exec_ctx, client_channel_elem, grpc_cq_pollset(cq), &w->state, - &w->on_complete); + } else { + abort(); } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/surface/channel_init.c b/src/core/surface/channel_init.c index 538be84696..ac962f3972 100644 --- a/src/core/surface/channel_init.c +++ b/src/core/surface/channel_init.c @@ -112,8 +112,6 @@ static const char *name_for_type(grpc_channel_stack_type type) { return "CLIENT_SUBCHANNEL"; case GRPC_SERVER_CHANNEL: return "SERVER_CHANNEL"; - case GRPC_CLIENT_UCHANNEL: - return "CLIENT_UCHANNEL"; case GRPC_CLIENT_LAME_CHANNEL: return "CLIENT_LAME_CHANNEL"; case GRPC_CLIENT_DIRECT_CHANNEL: diff --git a/src/core/surface/channel_stack_type.c b/src/core/surface/channel_stack_type.c index 6fd33d411d..29bb7704f8 100644 --- a/src/core/surface/channel_stack_type.c +++ b/src/core/surface/channel_stack_type.c @@ -39,8 +39,6 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) { switch (type) { case GRPC_CLIENT_CHANNEL: return true; - case GRPC_CLIENT_UCHANNEL: - return true; case GRPC_CLIENT_SUBCHANNEL: return true; case GRPC_CLIENT_LAME_CHANNEL: diff --git a/src/core/surface/channel_stack_type.h b/src/core/surface/channel_stack_type.h index 846391a68a..75a1b9c072 100644 --- a/src/core/surface/channel_stack_type.h +++ b/src/core/surface/channel_stack_type.h @@ -39,9 +39,6 @@ typedef enum { // normal top-half client channel with load-balancing, connection management GRPC_CLIENT_CHANNEL, - // abbreviated top-half client channel bound to one subchannel - for internal - // load balancing implementation - GRPC_CLIENT_UCHANNEL, // bottom-half of a client channel: everything that happens post-load // balancing (bound to a specific transport) GRPC_CLIENT_SUBCHANNEL, diff --git a/src/core/surface/init.c b/src/core/surface/init.c index b50770959f..2ce50a0d82 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -45,7 +45,6 @@ #include "src/core/channel/compress_filter.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/client_channel.h" -#include "src/core/channel/client_uchannel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_server_filter.h" #include "src/core/client_config/lb_policy_registry.h" @@ -112,9 +111,6 @@ static void register_builtin_channel_init() { grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX, prepend_filter, (void *)&grpc_compress_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX, - prepend_filter, - (void *)&grpc_compress_filter); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter, (void *)&grpc_compress_filter); grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX, @@ -134,8 +130,6 @@ static void register_builtin_channel_init() { grpc_add_connected_filter, NULL); grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter, (void *)&grpc_client_channel_filter); - grpc_channel_init_register_stage(GRPC_CLIENT_UCHANNEL, INT_MAX, append_filter, - (void *)&grpc_client_uchannel_filter); grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL, INT_MAX, append_filter, (void *)&grpc_lame_filter); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter, diff --git a/src/core/transport/chttp2/timeout_encoding.c b/src/core/transport/chttp2/timeout_encoding.c index a6f7081d21..c4802e050e 100644 --- a/src/core/transport/chttp2/timeout_encoding.c +++ b/src/core/transport/chttp2/timeout_encoding.c @@ -150,7 +150,7 @@ int grpc_chttp2_decode_timeout(const char *buffer, gpr_timespec *timeout) { /* spec allows max. 8 digits, but we allow values up to 1,000,000,000 */ if (x >= (100 * 1000 * 1000)) { if (x != (100 * 1000 * 1000) || digit != 0) { - *timeout = gpr_inf_future(GPR_CLOCK_REALTIME); + *timeout = gpr_inf_future(GPR_TIMESPAN); return 1; } } diff --git a/src/core/transport/static_metadata.c b/src/core/transport/static_metadata.c index eeedae0619..84abb59e99 100644 --- a/src/core/transport/static_metadata.c +++ b/src/core/transport/static_metadata.c @@ -50,7 +50,7 @@ grpc_mdstr grpc_static_mdstr_table[GRPC_STATIC_MDSTR_COUNT]; grpc_mdelem grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT]; uintptr_t grpc_static_mdelem_user_data[GRPC_STATIC_MDELEM_COUNT] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 3, 7, 5, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 4, 8, 6, 2, 4, 8, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; diff --git a/src/node/ext/call_credentials.cc b/src/node/ext/call_credentials.cc index 98696db232..bd2d146bbc 100644 --- a/src/node/ext/call_credentials.cc +++ b/src/node/ext/call_credentials.cc @@ -35,6 +35,8 @@ #include <nan.h> #include <uv.h> +#include <list> + #include "grpc/grpc.h" #include "grpc/grpc_security.h" #include "grpc/support/log.h" @@ -161,6 +163,15 @@ NAN_METHOD(CallCredentials::CreateFromPlugin) { grpc_metadata_credentials_plugin plugin; plugin_state *state = new plugin_state; state->callback = new Nan::Callback(info[0].As<Function>()); + state->pending_callbacks = new std::list<plugin_callback_data*>(); + uv_mutex_init(&state->plugin_mutex); + uv_async_init(uv_default_loop(), + &state->plugin_async, + SendPluginCallback); + uv_unref((uv_handle_t*)&state->plugin_async); + + state->plugin_async.data = state; + plugin.get_metadata = plugin_get_metadata; plugin.destroy = plugin_destroy_state; plugin.state = reinterpret_cast<void*>(state); @@ -208,48 +219,60 @@ NAN_METHOD(PluginCallback) { NAUV_WORK_CB(SendPluginCallback) { Nan::HandleScope scope; - plugin_callback_data *data = reinterpret_cast<plugin_callback_data*>( - async->data); - // Attach cb and user_data to plugin_callback so that it can access them later - v8::Local<v8::Function> plugin_callback = Nan::GetFunction( - Nan::New<v8::FunctionTemplate>(PluginCallback)).ToLocalChecked(); - Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(), - Nan::New<v8::External>(reinterpret_cast<void*>(data->cb))); - Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(), - Nan::New<v8::External>(data->user_data)); - const int argc = 2; - v8::Local<v8::Value> argv[argc] = { - Nan::New(data->service_url).ToLocalChecked(), - plugin_callback - }; - Nan::Callback *callback = data->state->callback; - callback->Call(argc, argv); - delete data; - uv_unref((uv_handle_t *)async); - uv_close((uv_handle_t *)async, (uv_close_cb)free); + plugin_state *state = reinterpret_cast<plugin_state*>(async->data); + std::list<plugin_callback_data*> callbacks; + uv_mutex_lock(&state->plugin_mutex); + callbacks.splice(callbacks.begin(), *state->pending_callbacks); + uv_mutex_unlock(&state->plugin_mutex); + while (!callbacks.empty()) { + plugin_callback_data *data = callbacks.front(); + callbacks.pop_front(); + // Attach cb and user_data to plugin_callback so that it can access them later + v8::Local<v8::Function> plugin_callback = Nan::GetFunction( + Nan::New<v8::FunctionTemplate>(PluginCallback)).ToLocalChecked(); + Nan::Set(plugin_callback, Nan::New("cb").ToLocalChecked(), + Nan::New<v8::External>(reinterpret_cast<void*>(data->cb))); + Nan::Set(plugin_callback, Nan::New("user_data").ToLocalChecked(), + Nan::New<v8::External>(data->user_data)); + const int argc = 2; + v8::Local<v8::Value> argv[argc] = { + Nan::New(data->service_url).ToLocalChecked(), + plugin_callback + }; + Nan::Callback *callback = state->callback; + callback->Call(argc, argv); + delete data; + } } void plugin_get_metadata(void *state, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void *user_data) { - uv_async_t *async = static_cast<uv_async_t*>(malloc(sizeof(uv_async_t))); - uv_async_init(uv_default_loop(), - async, - SendPluginCallback); + plugin_state *p_state = reinterpret_cast<plugin_state*>(state); plugin_callback_data *data = new plugin_callback_data; - data->state = reinterpret_cast<plugin_state*>(state); data->service_url = context.service_url; data->cb = cb; data->user_data = user_data; - async->data = data; - /* libuv says that it will coalesce calls to uv_async_send. If there is ever a - * problem with a callback not getting called, that is probably the reason */ - uv_async_send(async); + + uv_mutex_lock(&p_state->plugin_mutex); + p_state->pending_callbacks->push_back(data); + uv_mutex_unlock(&p_state->plugin_mutex); + + uv_async_send(&p_state->plugin_async); +} + +void plugin_uv_close_cb(uv_handle_t *handle) { + uv_async_t *async = reinterpret_cast<uv_async_t*>(handle); + plugin_state *state = reinterpret_cast<plugin_state *>(async->data); + uv_mutex_destroy(&state->plugin_mutex); + delete state->pending_callbacks; + delete state->callback; + delete state; } void plugin_destroy_state(void *ptr) { plugin_state *state = reinterpret_cast<plugin_state *>(ptr); - delete state->callback; + uv_close((uv_handle_t*)&state->plugin_async, plugin_uv_close_cb); } } // namespace node diff --git a/src/node/ext/call_credentials.h b/src/node/ext/call_credentials.h index a9bfe30f94..1f35595f3d 100644 --- a/src/node/ext/call_credentials.h +++ b/src/node/ext/call_credentials.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,8 +34,11 @@ #ifndef GRPC_NODE_CALL_CREDENTIALS_H_ #define GRPC_NODE_CALL_CREDENTIALS_H_ +#include <list> + #include <node.h> #include <nan.h> +#include <uv.h> #include "grpc/grpc_security.h" namespace grpc { @@ -73,17 +76,20 @@ class CallCredentials : public Nan::ObjectWrap { /* Auth metadata plugin functionality */ -typedef struct plugin_state { - Nan::Callback *callback; -} plugin_state; - typedef struct plugin_callback_data { - plugin_state *state; const char *service_url; grpc_credentials_plugin_metadata_cb cb; void *user_data; } plugin_callback_data; +typedef struct plugin_state { + Nan::Callback *callback; + std::list<plugin_callback_data*> *pending_callbacks; + uv_mutex_t plugin_mutex; + // async.data == this + uv_async_t plugin_async; +} plugin_state; + void plugin_get_metadata(void *state, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void *user_data); diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js index 1d73723cc0..97c4bd73ac 100644 --- a/src/node/src/credentials.js +++ b/src/node/src/credentials.js @@ -118,7 +118,6 @@ exports.createFromMetadataGenerator = function(metadata_generator) { exports.createFromGoogleCredential = function(google_credential) { return exports.createFromMetadataGenerator(function(auth_context, callback) { var service_url = auth_context.service_url; - console.log('Service URL:', service_url); google_credential.getRequestMetadata(service_url, function(err, header) { if (err) { console.log('Auth error:', err); @@ -127,7 +126,6 @@ exports.createFromGoogleCredential = function(google_credential) { } var metadata = new Metadata(); metadata.add('authorization', header.Authorization); - console.log(header.Authorization); callback(null, metadata); }); }); diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 2d45818b6e..51263b073c 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -308,37 +308,30 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; } - (void)invokeCall { - __weak GRPCCall *weakSelf = self; [self invokeCallWithHeadersHandler:^(NSDictionary *headers) { // Response headers received. - GRPCCall *strongSelf = weakSelf; - if (strongSelf) { - strongSelf.responseHeaders = headers; - [strongSelf startNextRead]; - } + self.responseHeaders = headers; + [self startNextRead]; } completionHandler:^(NSError *error, NSDictionary *trailers) { - GRPCCall *strongSelf = weakSelf; - if (strongSelf) { - strongSelf.responseTrailers = trailers; - - if (error) { - NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; - if (error.userInfo) { - [userInfo addEntriesFromDictionary:error.userInfo]; - } - userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers; - // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be - // called before this one, so an error might end up with trailers but no headers. We - // shouldn't call finishWithError until ater both blocks are called. It is also when this is - // done that we can provide a merged view of response headers and trailers in a thread-safe - // way. - if (strongSelf.responseHeaders) { - userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders; - } - error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; + self.responseTrailers = trailers; + + if (error) { + NSMutableDictionary *userInfo = [NSMutableDictionary dictionary]; + if (error.userInfo) { + [userInfo addEntriesFromDictionary:error.userInfo]; + } + userInfo[kGRPCTrailersKey] = self.responseTrailers; + // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be + // called before this one, so an error might end up with trailers but no headers. We + // shouldn't call finishWithError until ater both blocks are called. It is also when this is + // done that we can provide a merged view of response headers and trailers in a thread-safe + // way. + if (self.responseHeaders) { + userInfo[kGRPCHeadersKey] = self.responseHeaders; } - [strongSelf finishWithError:error]; + error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; } + [self finishWithError:error]; }]; // Now that the RPC has been initiated, request writes can start. @synchronized(_requestWriter) { @@ -377,7 +370,6 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeUnavailable userInfo:@{NSLocalizedDescriptionKey: @"Connectivity lost."}]]; - [[GRPCHost hostWithAddress:strongSelf->_host] disconnect]; } }]; } diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index fe3d51da53..f6527e283c 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -54,7 +54,9 @@ - (void)finish { if (_handler) { - _handler(); + void(^handler)() = _handler; + _handler = nil; + handler(); } } @end diff --git a/src/proto/grpc/testing/metrics.proto b/src/proto/grpc/testing/metrics.proto index 4485d3a53b..0cc4b60239 100644 --- a/src/proto/grpc/testing/metrics.proto +++ b/src/proto/grpc/testing/metrics.proto @@ -1,4 +1,3 @@ - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,12 +27,17 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// An integration test service that covers all the method signature permutations -// of unary/streaming requests/responses. +// Contains the definitions for a metrics service and the type of metrics +// exposed by the service. +// +// Currently, 'Gauge' (i.e a metric that represents the measured value of +// something at an instant of time) is the only metric type supported by the +// service. syntax = "proto3"; package grpc.testing; +// Reponse message containing the gauge name and value message GaugeResponse { string name = 1; oneof value { @@ -43,11 +47,18 @@ message GaugeResponse { } } -message GaugeRequest { string name = 1; } +// Request message containing the gauge name +message GaugeRequest { + string name = 1; +} message EmptyMessage {} service MetricsService { + // Returns the values of all the gauges that are currently being maintained by + // the service rpc GetAllGauges(EmptyMessage) returns (stream GaugeResponse); + + // Returns the value of one gauge rpc GetGauge(GaugeRequest) returns (GaugeResponse); } diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 851389a261..6ecdcf7222 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -254,7 +254,7 @@ cdef class ByteBuffer: if self.c_byte_buffer != NULL: with nogil: grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) - result = b"" + result = bytearray() with nogil: while grpc_byte_buffer_reader_next(&reader, &data_slice): data_slice_pointer = gpr_slice_start_ptr(data_slice) @@ -263,7 +263,7 @@ cdef class ByteBuffer: result += (<char *>data_slice_pointer)[:data_slice_length] with nogil: grpc_byte_buffer_reader_destroy(&reader) - return result + return bytes(result) else: return None diff --git a/src/python/grpcio/grpc/framework/alpha/_face_utilities.py b/src/python/grpcio/grpc/framework/alpha/_face_utilities.py index c97307df16..b5e4133cad 100644 --- a/src/python/grpcio/grpc/framework/alpha/_face_utilities.py +++ b/src/python/grpcio/grpc/framework/alpha/_face_utilities.py @@ -111,7 +111,7 @@ def break_down_invocation(service_name, method_descriptions): face_cardinalities = {} request_serializers = {} response_deserializers = {} - for name, method_description in method_descriptions.iteritems(): + for name, method_description in six.iteritems(method_descriptions): qualified_name = _qualified_name(service_name, name) method_cardinality = method_description.cardinality() cardinalities[name] = method_description.cardinality() @@ -139,7 +139,7 @@ def break_down_service(service_name, method_descriptions): implementations = {} request_deserializers = {} response_serializers = {} - for name, method_description in method_descriptions.iteritems(): + for name, method_description in six.iteritems(method_descriptions): qualified_name = _qualified_name(service_name, name) method_cardinality = method_description.cardinality() if method_cardinality is interfaces.Cardinality.UNARY_UNARY: diff --git a/src/python/grpcio/grpc/framework/alpha/_reexport.py b/src/python/grpcio/grpc/framework/alpha/_reexport.py index 198cb95ad5..4ea0e94d80 100644 --- a/src/python/grpcio/grpc/framework/alpha/_reexport.py +++ b/src/python/grpcio/grpc/framework/alpha/_reexport.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import six + from grpc.framework.common import cardinality from grpc.framework.face import exceptions as face_exceptions from grpc.framework.face import interfaces as face_interfaces @@ -181,7 +183,7 @@ def common_cardinality(early_adopter_cardinality): def common_cardinalities(early_adopter_cardinalities): common_cardinalities = {} - for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems(): + for name, early_adopter_cardinality in six.iteritems(early_adopter_cardinalities): common_cardinalities[name] = _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[ early_adopter_cardinality] return common_cardinalities diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py index 4ebc4e9ae8..d0ecafcaf6 100644 --- a/src/python/grpcio/grpc/framework/crust/implementations.py +++ b/src/python/grpcio/grpc/framework/crust/implementations.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,6 +29,8 @@ """Entry points into the Crust layer of RPC Framework.""" +import six + from grpc.framework.common import cardinality from grpc.framework.common import style from grpc.framework.crust import _calls @@ -271,7 +273,7 @@ class _DynamicStub(face.DynamicStub): def _adapt_method_implementations(method_implementations, pool): adapted_implementations = {} - for name, method_implementation in method_implementations.iteritems(): + for name, method_implementation in six.iteritems(method_implementations): if method_implementation.style is style.Service.INLINE: if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: adapted_implementations[name] = _service.adapt_inline_unary_unary( diff --git a/src/python/grpcio/grpc/framework/face/implementations.py b/src/python/grpcio/grpc/framework/face/implementations.py index 4a6de52974..9c75a5faf4 100644 --- a/src/python/grpcio/grpc/framework/face/implementations.py +++ b/src/python/grpcio/grpc/framework/face/implementations.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,6 +29,8 @@ """Entry points into the Face layer of RPC Framework.""" +import six + from grpc.framework.common import cardinality from grpc.framework.common import style from grpc.framework.base import exceptions as _base_exceptions @@ -228,7 +230,7 @@ class _DynamicStub(interfaces.DynamicStub): def _adapt_method_implementations(method_implementations, pool): adapted_implementations = {} - for name, method_implementation in method_implementations.iteritems(): + for name, method_implementation in six.iteritems(method_implementations): if method_implementation.style is style.Service.INLINE: if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: adapted_implementations[name] = _service.adapt_inline_value_in_value_out( diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index b9e7d8c898..27cd8d60bc 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -81,7 +81,6 @@ CORE_SOURCE_FILES = [ 'src/core/channel/channel_stack.c', 'src/core/channel/channel_stack_builder.c', 'src/core/channel/client_channel.c', - 'src/core/channel/client_uchannel.c', 'src/core/channel/compress_filter.c', 'src/core/channel/connected_channel.c', 'src/core/channel/http_client_filter.c', @@ -109,9 +108,9 @@ CORE_SOURCE_FILES = [ 'src/core/compression/compression_algorithm.c', 'src/core/compression/message_compress.c', 'src/core/debug/trace.c', - 'src/core/httpcli/format_request.c', - 'src/core/httpcli/httpcli.c', - 'src/core/httpcli/parser.c', + 'src/core/http/format_request.c', + 'src/core/http/httpcli.c', + 'src/core/http/parser.c', 'src/core/iomgr/closure.c', 'src/core/iomgr/endpoint.c', 'src/core/iomgr/endpoint_pair_posix.c', @@ -146,6 +145,8 @@ CORE_SOURCE_FILES = [ 'src/core/iomgr/timer.c', 'src/core/iomgr/timer_heap.c', 'src/core/iomgr/udp_server.c', + 'src/core/iomgr/unix_sockets_posix.c', + 'src/core/iomgr/unix_sockets_posix_noop.c', 'src/core/iomgr/wakeup_fd_eventfd.c', 'src/core/iomgr/wakeup_fd_nospecial.c', 'src/core/iomgr/wakeup_fd_pipe.c', @@ -207,7 +208,7 @@ CORE_SOURCE_FILES = [ 'src/core/transport/static_metadata.c', 'src/core/transport/transport.c', 'src/core/transport/transport_op_string.c', - 'src/core/httpcli/httpcli_security_connector.c', + 'src/core/http/httpcli_security_connector.c', 'src/core/security/b64.c', 'src/core/security/client_auth_filter.c', 'src/core/security/credentials.c', diff --git a/src/python/grpcio/tests/__init__.py b/src/python/grpcio/tests/__init__.py index b76b3985a1..c3b80d766d 100644 --- a/src/python/grpcio/tests/__init__.py +++ b/src/python/grpcio/tests/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import absolute_import + from tests import _loader from tests import _runner diff --git a/src/python/grpcio/tests/_loader.py b/src/python/grpcio/tests/_loader.py index 6992029b5e..2f9e5c660e 100644 --- a/src/python/grpcio/tests/_loader.py +++ b/src/python/grpcio/tests/_loader.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -27,6 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import absolute_import + import importlib import pkgutil import re diff --git a/src/python/grpcio/tests/_result.py b/src/python/grpcio/tests/_result.py index 0670be921f..065153f0c3 100644 --- a/src/python/grpcio/tests/_result.py +++ b/src/python/grpcio/tests/_result.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -27,7 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import cStringIO as StringIO +from __future__ import absolute_import + import collections import itertools import traceback @@ -35,6 +36,7 @@ import unittest from xml.etree import ElementTree import coverage +from six import moves from tests import _loader @@ -356,7 +358,7 @@ def _traceback_string(type, value, trace): Returns: str: Formatted exception descriptive string. """ - buffer = StringIO.StringIO() + buffer = moves.cStringIO() traceback.print_exception(type, value, trace, file=buffer) return buffer.getvalue() diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index 3b5ca03dd9..e899154b0b 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -27,7 +27,8 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import cStringIO as StringIO +from __future__ import absolute_import + import collections import fcntl import multiprocessing @@ -41,6 +42,8 @@ import time import unittest import uuid +from six import moves + from tests import _loader from tests import _result @@ -143,7 +146,7 @@ class Runner(object): for case in filtered_cases] case_id_by_case = dict((augmented_case.case, augmented_case.id) for augmented_case in augmented_cases) - result_out = StringIO.StringIO() + result_out = moves.cStringIO() result = _result.TerminalResult( result_out, id_map=lambda case: case_id_by_case[case]) stdout_pipe = CaptureFile(sys.stdout.fileno()) diff --git a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py index efc990421a..881633754c 100644 --- a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py +++ b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -35,6 +35,8 @@ import random import time import unittest +import six + from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service @@ -68,7 +70,7 @@ def _serialization_behaviors_from_serializations(serializations): request_deserializers = {} response_serializers = {} response_deserializers = {} - for (group, method), serialization in serializations.iteritems(): + for (group, method), serialization in six.iteritems(serializations): request_serializers[group, method] = serialization.serialize_request request_deserializers[group, method] = serialization.deserialize_request response_serializers[group, method] = serialization.serialize_response diff --git a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py index 4faaaadc2b..3be3b051fb 100644 --- a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py +++ b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -32,6 +32,8 @@ import collections import unittest +import six + from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service @@ -59,7 +61,7 @@ def _serialization_behaviors_from_test_methods(test_methods): request_deserializers = {} response_serializers = {} response_deserializers = {} - for (group, method), test_method in test_methods.iteritems(): + for (group, method), test_method in six.iteritems(test_methods): request_serializers[group, method] = test_method.serialize_request request_deserializers[group, method] = test_method.deserialize_request response_serializers[group, method] = test_method.serialize_response @@ -108,7 +110,7 @@ class _Implementation(test_interfaces.Implementation): # _digest.TestServiceDigest. cardinalities = { method: method_object.cardinality() - for (group, method), method_object in methods.iteritems()} + for (group, method), method_object in six.iteritems(methods)} dynamic_stub = crust_implementations.dynamic_stub( invocation_end_link, group, cardinalities, pool) diff --git a/src/python/grpcio/tests/unit/beta/_face_interface_test.py b/src/python/grpcio/tests/unit/beta/_face_interface_test.py index 1c21dfd03d..cb302bbf68 100644 --- a/src/python/grpcio/tests/unit/beta/_face_interface_test.py +++ b/src/python/grpcio/tests/unit/beta/_face_interface_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -32,6 +32,8 @@ import collections import unittest +import six + from grpc.beta import implementations from grpc.beta import interfaces from tests.unit import resources @@ -57,7 +59,7 @@ def _serialization_behaviors_from_test_methods(test_methods): request_deserializers = {} response_serializers = {} response_deserializers = {} - for (group, method), test_method in test_methods.iteritems(): + for (group, method), test_method in six.iteritems(test_methods): request_serializers[group, method] = test_method.serialize_request request_deserializers[group, method] = test_method.deserialize_request response_serializers[group, method] = test_method.serialize_response @@ -79,7 +81,7 @@ class _Implementation(test_interfaces.Implementation): # _digest.TestServiceDigest. cardinalities = { method: method_object.cardinality() - for (group, method), method_object in methods.iteritems()} + for (group, method), method_object in six.iteritems(methods)} server_options = implementations.server_options( request_deserializers=serialization_behaviors.request_deserializers, diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py index 360ecc95d5..fd2d4298f9 100644 --- a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py +++ b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -32,6 +32,8 @@ import collections import unittest +import six + from grpc.framework.core import implementations as core_implementations from grpc.framework.crust import implementations as crust_implementations from grpc.framework.foundation import logging_pool @@ -66,7 +68,7 @@ class _Implementation(test_interfaces.Implementation): # _digest.TestServiceDigest. cardinalities = { method: method_object.cardinality() - for (group, method), method_object in methods.iteritems()} + for (group, method), method_object in six.iteritems(methods)} dynamic_stub = crust_implementations.dynamic_stub( invocation_end_link, group, cardinalities, pool) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py index 2fc67f6292..b0b00bfa81 100644 --- a/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -74,7 +74,7 @@ class BlockingInvocationInlineServiceTestCase( def testSuccessfulUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -85,7 +85,7 @@ class BlockingInvocationInlineServiceTestCase( def testSuccessfulUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -97,7 +97,7 @@ class BlockingInvocationInlineServiceTestCase( def testSuccessfulStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -108,7 +108,7 @@ class BlockingInvocationInlineServiceTestCase( def testSuccessfulStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -120,7 +120,7 @@ class BlockingInvocationInlineServiceTestCase( def testSequentialInvocations(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -137,7 +137,7 @@ class BlockingInvocationInlineServiceTestCase( def testExpiredUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -148,7 +148,7 @@ class BlockingInvocationInlineServiceTestCase( def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -160,7 +160,7 @@ class BlockingInvocationInlineServiceTestCase( def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -171,7 +171,7 @@ class BlockingInvocationInlineServiceTestCase( def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -183,7 +183,7 @@ class BlockingInvocationInlineServiceTestCase( def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -193,7 +193,7 @@ class BlockingInvocationInlineServiceTestCase( def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -204,7 +204,7 @@ class BlockingInvocationInlineServiceTestCase( def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -214,7 +214,7 @@ class BlockingInvocationInlineServiceTestCase( def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/digest.py b/src/python/grpcio/tests/unit/framework/face/testing/digest.py index 39f28b9657..100067cc83 100644 --- a/src/python/grpcio/tests/unit/framework/face/testing/digest.py +++ b/src/python/grpcio/tests/unit/framework/face/testing/digest.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -32,6 +32,8 @@ import collections import threading +import six + # testing_control, interfaces, and testing_service are referenced from # specification in this module. from grpc.framework.common import cardinality @@ -368,7 +370,7 @@ def _assemble( events = {} adaptations = {} messages = {} - for name, scenario in scenarios.iteritems(): + for name, scenario in six.iteritems(scenarios): if name in names: raise ValueError('Repeated name "%s"!' % name) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py index b707dcdf6c..db901cfe4e 100644 --- a/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py +++ b/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -74,7 +74,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testSuccessfulUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -89,7 +89,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testSuccessfulUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -104,7 +104,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testSuccessfulStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -122,7 +122,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testSuccessfulStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -140,7 +140,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testSequentialInvocations(self): # pylint: disable=cell-var-from-loop for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -165,7 +165,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testExpiredUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -180,7 +180,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -195,7 +195,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for unused_test_messages in test_messages_sequence: callback = testing_callback.Callback() @@ -208,7 +208,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -223,7 +223,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -239,7 +239,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -255,7 +255,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -274,7 +274,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -291,7 +291,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testParallelInvocations(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() first_callback = testing_callback.Callback() @@ -318,7 +318,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testCancelledUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -334,7 +334,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = testing_callback.Callback() @@ -349,7 +349,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testCancelledStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = testing_callback.Callback() @@ -366,7 +366,7 @@ class EventInvocationSynchronousEventServiceTestCase( def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for unused_test_messages in test_messages_sequence: callback = testing_callback.Callback() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 8adfc613c6..d8706aa39e 100644 --- a/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -110,7 +110,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testSuccessfulUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -122,7 +122,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testSuccessfulUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -134,7 +134,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testSuccessfulStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -150,7 +150,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testSuccessfulStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -166,7 +166,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testSequentialInvocations(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -185,7 +185,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testExpiredUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -200,7 +200,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -212,7 +212,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -227,7 +227,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -239,7 +239,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -258,7 +258,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -273,7 +273,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -292,7 +292,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -307,7 +307,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testParallelInvocations(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -329,7 +329,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testCancelledUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -343,7 +343,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -357,7 +357,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testCancelledStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( - self.digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -371,7 +371,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( - self.digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self.digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index a47fcd3681..530ba4ff0f 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -82,7 +82,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -93,7 +93,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -105,7 +105,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -116,7 +116,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -128,7 +128,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSequentialInvocations(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -146,7 +146,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testParallelInvocations(self): pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures = [] @@ -168,7 +168,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testWaitingForSomeButNotAllParallelInvocations(self): pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures_to_indices = {} @@ -206,7 +206,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -217,7 +217,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -229,7 +229,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -240,7 +240,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -252,7 +252,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -262,7 +262,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -273,7 +273,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -283,7 +283,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py index 9304b6b1db..40c03f9e71 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -32,6 +32,8 @@ import collections import threading +import six + # test_control, _service, and test_interfaces are referenced from specification # in this module. from grpc.framework.common import cardinality @@ -363,7 +365,7 @@ def _assemble( events = {} adaptations = {} messages = {} - for identifier, scenario in scenarios.iteritems(): + for identifier, scenario in six.iteritems(scenarios): if identifier in identifiers: raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index c1081a7d99..89f344c289 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -145,7 +145,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -160,7 +160,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -172,7 +172,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -192,7 +192,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSuccessfulStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -208,7 +208,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testSequentialInvocations(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -227,7 +227,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testParallelInvocations(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -243,7 +243,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. test_messages.verify(second_request, second_response, self) for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures = [] @@ -263,7 +263,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testWaitingForSomeButNotAllParallelInvocations(self): pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures_to_indices = {} @@ -285,7 +285,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -302,7 +302,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testCancelledUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -316,7 +316,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testCancelledStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() @@ -333,7 +333,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testCancelledStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -347,7 +347,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -364,7 +364,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -376,7 +376,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() @@ -393,7 +393,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testExpiredStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -405,7 +405,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -427,7 +427,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -442,7 +442,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedStreamRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() @@ -464,7 +464,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. def testFailedStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): + six.iteritems(self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() diff --git a/src/python/grpcio/tests/unit/test_common.py b/src/python/grpcio/tests/unit/test_common.py index 29431bfb9d..824f1cbd16 100644 --- a/src/python/grpcio/tests/unit/test_common.py +++ b/src/python/grpcio/tests/unit/test_common.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -31,6 +31,8 @@ import collections +import six + INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),) SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),) SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),) @@ -65,7 +67,7 @@ def metadata_transmitted(original_metadata, transmitted_metadata): key, value = tuple(key_value_pair) transmitted[key].append(value) - for key, values in original.iteritems(): + for key, values in six.iteritems(original): transmitted_values = transmitted[key] transmitted_iterator = iter(transmitted_values) try: diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index af05ddf6e7..cd0aa6aaf2 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -72,6 +72,10 @@ static ID id_cq; * the flags used to create metadata from a Hash */ static ID id_flags; +/* id_credentials is the name of the hidden ivar that preserves the value + * of the credentials added to the call */ +static ID id_credentials; + /* id_input_md is the name of the hidden ivar that preserves the hash used to * create metadata, so that references to the strings it contains last as long * as the call the metadata is added to. */ @@ -299,6 +303,7 @@ static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) { "grpc_call_set_credentials failed with %s (code=%d)", grpc_call_error_detail_of(err), err); } + rb_ivar_set(self, id_credentials, credentials); return Qnil; } @@ -859,6 +864,7 @@ void Init_grpc_call() { id_cq = rb_intern("__cq"); id_flags = rb_intern("__flags"); id_input_md = rb_intern("__input_md"); + id_credentials = rb_intern("__credentials"); /* Ids used in constructing the batch result. */ sym_send_message = ID2SYM(rb_intern("send_message")); diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 2426f106a9..2b978fcdd0 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -41,6 +41,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> +#include <grpc/support/log.h> #include "rb_call.h" #include "rb_event_thread.h" @@ -50,9 +51,9 @@ * grpc_call_credentials */ static VALUE grpc_rb_cCallCredentials = Qnil; -/* grpc_rb_call_credentials wraps a grpc_call_credentials. It provides a peer - * ruby object, 'mark' to minimize copying when a credential is created from - * ruby. */ +/* grpc_rb_call_credentials wraps a grpc_call_credentials. It provides a mark + * object that is used to hold references to any objects used to create the + * credentials. */ typedef struct grpc_rb_call_credentials { /* Holder of ruby objects involved in contructing the credentials */ VALUE mark; @@ -81,14 +82,23 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) { static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args, VALUE exception_object) { VALUE result = rb_hash_new(); + VALUE backtrace = rb_funcall( + rb_funcall(exception_object, rb_intern("backtrace"), 0), + rb_intern("join"), + 1, rb_str_new2("\n\tfrom ")); + VALUE exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0); + const char *exception_classname = rb_obj_classname(exception_object); (void)args; + gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s", + exception_classname, StringValueCStr(exception_info), + StringValueCStr(backtrace)); rb_hash_aset(result, rb_str_new2("metadata"), Qnil); /* Currently only gives the exception class name. It should be possible get more details */ rb_hash_aset(result, rb_str_new2("status"), INT2NUM(GRPC_STATUS_PERMISSION_DENIED)); rb_hash_aset(result, rb_str_new2("details"), - rb_str_new2(rb_obj_classname(exception_object))); + rb_str_new2(exception_classname)); return result; } @@ -146,13 +156,8 @@ static void grpc_rb_call_credentials_free(void *p) { return; } wrapper = (grpc_rb_call_credentials *)p; - - /* Delete the wrapped object if the mark object is Qnil, which indicates that - * no other object is the actual owner. */ - if (wrapper->wrapped != NULL && wrapper->mark == Qnil) { - grpc_call_credentials_release(wrapper->wrapped); - wrapper->wrapped = NULL; - } + grpc_call_credentials_release(wrapper->wrapped); + wrapper->wrapped = NULL; xfree(p); } @@ -164,8 +169,6 @@ static void grpc_rb_call_credentials_mark(void *p) { return; } wrapper = (grpc_rb_call_credentials *)p; - - /* If it's not already cleaned up, mark the mark object */ if (wrapper->mark != Qnil) { rb_gc_mark(wrapper->mark); } @@ -194,7 +197,7 @@ static VALUE grpc_rb_call_credentials_alloc(VALUE cls) { /* Creates a wrapping object for a given call credentials. This should only be * called with grpc_call_credentials objects that are not already associated * with any Ruby object */ -VALUE grpc_rb_wrap_call_credentials(grpc_call_credentials *c) { +VALUE grpc_rb_wrap_call_credentials(grpc_call_credentials *c, VALUE mark) { VALUE rb_wrapper; grpc_rb_call_credentials *wrapper; if (c == NULL) { @@ -204,6 +207,7 @@ VALUE grpc_rb_wrap_call_credentials(grpc_call_credentials *c) { TypedData_Get_Struct(rb_wrapper, grpc_rb_call_credentials, &grpc_rb_call_credentials_data_type, wrapper); wrapper->wrapped = c; + wrapper->mark = mark; return rb_wrapper; } @@ -267,6 +271,7 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) { return Qnil; } + wrapper->mark = proc; wrapper->wrapped = creds; rb_ivar_set(self, id_callback, proc); @@ -277,15 +282,18 @@ static VALUE grpc_rb_call_credentials_compose(int argc, VALUE *argv, VALUE self) { grpc_call_credentials *creds; grpc_call_credentials *other; + VALUE mark; if (argc == 0) { return self; } + mark = rb_ary_new(); creds = grpc_rb_get_wrapped_call_credentials(self); for (int i = 0; i < argc; i++) { + rb_ary_push(mark, argv[i]); other = grpc_rb_get_wrapped_call_credentials(argv[i]); creds = grpc_composite_call_credentials_create(creds, other, NULL); } - return grpc_rb_wrap_call_credentials(creds); + return grpc_rb_wrap_call_credentials(creds, mark); } void Init_grpc_call_credentials() { diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 0e6badbdaf..e1aaa539db 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -70,11 +70,10 @@ static VALUE grpc_rb_cChannel = Qnil; /* Used during the conversion of a hash to channel args during channel setup */ static VALUE grpc_rb_cChannelArgs; -/* grpc_rb_channel wraps a grpc_channel. It provides a peer ruby object, - * 'mark' to minimize copying when a channel is created from ruby. */ +/* grpc_rb_channel wraps a grpc_channel. */ typedef struct grpc_rb_channel { - /* Holder of ruby objects involved in constructing the channel */ - VALUE mark; + VALUE credentials; + /* The actual channel */ grpc_channel *wrapped; } grpc_rb_channel; @@ -87,13 +86,8 @@ static void grpc_rb_channel_free(void *p) { }; ch = (grpc_rb_channel *)p; - /* Deletes the wrapped object if the mark object is Qnil, which indicates - * that no other object is the actual owner. */ - if (ch->wrapped != NULL && ch->mark == Qnil) { + if (ch->wrapped != NULL) { grpc_channel_destroy(ch->wrapped); - rb_warning("channel gc: destroyed the c channel"); - } else { - rb_warning("channel gc: did not destroy the c channel"); } xfree(p); @@ -106,8 +100,8 @@ static void grpc_rb_channel_mark(void *p) { return; } channel = (grpc_rb_channel *)p; - if (channel->mark != Qnil) { - rb_gc_mark(channel->mark); + if (channel->credentials != Qnil) { + rb_gc_mark(channel->credentials); } } @@ -125,7 +119,7 @@ static rb_data_type_t grpc_channel_data_type = { static VALUE grpc_rb_channel_alloc(VALUE cls) { grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); wrapper->wrapped = NULL; - wrapper->mark = Qnil; + wrapper->credentials = Qnil; return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper); } @@ -162,6 +156,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } ch = grpc_insecure_channel_create(target_chars, &args, NULL); } else { + wrapper->credentials = credentials; creds = grpc_rb_get_wrapped_channel_credentials(credentials); ch = grpc_secure_channel_create(creds, target_chars, &args, NULL); } @@ -330,7 +325,6 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { if (ch != NULL) { grpc_channel_destroy(ch); wrapper->wrapped = NULL; - wrapper->mark = Qnil; } return Qnil; diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index 8c6fc3b7eb..f649084311 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -49,8 +49,8 @@ static VALUE grpc_rb_cChannelCredentials = Qnil; /* grpc_rb_channel_credentials wraps a grpc_channel_credentials. It provides a - * peer ruby object, 'mark' to minimize copying when a credential is - * created from ruby. */ + * mark object that is used to hold references to any objects used to create + * the credentials. */ typedef struct grpc_rb_channel_credentials { /* Holder of ruby objects involved in constructing the credentials */ VALUE mark; @@ -66,13 +66,8 @@ static void grpc_rb_channel_credentials_free(void *p) { return; }; wrapper = (grpc_rb_channel_credentials *)p; - - /* Delete the wrapped object if the mark object is Qnil, which indicates that - * no other object is the actual owner. */ - if (wrapper->wrapped != NULL && wrapper->mark == Qnil) { - grpc_channel_credentials_release(wrapper->wrapped); - wrapper->wrapped = NULL; - } + grpc_channel_credentials_release(wrapper->wrapped); + wrapper->wrapped = NULL; xfree(p); } @@ -85,7 +80,6 @@ static void grpc_rb_channel_credentials_mark(void *p) { } wrapper = (grpc_rb_channel_credentials *)p; - /* If it's not already cleaned up, mark the mark object */ if (wrapper->mark != Qnil) { rb_gc_mark(wrapper->mark); } @@ -114,7 +108,7 @@ static VALUE grpc_rb_channel_credentials_alloc(VALUE cls) { /* Creates a wrapping object for a given channel credentials. This should only * be called with grpc_channel_credentials objects that are not already * associated with any Ruby object. */ -VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c) { +VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c, VALUE mark) { VALUE rb_wrapper; grpc_rb_channel_credentials *wrapper; if (c == NULL) { @@ -124,6 +118,7 @@ VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c) { TypedData_Get_Struct(rb_wrapper, grpc_rb_channel_credentials, &grpc_rb_channel_credentials_data_type, wrapper); wrapper->wrapped = c; + wrapper->mark = mark; return rb_wrapper; } @@ -222,11 +217,15 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv, VALUE self) { grpc_channel_credentials *creds; grpc_call_credentials *other; + VALUE mark; if (argc == 0) { return self; } + mark = rb_ary_new(); + rb_ary_push(mark, self); creds = grpc_rb_get_wrapped_channel_credentials(self); for (int i = 0; i < argc; i++) { + rb_ary_push(mark, argv[i]); other = grpc_rb_get_wrapped_call_credentials(argv[i]); creds = grpc_composite_channel_credentials_create(creds, other, NULL); if (creds == NULL) { @@ -234,7 +233,7 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv, "Failed to compose channel and call credentials"); } } - return grpc_rb_wrap_channel_credentials(creds); + return grpc_rb_wrap_channel_credentials(creds, mark); } void Init_grpc_channel_credentials() { |