diff options
Diffstat (limited to 'src')
98 files changed, 2979 insertions, 1330 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index cdd3d8a98a..a93b08c5ce 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -33,9 +33,11 @@ #include <cassert> #include <cctype> +#include <cstring> #include <map> #include <ostream> #include <sstream> +#include <vector> #include "src/compiler/python_generator.h" #include <google/protobuf/io/printer.h> @@ -43,14 +45,19 @@ #include <google/protobuf/descriptor.pb.h> #include <google/protobuf/descriptor.h> +using google::protobuf::Descriptor; using google::protobuf::FileDescriptor; using google::protobuf::ServiceDescriptor; using google::protobuf::MethodDescriptor; using google::protobuf::io::Printer; using google::protobuf::io::StringOutputStream; using std::initializer_list; +using std::make_pair; using std::map; +using std::pair; using std::string; +using std::strlen; +using std::vector; namespace grpc_python_generator { namespace { @@ -99,62 +106,81 @@ class IndentScope { // END FORMATTING BOILERPLATE // //////////////////////////////// -void PrintService(const ServiceDescriptor* service, - Printer* out) { +bool PrintServicer(const ServiceDescriptor* service, + Printer* out) { string doc = "<fill me in later!>"; map<string, string> dict = ListToDict({ "Service", service->name(), "Documentation", doc, }); - out->Print(dict, "class $Service$Service(object):\n"); + out->Print(dict, "class EarlyAdopter$Service$Servicer(object):\n"); { IndentScope raii_class_indent(out); out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); - out->Print("def __init__(self):\n"); - { - IndentScope raii_method_indent(out); - out->Print("pass\n"); + out->Print("__metaclass__ = abc.ABCMeta\n"); + for (int i = 0; i < service->method_count(); ++i) { + auto meth = service->method(i); + string arg_name = meth->client_streaming() ? + "request_iterator" : "request"; + out->Print("@abc.abstractmethod\n"); + out->Print("def $Method$(self, $ArgName$):\n", + "Method", meth->name(), "ArgName", arg_name); + { + IndentScope raii_method_indent(out); + out->Print("raise NotImplementedError()\n"); + } } } + return true; } -void PrintServicer(const ServiceDescriptor* service, - Printer* out) { +bool PrintServer(const ServiceDescriptor* service, Printer* out) { string doc = "<fill me in later!>"; map<string, string> dict = ListToDict({ "Service", service->name(), "Documentation", doc, }); - out->Print(dict, "class $Service$Servicer(object):\n"); + out->Print(dict, "class EarlyAdopter$Service$Server(object):\n"); { IndentScope raii_class_indent(out); out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); - for (int i = 0; i < service->method_count(); ++i) { - auto meth = service->method(i); - out->Print("def $Method$(self, arg):\n", "Method", meth->name()); - { - IndentScope raii_method_indent(out); - out->Print("raise NotImplementedError()\n"); - } + out->Print("__metaclass__ = abc.ABCMeta\n"); + out->Print("@abc.abstractmethod\n"); + out->Print("def start(self):\n"); + { + IndentScope raii_method_indent(out); + out->Print("raise NotImplementedError()\n"); + } + + out->Print("@abc.abstractmethod\n"); + out->Print("def stop(self):\n"); + { + IndentScope raii_method_indent(out); + out->Print("raise NotImplementedError()\n"); } } + return true; } -void PrintStub(const ServiceDescriptor* service, +bool PrintStub(const ServiceDescriptor* service, Printer* out) { string doc = "<fill me in later!>"; map<string, string> dict = ListToDict({ "Service", service->name(), "Documentation", doc, }); - out->Print(dict, "class $Service$Stub(object):\n"); + out->Print(dict, "class EarlyAdopter$Service$Stub(object):\n"); { IndentScope raii_class_indent(out); out->Print(dict, "\"\"\"$Documentation$\"\"\"\n"); + out->Print("__metaclass__ = abc.ABCMeta\n"); for (int i = 0; i < service->method_count(); ++i) { const MethodDescriptor* meth = service->method(i); - auto methdict = ListToDict({"Method", meth->name()}); - out->Print(methdict, "def $Method$(self, arg):\n"); + string arg_name = meth->client_streaming() ? + "request_iterator" : "request"; + auto methdict = ListToDict({"Method", meth->name(), "ArgName", arg_name}); + out->Print("@abc.abstractmethod\n"); + out->Print(methdict, "def $Method$(self, $ArgName$):\n"); { IndentScope raii_method_indent(out); out->Print("raise NotImplementedError()\n"); @@ -162,169 +188,190 @@ void PrintStub(const ServiceDescriptor* service, out->Print(methdict, "$Method$.async = None\n"); } } + return true; } -void PrintStubImpl(const ServiceDescriptor* service, - Printer* out) { - map<string, string> dict = ListToDict({ - "Service", service->name(), - }); - out->Print(dict, "class _$Service$Stub($Service$Stub):\n"); - { - IndentScope raii_class_indent(out); - out->Print("def __init__(self, face_stub, default_timeout):\n"); - { - IndentScope raii_method_indent(out); - out->Print("self._face_stub = face_stub\n" - "self._default_timeout = default_timeout\n" - "stub_self = self\n"); +bool GetModuleAndMessagePath(const Descriptor* type, + pair<string, string>* out) { + const Descriptor* path_elem_type = type; + vector<const Descriptor*> message_path; + do { + message_path.push_back(path_elem_type); + path_elem_type = path_elem_type->containing_type(); + } while (path_elem_type != nullptr); + string file_name = type->file()->name(); + string module_name; + static const int proto_suffix_length = strlen(".proto"); + if (!(file_name.size() > static_cast<size_t>(proto_suffix_length) && + file_name.find_last_of(".proto") == file_name.size() - 1)) { + return false; + } + module_name = file_name.substr( + 0, file_name.size() - proto_suffix_length) + "_pb2"; + string package = type->file()->package(); + string module = (package.empty() ? "" : package + ".") + + module_name; + string message_type; + for (auto path_iter = message_path.rbegin(); + path_iter != message_path.rend(); ++path_iter) { + message_type += (*path_iter)->name() + "."; + } + message_type.pop_back(); + *out = make_pair(module, message_type); + return true; +} - for (int i = 0; i < service->method_count(); ++i) { - const MethodDescriptor* meth = service->method(i); - bool server_streaming = meth->server_streaming(); - bool client_streaming = meth->client_streaming(); - std::string blocking_call, future_call; - if (server_streaming) { - if (client_streaming) { - blocking_call = "stub_self._face_stub.inline_stream_in_stream_out"; - future_call = blocking_call; - } else { - blocking_call = "stub_self._face_stub.inline_value_in_stream_out"; - future_call = blocking_call; - } - } else { - if (client_streaming) { - blocking_call = "stub_self._face_stub.blocking_stream_in_value_out"; - future_call = "stub_self._face_stub.future_stream_in_value_out"; - } else { - blocking_call = "stub_self._face_stub.blocking_value_in_value_out"; - future_call = "stub_self._face_stub.future_value_in_value_out"; - } - } - // TODO(atash): use the solution described at - // http://stackoverflow.com/a/2982 to bind 'async' attribute - // functions to def'd functions instead of using callable attributes. - auto methdict = ListToDict({ - "Method", meth->name(), - "BlockingCall", blocking_call, - "FutureCall", future_call - }); - out->Print(methdict, "class $Method$(object):\n"); - { - IndentScope raii_callable_indent(out); - out->Print("def __call__(self, arg):\n"); - { - IndentScope raii_callable_call_indent(out); - out->Print(methdict, - "return $BlockingCall$(\"$Method$\", arg, " - "stub_self._default_timeout)\n"); - } - out->Print("def async(self, arg):\n"); - { - IndentScope raii_callable_async_indent(out); - out->Print(methdict, - "return $FutureCall$(\"$Method$\", arg, " - "stub_self._default_timeout)\n"); - } - } - out->Print(methdict, "self.$Method$ = $Method$()\n"); +bool PrintServerFactory(const ServiceDescriptor* service, Printer* out) { + out->Print("def early_adopter_create_$Service$_server(servicer, port, " + "root_certificates, key_chain_pairs):\n", + "Service", service->name()); + { + IndentScope raii_create_server_indent(out); + map<string, pair<string, string>> method_to_module_and_message; + out->Print("method_implementations = {\n"); + for (int i = 0; i < service->method_count(); ++i) { + IndentScope raii_implementations_indent(out); + const MethodDescriptor* meth = service->method(i); + string meth_type = + string(meth->client_streaming() ? "stream" : "unary") + + string(meth->server_streaming() ? "_stream" : "_unary") + "_inline"; + out->Print("\"$Method$\": utilities.$Type$(servicer.$Method$),\n", + "Method", meth->name(), + "Type", meth_type); + // Maintain information on the input type of the service method for later + // use in constructing the service assembly's activated fore link. + const Descriptor* input_type = meth->input_type(); + pair<string, string> module_and_message; + if (!GetModuleAndMessagePath(input_type, &module_and_message)) { + return false; } + method_to_module_and_message.insert( + make_pair(meth->name(), module_and_message)); + } + out->Print("}\n"); + // Ensure that we've imported all of the relevant messages. + for (auto& meth_vals : method_to_module_and_message) { + out->Print("import $Module$\n", + "Module", meth_vals.second.first); } + out->Print("request_deserializers = {\n"); + for (auto& meth_vals : method_to_module_and_message) { + IndentScope raii_serializers_indent(out); + string full_input_type_path = meth_vals.second.first + "." + + meth_vals.second.second; + out->Print("\"$Method$\": $Type$.FromString,\n", + "Method", meth_vals.first, + "Type", full_input_type_path); + } + out->Print("}\n"); + out->Print("response_serializers = {\n"); + for (auto& meth_vals : method_to_module_and_message) { + IndentScope raii_serializers_indent(out); + out->Print("\"$Method$\": lambda x: x.SerializeToString(),\n", + "Method", meth_vals.first); + } + out->Print("}\n"); + out->Print("link = fore.activated_fore_link(port, request_deserializers, " + "response_serializers, root_certificates, key_chain_pairs)\n"); + out->Print("return implementations.assemble_service(" + "method_implementations, link)\n"); } + return true; } -void PrintStubGenerators(const ServiceDescriptor* service, Printer* out) { +bool PrintStubFactory(const ServiceDescriptor* service, Printer* out) { map<string, string> dict = ListToDict({ "Service", service->name(), }); - // Write out a generator of linked pairs of Server/Stub - out->Print(dict, "def mock_$Service$(servicer, default_timeout):\n"); + out->Print(dict, "def early_adopter_create_$Service$_stub(host, port):\n"); { - IndentScope raii_mock_indent(out); - out->Print("value_in_value_out = {}\n" - "value_in_stream_out = {}\n" - "stream_in_value_out = {}\n" - "stream_in_stream_out = {}\n"); + IndentScope raii_create_server_indent(out); + map<string, pair<string, string>> method_to_module_and_message; + out->Print("method_implementations = {\n"); for (int i = 0; i < service->method_count(); ++i) { + IndentScope raii_implementations_indent(out); const MethodDescriptor* meth = service->method(i); - std::string super_interface, meth_dict; - bool server_streaming = meth->server_streaming(); - bool client_streaming = meth->client_streaming(); - if (server_streaming) { - if (client_streaming) { - super_interface = "InlineStreamInStreamOutMethod"; - meth_dict = "stream_in_stream_out"; - } else { - super_interface = "InlineValueInStreamOutMethod"; - meth_dict = "value_in_stream_out"; - } - } else { - if (client_streaming) { - super_interface = "InlineStreamInValueOutMethod"; - meth_dict = "stream_in_value_out"; - } else { - super_interface = "InlineValueInValueOutMethod"; - meth_dict = "value_in_value_out"; - } - } - map<string, string> methdict = ListToDict({ - "Method", meth->name(), - "SuperInterface", super_interface, - "MethodDict", meth_dict - }); - out->Print( - methdict, "class $Method$(_face_interfaces.$SuperInterface$):\n"); - { - IndentScope raii_inline_class_indent(out); - out->Print("def service(self, request, context):\n"); - { - IndentScope raii_inline_class_fn_indent(out); - out->Print(methdict, "return servicer.$Method$(request)\n"); - } + string meth_type = + string(meth->client_streaming() ? "stream" : "unary") + + string(meth->server_streaming() ? "_stream" : "_unary") + "_inline"; + // TODO(atash): once the expected input to assemble_dynamic_inline_stub is + // cleaned up, change this to the expected argument's dictionary values. + out->Print("\"$Method$\": utilities.$Type$(None),\n", + "Method", meth->name(), + "Type", meth_type); + // Maintain information on the input type of the service method for later + // use in constructing the service assembly's activated fore link. + const Descriptor* output_type = meth->output_type(); + pair<string, string> module_and_message; + if (!GetModuleAndMessagePath(output_type, &module_and_message)) { + return false; } - out->Print(methdict, "$MethodDict$['$Method$'] = $Method$()\n"); + method_to_module_and_message.insert( + make_pair(meth->name(), module_and_message)); } - out->Print( - "face_linked_pair = _face_testing.server_and_stub(default_timeout," - "inline_value_in_value_out_methods=value_in_value_out," - "inline_value_in_stream_out_methods=value_in_stream_out," - "inline_stream_in_value_out_methods=stream_in_value_out," - "inline_stream_in_stream_out_methods=stream_in_stream_out)\n"); - out->Print("class LinkedPair(object):\n"); - { - IndentScope raii_linked_pair(out); - out->Print("def __init__(self, server, stub):\n"); - { - IndentScope raii_linked_pair_init(out); - out->Print("self.server = server\n" - "self.stub = stub\n"); - } + out->Print("}\n"); + // Ensure that we've imported all of the relevant messages. + for (auto& meth_vals : method_to_module_and_message) { + out->Print("import $Module$\n", + "Module", meth_vals.second.first); } - out->Print( - dict, - "stub = _$Service$Stub(face_linked_pair.stub, default_timeout)\n"); - out->Print("return LinkedPair(None, stub)\n"); + out->Print("response_deserializers = {\n"); + for (auto& meth_vals : method_to_module_and_message) { + IndentScope raii_serializers_indent(out); + string full_output_type_path = meth_vals.second.first + "." + + meth_vals.second.second; + out->Print("\"$Method$\": $Type$.FromString,\n", + "Method", meth_vals.first, + "Type", full_output_type_path); + } + out->Print("}\n"); + out->Print("request_serializers = {\n"); + for (auto& meth_vals : method_to_module_and_message) { + IndentScope raii_serializers_indent(out); + out->Print("\"$Method$\": lambda x: x.SerializeToString(),\n", + "Method", meth_vals.first); + } + out->Print("}\n"); + out->Print("link = rear.activated_rear_link(" + "host, port, request_serializers, response_deserializers)\n"); + out->Print("return implementations.assemble_dynamic_inline_stub(" + "method_implementations, link)\n"); } + return true; +} + +bool PrintPreamble(const FileDescriptor* file, Printer* out) { + out->Print("import abc\n"); + out->Print("from grpc._adapter import fore\n"); + out->Print("from grpc._adapter import rear\n"); + out->Print("from grpc.framework.assembly import implementations\n"); + out->Print("from grpc.framework.assembly import utilities\n"); + return true; } } // namespace -string GetServices(const FileDescriptor* file) { +pair<bool, string> GetServices(const FileDescriptor* file) { string output; - StringOutputStream output_stream(&output); - Printer out(&output_stream, '$'); - out.Print("from grpc.framework.face import demonstration as _face_testing\n"); - out.Print("from grpc.framework.face import interfaces as _face_interfaces\n"); - - for (int i = 0; i < file->service_count(); ++i) { - auto service = file->service(i); - PrintService(service, &out); - PrintServicer(service, &out); - PrintStub(service, &out); - PrintStubImpl(service, &out); - PrintStubGenerators(service, &out); + { + // Scope the output stream so it closes and finalizes output to the string. + StringOutputStream output_stream(&output); + Printer out(&output_stream, '$'); + if (!PrintPreamble(file, &out)) { + return make_pair(false, ""); + } + for (int i = 0; i < file->service_count(); ++i) { + auto service = file->service(i); + if (!(PrintServicer(service, &out) && + PrintServer(service, &out) && + PrintStub(service, &out) && + PrintServerFactory(service, &out) && + PrintStubFactory(service, &out))) { + return make_pair(false, ""); + } + } } - return output; + return make_pair(true, std::move(output)); } } // namespace grpc_python_generator diff --git a/src/compiler/python_generator.h b/src/compiler/python_generator.h index 673ef7b23b..773dfa3513 100644 --- a/src/compiler/python_generator.h +++ b/src/compiler/python_generator.h @@ -35,6 +35,7 @@ #define __GRPC_COMPILER_PYTHON_GENERATOR_H__ #include <string> +#include <utility> namespace google { namespace protobuf { @@ -44,7 +45,7 @@ class FileDescriptor; namespace grpc_python_generator { -std::string GetServices(const google::protobuf::FileDescriptor* file); +std::pair<bool, std::string> GetServices(const google::protobuf::FileDescriptor* file); } // namespace grpc_python_generator diff --git a/src/compiler/python_plugin.cc b/src/compiler/python_plugin.cc index 05c6b095d8..ed1e0494fb 100644 --- a/src/compiler/python_plugin.cc +++ b/src/compiler/python_plugin.cc @@ -33,6 +33,7 @@ // Generates a Python gRPC service interface out of Protobuf IDL. +#include <cstring> #include <memory> #include <string> @@ -50,6 +51,7 @@ using google::protobuf::compiler::PluginMain; using google::protobuf::io::CodedOutputStream; using google::protobuf::io::ZeroCopyOutputStream; using std::string; +using std::strlen; class PythonGrpcGenerator : public CodeGenerator { public: @@ -62,7 +64,7 @@ class PythonGrpcGenerator : public CodeGenerator { string* error) const override { // Get output file name. string file_name; - static const int proto_suffix_length = 6; // length of ".proto" + static const int proto_suffix_length = strlen(".proto"); if (file->name().size() > static_cast<size_t>(proto_suffix_length) && file->name().find_last_of(".proto") == file->name().size() - 1) { file_name = file->name().substr( @@ -75,9 +77,15 @@ class PythonGrpcGenerator : public CodeGenerator { std::unique_ptr<ZeroCopyOutputStream> output( context->OpenForInsert(file_name, "module_scope")); CodedOutputStream coded_out(output.get()); - string code = grpc_python_generator::GetServices(file); - coded_out.WriteRaw(code.data(), code.size()); - return true; + bool success = false; + string code = ""; + tie(success, code) = grpc_python_generator::GetServices(file); + if (success) { + coded_out.WriteRaw(code.data(), code.size()); + return true; + } else { + return false; + } } }; diff --git a/src/core/debug/trace.c b/src/core/debug/trace.c index 92acbe924d..b8eb755bff 100644 --- a/src/core/debug/trace.c +++ b/src/core/debug/trace.c @@ -81,6 +81,8 @@ static void parse(const char *s) { grpc_trace_bits |= GRPC_TRACE_TCP; } else if (0 == strcmp(s, "secure_endpoint")) { grpc_trace_bits |= GRPC_TRACE_SECURE_ENDPOINT; + } else if (0 == strcmp(s, "http")) { + grpc_trace_bits |= GRPC_TRACE_HTTP; } else if (0 == strcmp(s, "all")) { grpc_trace_bits = -1; } else { diff --git a/src/core/debug/trace.h b/src/core/debug/trace.h index 167ef3c6ea..bf9b8a3642 100644 --- a/src/core/debug/trace.h +++ b/src/core/debug/trace.h @@ -45,7 +45,8 @@ typedef enum { GRPC_TRACE_SURFACE = 1 << 0, GRPC_TRACE_CHANNEL = 1 << 1, GRPC_TRACE_TCP = 1 << 2, - GRPC_TRACE_SECURE_ENDPOINT = 1 << 3 + GRPC_TRACE_SECURE_ENDPOINT = 1 << 3, + GRPC_TRACE_HTTP = 1 << 4 } grpc_trace_bit_value; #if GRPC_ENABLE_TRACING diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 41fd24e05a..abdd49bbda 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -38,6 +38,7 @@ #include "src/core/iomgr/fd_posix.h" #include <assert.h> +#include <sys/socket.h> #include <unistd.h> #include "src/core/iomgr/iomgr_internal.h" @@ -113,6 +114,7 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { + close(fd->fd); grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); freelist_fd(fd); grpc_iomgr_unref(); @@ -158,9 +160,9 @@ static void wake_watchers(grpc_fd *fd) { void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { fd->on_done = on_done ? on_done : do_nothing; fd->on_done_user_data = user_data; + shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ wake_watchers(fd); - close(fd->fd); unref_by(fd, 2); /* drop the reference */ } diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 9d04b014ba..c26947f37c 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -52,9 +52,14 @@ #include "src/core/iomgr/pollset_windows.h" #endif + void grpc_pollset_init(grpc_pollset *pollset); +void grpc_pollset_shutdown(grpc_pollset *pollset, + void (*shutdown_done)(void *arg), + void *shutdown_done_arg); void grpc_pollset_destroy(grpc_pollset *pollset); + /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 87e7aa85ee..f0a8453fd7 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -55,6 +55,7 @@ static grpc_pollset g_backup_pollset; static int g_shutdown_backup_poller; static gpr_event g_backup_poller_done; +static gpr_event g_backup_pollset_shutdown_done; static void backup_poller(void *p) { gpr_timespec delta = gpr_time_from_millis(100); @@ -104,9 +105,14 @@ void grpc_pollset_global_init(void) { /* start the backup poller thread */ g_shutdown_backup_poller = 0; gpr_event_init(&g_backup_poller_done); + gpr_event_init(&g_backup_pollset_shutdown_done); gpr_thd_new(&id, backup_poller, NULL, NULL); } +static void on_backup_pollset_shutdown_done(void *arg) { + gpr_event_set(&g_backup_pollset_shutdown_done, (void *)1); +} + void grpc_pollset_global_shutdown(void) { /* terminate the backup poller thread */ gpr_mu_lock(&g_backup_pollset.mu); @@ -114,6 +120,10 @@ void grpc_pollset_global_shutdown(void) { gpr_mu_unlock(&g_backup_pollset.mu); gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + grpc_pollset_shutdown(&g_backup_pollset, on_backup_pollset_shutdown_done, + NULL); + gpr_event_wait(&g_backup_pollset_shutdown_done, gpr_inf_future); + /* destroy the backup pollset */ grpc_pollset_destroy(&g_backup_pollset); @@ -130,6 +140,8 @@ void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); gpr_cv_init(&pollset->cv); grpc_pollset_kick_init(&pollset->kick_state); + pollset->in_flight_cbs = 0; + pollset->shutting_down = 0; become_empty_pollset(pollset); } @@ -163,7 +175,24 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { return pollset->vtable->maybe_work(pollset, deadline, now, 1); } +void grpc_pollset_shutdown(grpc_pollset *pollset, + void (*shutdown_done)(void *arg), + void *shutdown_done_arg) { + int in_flight_cbs; + gpr_mu_lock(&pollset->mu); + pollset->shutting_down = 1; + in_flight_cbs = pollset->in_flight_cbs; + pollset->shutdown_done_cb = shutdown_done; + pollset->shutdown_done_arg = shutdown_done_arg; + gpr_mu_unlock(&pollset->mu); + if (in_flight_cbs == 0) { + shutdown_done(shutdown_done_arg); + } +} + void grpc_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(pollset->in_flight_cbs == 0); pollset->vtable->destroy(pollset); grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); @@ -201,21 +230,119 @@ static void become_empty_pollset(grpc_pollset *pollset) { * via poll() */ + +typedef struct grpc_unary_promote_args { + const grpc_pollset_vtable *original_vtable; + grpc_pollset *pollset; + grpc_fd *fd; +} grpc_unary_promote_args; + +static void unary_poll_do_promote(void *args, int success) { + grpc_unary_promote_args *up_args = args; + const grpc_pollset_vtable *original_vtable = up_args->original_vtable; + grpc_pollset *pollset = up_args->pollset; + grpc_fd *fd = up_args->fd; + int do_shutdown_cb = 0; + gpr_free(up_args); + + /* + * This is quite tricky. There are a number of cases to keep in mind here: + * 1. fd may have been orphaned + * 2. The pollset may no longer be a unary poller (and we can't let case #1 + * leak to other pollset types!) + * 3. pollset's fd (which may have changed) may have been orphaned + * 4. The pollset may be shutting down. + */ + + gpr_mu_lock(&pollset->mu); + /* First we need to ensure that nobody is polling concurrently */ + while (pollset->counter != 0) { + grpc_pollset_kick(pollset); + gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future); + } + /* At this point the pollset may no longer be a unary poller. In that case + * we should just call the right add function and be done. */ + /* TODO(klempner): If we're not careful this could cause infinite recursion. + * That's not a problem for now because empty_pollset has a trivial poller + * and we don't have any mechanism to unbecome multipoller. */ + pollset->in_flight_cbs--; + if (pollset->shutting_down) { + gpr_log(GPR_INFO, "Shutting down"); + /* We don't care about this pollset anymore. */ + if (pollset->in_flight_cbs == 0) { + do_shutdown_cb = 1; + } + } else if (grpc_fd_is_orphaned(fd)) { + /* Don't try to add it to anything, we'll drop our ref on it below */ + } else if (pollset->vtable != original_vtable) { + gpr_log(GPR_INFO, "Not original vtable"); + pollset->vtable->add_fd(pollset, fd); + } else if (fd != pollset->data.ptr) { + grpc_fd *fds[2]; + fds[0] = pollset->data.ptr; + fds[1] = fd; + + if (!grpc_fd_is_orphaned(fds[0])) { + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); + } else { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + /* Note that it is possible that fds[1] is also orphaned at this point. + * That's okay, we'll correct it at the next add or poll. */ + grpc_fd_unref(fds[0]); + pollset->data.ptr = fd; + grpc_fd_ref(fd); + } + } + + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); + + if (do_shutdown_cb) { + pollset->shutdown_done_cb(pollset->shutdown_done_arg); + } + + /* Matching ref in unary_poll_pollset_add_fd */ + grpc_fd_unref(fd); +} + static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - grpc_fd *fds[2]; + grpc_unary_promote_args *up_args; if (fd == pollset->data.ptr) return; - fds[0] = pollset->data.ptr; - fds[1] = fd; - if (!grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); - grpc_fd_unref(fds[0]); - } else { - /* old fd is orphaned and we haven't cleaned it up until now, so remain a - * unary poller */ - grpc_fd_unref(fds[0]); - pollset->data.ptr = fd; - grpc_fd_ref(fd); + + if (!pollset->counter) { + /* Fast path -- no in flight cbs */ + /* TODO(klempner): Comment this out and fix any test failures or establish + * they are due to timing issues */ + grpc_fd *fds[2]; + fds[0] = pollset->data.ptr; + fds[1] = fd; + + if (!grpc_fd_is_orphaned(fds[0])) { + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); + } else { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + grpc_fd_unref(fds[0]); + pollset->data.ptr = fd; + grpc_fd_ref(fd); + } + return; } + + /* Now we need to promote. This needs to happen when we're not polling. Since + * this may be called from poll, the wait needs to happen asynchronously. */ + grpc_fd_ref(fd); + pollset->in_flight_cbs++; + up_args = gpr_malloc(sizeof(*up_args)); + up_args->pollset = pollset; + up_args->fd = fd; + up_args->original_vtable = pollset->vtable; + grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + + grpc_pollset_kick(pollset); } static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { @@ -238,6 +365,10 @@ static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, if (pollset->counter) { return 0; } + if (pollset->in_flight_cbs) { + /* Give do_promote priority so we don't starve it out */ + return 0; + } fd = pollset->data.ptr; if (grpc_fd_is_orphaned(fd)) { grpc_fd_unref(fd); diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 03b4c775b7..86b6c9f20e 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -55,6 +55,10 @@ typedef struct grpc_pollset { gpr_cv cv; grpc_pollset_kick_state kick_state; int counter; + int in_flight_cbs; + int shutting_down; + void (*shutdown_done_cb)(void *arg); + void *shutdown_done_arg; union { int fd; void *ptr; diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index d21072b283..bea6711611 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -46,6 +46,12 @@ void grpc_pollset_init(grpc_pollset *pollset) { gpr_cv_init(&pollset->cv); } +void grpc_pollset_shutdown(grpc_pollset *pollset, + void (*shutdown_done)(void *arg), + void *shutdown_done_arg) { + shutdown_done(shutdown_done_arg); +} + void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); gpr_cv_destroy(&pollset->cv); diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index edf40b5ad1..989b968ae2 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -66,7 +66,6 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( int s; size_t i; grpc_resolved_addresses *addrs = NULL; - const gpr_timespec start_time = gpr_now(); struct sockaddr_un *un; if (name[0] == 'u' && name[1] == 'n' && name[2] == 'i' && name[3] == 'x' && @@ -121,22 +120,6 @@ grpc_resolved_addresses *grpc_blocking_resolve_address( i++; } - /* Temporary logging, to help identify flakiness in dualstack_socket_test. */ - { - const gpr_timespec delay = gpr_time_sub(gpr_now(), start_time); - const int delay_ms = - delay.tv_sec * GPR_MS_PER_SEC + delay.tv_nsec / GPR_NS_PER_MS; - gpr_log(GPR_INFO, "logspam: getaddrinfo(%s, %s) resolved %d addrs in %dms:", - host, port, addrs->naddrs, delay_ms); - for (i = 0; i < addrs->naddrs; i++) { - char *buf; - grpc_sockaddr_to_string(&buf, (struct sockaddr *)&addrs->addrs[i].addr, - 0); - gpr_log(GPR_INFO, "logspam: [%d] %s", i, buf); - gpr_free(buf); - } - } - done: gpr_free(host); gpr_free(port); diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index 0a65480b2f..62264e4105 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -43,7 +43,9 @@ #include "src/core/support/file.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/alpn.h" + #include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include "src/core/tsi/fake_transport_security.h" @@ -51,20 +53,33 @@ /* -- Constants. -- */ -/* Defines the cipher suites that we accept. All these cipher suites are - compliant with TLS 1.2 and use an RSA public key. We prefer GCM over CBC - and ECDHE-RSA over just RSA. */ -#define GRPC_SSL_CIPHER_SUITES \ - "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:AES128-GCM-SHA256:" \ - "AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384:AES128-" \ - "SHA256:AES256-SHA256" - #ifndef INSTALL_PREFIX static const char *installed_roots_path = "/usr/share/grpc/roots.pem"; #else static const char *installed_roots_path = INSTALL_PREFIX "/share/grpc/roots.pem"; #endif +/* -- Cipher suites. -- */ + +/* Defines the cipher suites that we accept by default. All these cipher suites + are compliant with HTTP2. */ +#define GRPC_SSL_CIPHER_SUITES \ + "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-" \ + "SHA384:ECDHE-RSA-AES256-GCM-SHA384" + +static gpr_once cipher_suites_once = GPR_ONCE_INIT; +static const char *cipher_suites = NULL; + +static void init_cipher_suites(void) { + char *overridden = gpr_getenv("GRPC_SSL_CIPHER_SUITES"); + cipher_suites = overridden != NULL ? overridden : GRPC_SSL_CIPHER_SUITES; +} + +static const char *ssl_cipher_suites(void) { + gpr_once_init(&cipher_suites_once, init_cipher_suites); + return cipher_suites; +} + /* -- Common methods. -- */ grpc_security_status grpc_security_context_create_handshaker( @@ -322,6 +337,24 @@ static grpc_security_status ssl_server_create_handshaker( return ssl_create_handshaker(c->handshaker_factory, 0, NULL, handshaker); } +static int ssl_host_matches_name(const tsi_peer *peer, + const char *peer_name) { + char *allocated_name = NULL; + int r; + + if (strchr(peer_name, ':') != NULL) { + char *ignored_port; + gpr_split_host_port(peer_name, &allocated_name, &ignored_port); + gpr_free(ignored_port); + peer_name = allocated_name; + if (!peer_name) return 0; + } + + r = tsi_ssl_peer_matches_name(peer, peer_name); + gpr_free(allocated_name); + return r; +} + static grpc_security_status ssl_check_peer(const char *peer_name, const tsi_peer *peer) { /* Check the ALPN. */ @@ -343,10 +376,11 @@ static grpc_security_status ssl_check_peer(const char *peer_name, /* Check the peer name if specified. */ if (peer_name != NULL && - !tsi_ssl_peer_matches_name(peer, peer_name)) { + !ssl_host_matches_name(peer, peer_name)) { gpr_log(GPR_ERROR, "Peer name %s is not in peer certificate", peer_name); return GRPC_SECURITY_ERROR; } + return GRPC_SECURITY_OK; } @@ -382,7 +416,7 @@ static grpc_security_status ssl_channel_check_call_host( grpc_ssl_channel_security_context *c = (grpc_ssl_channel_security_context *)ctx; - if (tsi_ssl_peer_matches_name(&c->peer, host)) return GRPC_SECURITY_OK; + if (ssl_host_matches_name(&c->peer, host)) return GRPC_SECURITY_OK; /* If the target name was overridden, then the original target_name was 'checked' transitively during the previous peer check at the end of the @@ -442,6 +476,7 @@ grpc_security_status grpc_ssl_channel_security_context_create( size_t i; const unsigned char *pem_root_certs; size_t pem_root_certs_size; + char *port; for (i = 0; i < num_alpn_protocols; i++) { alpn_protocol_strings[i] = @@ -467,9 +502,8 @@ grpc_security_status grpc_ssl_channel_security_context_create( c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.request_metadata_creds = grpc_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; - if (target_name != NULL) { - c->target_name = gpr_strdup(target_name); - } + gpr_split_host_port(target_name, &c->target_name, &port); + gpr_free(port); if (overridden_target_name != NULL) { c->overridden_target_name = gpr_strdup(overridden_target_name); } @@ -486,7 +520,7 @@ grpc_security_status grpc_ssl_channel_security_context_create( result = tsi_create_ssl_client_handshaker_factory( config->pem_private_key, config->pem_private_key_size, config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs, - pem_root_certs_size, GRPC_SSL_CIPHER_SUITES, alpn_protocol_strings, + pem_root_certs_size, ssl_cipher_suites(), alpn_protocol_strings, alpn_protocol_string_lengths, num_alpn_protocols, &c->handshaker_factory); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", @@ -540,7 +574,7 @@ grpc_security_status grpc_ssl_server_security_context_create( (const unsigned char **)config->pem_cert_chains, config->pem_cert_chains_sizes, config->num_key_cert_pairs, config->pem_root_certs, config->pem_root_certs_size, - GRPC_SSL_CIPHER_SUITES, alpn_protocol_strings, + ssl_cipher_suites(), alpn_protocol_strings, alpn_protocol_string_lengths, num_alpn_protocols, &c->handshaker_factory); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", diff --git a/src/core/statistics/census_init.c b/src/core/statistics/census_init.c index 820d75f795..e6306f5e6f 100644 --- a/src/core/statistics/census_init.c +++ b/src/core/statistics/census_init.c @@ -38,13 +38,11 @@ #include "src/core/statistics/census_tracing.h" void census_init(void) { - gpr_log(GPR_INFO, "Initialize census library."); census_tracing_init(); census_stats_store_init(); } void census_shutdown(void) { - gpr_log(GPR_INFO, "Shutdown census library."); census_stats_store_shutdown(); census_tracing_shutdown(); } diff --git a/src/core/statistics/census_rpc_stats.c b/src/core/statistics/census_rpc_stats.c index 388ce4fe2c..0491c91947 100644 --- a/src/core/statistics/census_rpc_stats.c +++ b/src/core/statistics/census_rpc_stats.c @@ -222,7 +222,6 @@ void census_get_server_stats(census_aggregated_rpc_stats* data) { } void census_stats_store_init(void) { - gpr_log(GPR_INFO, "Initialize census stats store."); init_mutex_once(); gpr_mu_lock(&g_mu); if (g_client_stats_store == NULL && g_server_stats_store == NULL) { @@ -235,7 +234,6 @@ void census_stats_store_init(void) { } void census_stats_store_shutdown(void) { - gpr_log(GPR_INFO, "Shutdown census stats store."); init_mutex_once(); gpr_mu_lock(&g_mu); if (g_client_stats_store != NULL) { diff --git a/src/core/statistics/census_tracing.c b/src/core/statistics/census_tracing.c index adfcbecb4c..05e72b99c0 100644 --- a/src/core/statistics/census_tracing.c +++ b/src/core/statistics/census_tracing.c @@ -154,7 +154,6 @@ void census_tracing_end_op(census_op_id op_id) { } void census_tracing_init(void) { - gpr_log(GPR_INFO, "Initialize census trace store."); init_mutex_once(); gpr_mu_lock(&g_mu); if (g_trace_store == NULL) { @@ -167,7 +166,6 @@ void census_tracing_init(void) { } void census_tracing_shutdown(void) { - gpr_log(GPR_INFO, "Shutdown census trace store."); gpr_mu_lock(&g_mu); if (g_trace_store != NULL) { census_ht_destroy(g_trace_store); diff --git a/src/core/support/cpu_posix.c b/src/core/support/cpu_posix.c index 33c7b90b0b..5f45fb0bc3 100644 --- a/src/core/support/cpu_posix.c +++ b/src/core/support/cpu_posix.c @@ -40,6 +40,7 @@ #include <string.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> static __thread char magic_thread_local; @@ -55,7 +56,7 @@ static void init_ncpus() { unsigned gpr_cpu_num_cores(void) { static gpr_once once = GPR_ONCE_INIT; - gpr_once_init(&once, init_num_cpus); + gpr_once_init(&once, init_ncpus); return ncpus; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 2efc084d7b..c4b8d60782 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -389,12 +389,17 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { } } -void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GPR_ASSERT(cc->queue == NULL); +static void on_pollset_destroy_done(void *arg) { + grpc_completion_queue *cc = arg; grpc_pollset_destroy(&cc->pollset); gpr_free(cc); } +void grpc_completion_queue_destroy(grpc_completion_queue *cc) { + GPR_ASSERT(cc->queue == NULL); + grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); +} + void grpc_event_finish(grpc_event *base) { event *ev = (event *)base; ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 06429e220b..e6c4b7e38f 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -35,6 +35,7 @@ #include <string.h> +#include "src/core/debug/trace.h" #include "src/core/transport/chttp2/frame.h" #include <grpc/support/log.h> #include <grpc/support/useful.h> @@ -53,7 +54,8 @@ const grpc_chttp2_setting_parameters {"MAX_FRAME_SIZE", 16384, 16384, 16777215, GRPC_CHTTP2_DISCONNECT_ON_INVALID_VALUE}, {"MAX_HEADER_LIST_SIZE", 0xffffffffu, 0, 0xffffffffu, - GRPC_CHTTP2_CLAMP_INVALID_VALUE}, }; + GRPC_CHTTP2_CLAMP_INVALID_VALUE}, +}; static gpr_uint8 *fill_header(gpr_uint8 *out, gpr_uint32 length, gpr_uint8 flags) { @@ -155,7 +157,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( } return GRPC_CHTTP2_PARSE_OK; } - parser->id = ((gpr_uint16) * cur) << 8; + parser->id = ((gpr_uint16)*cur) << 8; cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_ID1: @@ -171,7 +173,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( parser->state = GRPC_CHTTP2_SPS_VAL0; return GRPC_CHTTP2_PARSE_OK; } - parser->value = ((gpr_uint32) * cur) << 24; + parser->value = ((gpr_uint32)*cur) << 24; cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_VAL1: @@ -179,7 +181,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( parser->state = GRPC_CHTTP2_SPS_VAL1; return GRPC_CHTTP2_PARSE_OK; } - parser->value |= ((gpr_uint32) * cur) << 16; + parser->value |= ((gpr_uint32)*cur) << 16; cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_VAL2: @@ -187,7 +189,7 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( parser->state = GRPC_CHTTP2_SPS_VAL2; return GRPC_CHTTP2_PARSE_OK; } - parser->value |= ((gpr_uint32) * cur) << 8; + parser->value |= ((gpr_uint32)*cur) << 8; cur++; /* fallthrough */ case GRPC_CHTTP2_SPS_VAL3: @@ -216,8 +218,10 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( } } parser->incoming_settings[parser->id] = parser->value; - gpr_log(GPR_DEBUG, "CHTTP2: got setting %d = %d", parser->id, - parser->value); + if (grpc_trace_bits & GRPC_TRACE_HTTP) { + gpr_log(GPR_DEBUG, "CHTTP2: got setting %d = %d", parser->id, + parser->value); + } } else { gpr_log(GPR_ERROR, "CHTTP2: Ignoring unknown setting %d (value %d)", parser->id, parser->value); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index ccd8d0c376..476cc4b226 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -37,6 +37,7 @@ #include <stdio.h> #include <string.h> +#include "src/core/debug/trace.h" #include "src/core/support/string.h" #include "src/core/transport/chttp2/frame_data.h" #include "src/core/transport/chttp2/frame_goaway.h" @@ -66,6 +67,12 @@ typedef struct transport transport; typedef struct stream stream; +#define IF_TRACING(stmt) \ + if (!(grpc_trace_bits & GRPC_TRACE_HTTP)) \ + ; \ + else \ + stmt + /* streams are kept in various linked lists depending on what things need to happen to them... this enum labels each list */ typedef enum { @@ -301,7 +308,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id, gpr_uint32 value); static int prepare_callbacks(transport *t); -static void run_callbacks(transport *t); +static void run_callbacks(transport *t, const grpc_transport_callbacks *cb); static int prepare_write(transport *t); static void perform_write(transport *t, grpc_endpoint *ep); @@ -552,7 +559,7 @@ static int init_stream(grpc_transport *gt, grpc_stream *gs, lock(t); s->id = 0; } else { - s->id = (gpr_uint32)(gpr_uintptr)server_data; + s->id = (gpr_uint32)(gpr_uintptr) server_data; t->incoming_stream = s; grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); } @@ -706,6 +713,7 @@ static void unlock(transport *t) { pending_goaway *goaways = NULL; grpc_endpoint *ep = t->ep; grpc_stream_op_buffer nuke_now; + const grpc_transport_callbacks *cb = t->cb; grpc_sopb_init(&nuke_now); if (t->nuke_later_sopb.nops) { @@ -725,7 +733,7 @@ static void unlock(transport *t) { } /* gather any callbacks that need to be made */ - if (!t->calling_back && t->cb) { + if (!t->calling_back && cb) { perform_callbacks = prepare_callbacks(t); if (perform_callbacks) { t->calling_back = 1; @@ -733,6 +741,7 @@ static void unlock(transport *t) { if (t->error_state == ERROR_STATE_SEEN) { call_closed = 1; t->calling_back = 1; + t->cb = NULL; /* no more callbacks */ t->error_state = ERROR_STATE_NOTIFIED; } if (t->num_pending_goaways) { @@ -754,16 +763,16 @@ static void unlock(transport *t) { /* perform some callbacks if necessary */ for (i = 0; i < num_goaways; i++) { - t->cb->goaway(t->cb_user_data, &t->base, goaways[i].status, - goaways[i].debug); + cb->goaway(t->cb_user_data, &t->base, goaways[i].status, + goaways[i].debug); } if (perform_callbacks) { - run_callbacks(t); + run_callbacks(t, cb); } if (call_closed) { - t->cb->closed(t->cb_user_data, &t->base); + cb->closed(t->cb_user_data, &t->base); } /* write some bytes if necessary */ @@ -1206,6 +1215,11 @@ static void on_header(void *tp, grpc_mdelem *md) { stream *s = t->incoming_stream; GPR_ASSERT(s); + + IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:HDR: %s: %s", s->id, + grpc_mdstr_as_c_string(md->key), + grpc_mdstr_as_c_string(md->value))); + stream_list_join(t, s, PENDING_CALLBACKS); if (md->key == t->str_grpc_timeout) { gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout); @@ -1269,7 +1283,7 @@ static int init_header_frame_parser(transport *t, int is_continuation) { t->incoming_stream = NULL; /* if stream is accepted, we set incoming_stream in init_stream */ t->cb->accept_stream(t->cb_user_data, &t->base, - (void *)(gpr_uintptr)t->incoming_stream_id); + (void *)(gpr_uintptr) t->incoming_stream_id); s = t->incoming_stream; if (!s) { gpr_log(GPR_ERROR, "stream not accepted"); @@ -1534,8 +1548,8 @@ static int process_read(transport *t, gpr_slice slice) { "Connect string mismatch: expected '%c' (%d) got '%c' (%d) " "at byte %d", CLIENT_CONNECT_STRING[t->deframe_state], - (int)(gpr_uint8)CLIENT_CONNECT_STRING[t->deframe_state], *cur, - (int)*cur, t->deframe_state); + (int)(gpr_uint8) CLIENT_CONNECT_STRING[t->deframe_state], + *cur, (int)*cur, t->deframe_state); drop_connection(t); return 0; } @@ -1741,13 +1755,13 @@ static int prepare_callbacks(transport *t) { return n; } -static void run_callbacks(transport *t) { +static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) { stream *s; while ((s = stream_list_remove_head(t, EXECUTING_CALLBACKS))) { size_t nops = s->callback_sopb.nops; s->callback_sopb.nops = 0; - t->cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, - s->callback_sopb.ops, nops, s->callback_state); + cb->recv_batch(t->cb_user_data, &t->base, (grpc_stream *)s, + s->callback_sopb.ops, nops, s->callback_state); } } @@ -1765,9 +1779,9 @@ static void add_to_pollset(grpc_transport *gt, grpc_pollset *pollset) { */ static const grpc_transport_vtable vtable = { - sizeof(stream), init_stream, send_batch, set_allow_window_updates, - add_to_pollset, destroy_stream, abort_stream, goaway, close_transport, - send_ping, destroy_transport}; + sizeof(stream), init_stream, send_batch, set_allow_window_updates, + add_to_pollset, destroy_stream, abort_stream, goaway, + close_transport, send_ping, destroy_transport}; void grpc_create_chttp2_transport(grpc_transport_setup_callback setup, void *arg, diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 85b0922a43..8446cc4fdc 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -180,6 +180,30 @@ static void ssl_info_callback(const SSL* ssl, int where, int ret) { ssl_log_where_info(ssl, where, SSL_CB_HANDSHAKE_DONE, "HANDSHAKE DONE"); } +/* Returns 1 if name looks like an IP address, 0 otherwise. + This is a very rough heuristic as it does not handle IPV6 or things like: + 0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */ +static int looks_like_ip_address(const char *name) { + size_t i; + size_t dot_count = 0; + size_t num_size = 0; + for (i = 0; i < strlen(name); i++) { + if (name[i] >= '0' && name[i] <= '9') { + if (num_size > 3) return 0; + num_size++; + } else if (name[i] == '.') { + if (dot_count > 3 || num_size == 0) return 0; + dot_count++; + num_size = 0; + } else { + return 0; + } + } + if (dot_count < 3 || num_size == 0) return 0; + return 1; +} + + /* Gets the subject CN from an X509 cert. */ static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8, size_t* utf8_size) { @@ -226,10 +250,18 @@ static tsi_result peer_property_from_x509_common_name( size_t common_name_size; tsi_result result = ssl_get_x509_common_name(cert, &common_name, &common_name_size); - if (result != TSI_OK) return result; + if (result != TSI_OK) { + if (result == TSI_NOT_FOUND) { + common_name = NULL; + common_name_size = 0; + } else { + return result; + } + } result = tsi_construct_string_peer_property( - TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY, (const char*)common_name, - common_name_size, property); + TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY, + common_name == NULL ? "" : (const char*)common_name, common_name_size, + property); OPENSSL_free(common_name); return result; } @@ -1036,9 +1068,22 @@ static void ssl_server_handshaker_factory_destroy( static int does_entry_match_name(const char* entry, size_t entry_length, const char* name) { + const char *dot; const char* name_subdomain = NULL; + size_t name_length = strlen(name); + size_t name_subdomain_length; if (entry_length == 0) return 0; - if (!strncmp(name, entry, entry_length) && (strlen(name) == entry_length)) { + + /* Take care of '.' terminations. */ + if (name[name_length - 1] == '.') { + name_length--; + } + if (entry[entry_length - 1] == '.') { + entry_length--; + if (entry_length == 0) return 0; + } + + if ((name_length == entry_length) && !strncmp(name, entry, entry_length)) { return 1; /* Perfect match. */ } if (entry[0] != '*') return 0; @@ -1049,18 +1094,29 @@ static int does_entry_match_name(const char* entry, size_t entry_length, return 0; } name_subdomain = strchr(name, '.'); - if (name_subdomain == NULL || strlen(name_subdomain) < 2) return 0; + if (name_subdomain == NULL) return 0; + name_subdomain_length = strlen(name_subdomain); + if (name_subdomain_length < 2) return 0; name_subdomain++; /* Starts after the dot. */ + name_subdomain_length--; entry += 2; /* Remove *. */ entry_length -= 2; - return (!strncmp(entry, name_subdomain, entry_length) && - (strlen(name_subdomain) == entry_length)); + dot = strchr(name_subdomain, '.'); + if ((dot == NULL) || (dot == &name_subdomain[name_subdomain_length - 1])) { + gpr_log(GPR_ERROR, "Invalid toplevel subdomain: %s", name_subdomain); + return 0; + } + if (name_subdomain[name_subdomain_length - 1] == '.') { + name_subdomain_length--; + } + return ((entry_length > 0) && (name_subdomain_length == entry_length) && + !strncmp(entry, name_subdomain, entry_length)); } static int ssl_server_handshaker_factory_servername_callback(SSL* ssl, int* ap, void* arg) { tsi_ssl_server_handshaker_factory* impl = - (tsi_ssl_server_handshaker_factory*)arg; + (tsi_ssl_server_handshaker_factory*)arg; size_t i = 0; const char* servername = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name); if (servername == NULL || strlen(servername) == 0) { @@ -1283,17 +1339,13 @@ tsi_result tsi_create_ssl_server_handshaker_factory( int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name) { size_t i = 0; - const tsi_peer_property* property = tsi_peer_get_property_by_name( - peer, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY); - if (property == NULL || property->type != TSI_PEER_PROPERTY_TYPE_STRING) { - gpr_log(GPR_ERROR, "Invalid x509 subject common name property."); - return 0; - } - if (does_entry_match_name(property->value.string.data, - property->value.string.length, name)) { - return 1; - } + size_t san_count = 0; + const tsi_peer_property* property = NULL; + /* For now reject what looks like an IP address. */ + if (looks_like_ip_address(name)) return 0; + + /* Check the SAN first. */ property = tsi_peer_get_property_by_name( peer, TSI_X509_SUBJECT_ALTERNATIVE_NAMES_PEER_PROPERTY); if (property == NULL || property->type != TSI_PEER_PROPERTY_TYPE_LIST) { @@ -1301,7 +1353,8 @@ int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name) { return 0; } - for (i = 0; i < property->value.list.child_count; i++) { + san_count = property->value.list.child_count; + for (i = 0; i < san_count; i++) { const tsi_peer_property* alt_name_property = &property->value.list.children[i]; if (alt_name_property->type != TSI_PEER_PROPERTY_TYPE_STRING) { @@ -1313,5 +1366,20 @@ int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name) { return 1; } } + + /* If there's no SAN, try the CN. */ + if (san_count == 0) { + property = tsi_peer_get_property_by_name( + peer, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY); + if (property == NULL || property->type != TSI_PEER_PROPERTY_TYPE_STRING) { + gpr_log(GPR_ERROR, "Invalid x509 subject common name property."); + return 0; + } + if (does_entry_match_name(property->value.string.data, + property->value.string.length, name)) { + return 1; + } + } + return 0; /* Not found. */ } diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 3c1c4c01a2..eecf2d7c2d 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -158,7 +158,12 @@ tsi_result tsi_ssl_handshaker_factory_create_handshaker( while handshakers created with this factory are still in use. */ void tsi_ssl_handshaker_factory_destroy(tsi_ssl_handshaker_factory* self); -/* Util that checks that an ssl peer matches a specific name. */ +/* Util that checks that an ssl peer matches a specific name. + Still TODO(jboeuf): + - handle mixed case. + - handle %encoded chars. + - handle public suffix wildchar more strictly (e.g. *.co.uk) + - handle IP addresses in SAN. */ int tsi_ssl_peer_matches_name(const tsi_peer* peer, const char* name); #ifdef __cplusplus diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 7e564a2fba..39be35c219 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -101,15 +101,8 @@ namespace Grpc.Core.Tests using (Channel channel = new Channel(host + ":" + port)) { var call = new Call<string, string>(unaryEchoStringMethod, channel); - - var stopwatch = new Stopwatch(); - stopwatch.Start(); - for (int i = 0; i < 1000; i++) - { - Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); - } - stopwatch.Stop(); - Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + BenchmarkUtil.RunBenchmark(100, 1000, + () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); } server.ShutdownAsync().Wait(); diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 687be3c0cb..a365320f05 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -41,6 +41,7 @@ <Compile Include="ServerTest.cs" /> <Compile Include="GrpcEnvironmentTest.cs" /> <Compile Include="TimespecTest.cs" /> + <Compile Include="PInvokeTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 8d3aef946a..596918c231 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -41,14 +41,16 @@ namespace Grpc.Core.Tests public class GrpcEnvironmentTest { [Test] - public void InitializeAndShutdownGrpcEnvironment() { + public void InitializeAndShutdownGrpcEnvironment() + { GrpcEnvironment.Initialize(); Assert.IsNotNull(GrpcEnvironment.ThreadPool.CompletionQueue); GrpcEnvironment.Shutdown(); } [Test] - public void SubsequentInvocations() { + public void SubsequentInvocations() + { GrpcEnvironment.Initialize(); GrpcEnvironment.Initialize(); GrpcEnvironment.Shutdown(); @@ -56,7 +58,8 @@ namespace Grpc.Core.Tests } [Test] - public void InitializeAfterShutdown() { + public void InitializeAfterShutdown() + { GrpcEnvironment.Initialize(); var tp1 = GrpcEnvironment.ThreadPool; GrpcEnvironment.Shutdown(); diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs new file mode 100644 index 0000000000..282d521ba3 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -0,0 +1,145 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Tests +{ + public class PInvokeTest + { + int counter; + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_test_nop(IntPtr ptr); + + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + } + + [TestFixtureTearDown] + public void Cleanup() + { + GrpcEnvironment.Shutdown(); + } + + /// <summary> + /// (~1.26us .NET Windows) + /// </summary> + [Test] + public void CompletionQueueCreateDestroyBenchmark() + { + BenchmarkUtil.RunBenchmark( + 100000, 1000000, + () => { + CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create(); + cq.Dispose(); + } + ); + } + + + /// <summary> + /// Approximate results: + /// (~80ns Mono Linux) + /// (~110ns .NET Windows) + /// </summary> + [Test] + public void NativeCallbackBenchmark() + { + CompletionCallbackDelegate handler = Handler; + + counter = 0; + BenchmarkUtil.RunBenchmark( + 1000000, 10000000, + () => { + grpcsharp_test_callback(handler); + } + ); + Assert.AreNotEqual(0, counter); + } + + /// <summary> + /// Creating a new native-to-managed callback has significant overhead + /// compared to using an existing one. We need to be aware of this. + /// (~50us on Mono Linux!!!) + /// (~1.1us on .NET Windows) + /// </summary> + [Test] + public void NewNativeCallbackBenchmark() + { + counter = 0; + BenchmarkUtil.RunBenchmark( + 10000, 10000, + () => { + grpcsharp_test_callback(new CompletionCallbackDelegate(Handler)); + } + ); + Assert.AreNotEqual(0, counter); + } + + /// <summary> + /// Tests overhead of a simple PInvoke call. + /// (~46ns .NET Windows) + /// </summary> + [Test] + public void NopPInvokeBenchmark() + { + CompletionCallbackDelegate handler = Handler; + + BenchmarkUtil.RunBenchmark( + 1000000, 100000000, + () => { + grpcsharp_test_nop(IntPtr.Zero); + } + ); + } + + private void Handler(GRPCOpError op, IntPtr ptr) { + counter ++; + } + } +} + diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index b67332676a..ee2208e5c2 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -47,19 +47,8 @@ namespace Grpc.Core { public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) { - //TODO: implement this in real synchronous style. - try { - return AsyncUnaryCall(call, req, token).Result; - } catch(AggregateException ae) { - foreach (var e in ae.InnerExceptions) - { - if (e is RpcException) - { - throw e; - } - } - throw; - } + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + return asyncCall.UnaryCall(call.Channel, call.MethodName, req); } public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 4ad32e10e4..183c442358 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -1,4 +1,4 @@ -<?xml version="1.0" encoding="utf-8"?> +<?xml version="1.0" encoding="utf-8"?> <Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> @@ -33,6 +33,7 @@ <Reference Include="System" /> </ItemGroup> <ItemGroup> + <Compile Include="Internal\GrpcLog.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="RpcException.cs" /> <Compile Include="Calls.cs" /> @@ -62,10 +63,20 @@ <Compile Include="Internal\ClientStreamingInputObserver.cs" /> <Compile Include="Internal\ServerStreamingOutputObserver.cs" /> <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" /> + <Compile Include="Utils\BenchmarkUtil.cs" /> + <Compile Include="Utils\ExceptionHelper.cs" /> </ItemGroup> + <Choose> + <!-- Under Windows, automatically copy the C core library to output dir. + Under Monodevelop it's not supported so it has no effect. --> + <When Condition=" '$(Platform)' == 'AnyCPU' "> + <ItemGroup> + <Content Include="..\..\..\vsprojects\vs2013\Debug\grpc_csharp_ext.dll"> + <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> + </Content> + </ItemGroup> + </When> + <Otherwise/> + </Choose> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> - <ItemGroup> - <Folder Include="Internal\" /> - <Folder Include="Utils\" /> - </ItemGroup> </Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 0e3a0a581c..d3a8da4729 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -107,6 +107,7 @@ namespace Grpc.Core /// </summary> private GrpcEnvironment() { + GrpcLog.RedirectNativeLogs(Console.Error); grpcsharp_init(); threadPool = new GrpcThreadPool(THREAD_POOL_SIZE); threadPool.Start(); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 5e96092e27..6f37b059f7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -38,6 +38,7 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -112,6 +113,36 @@ namespace Grpc.Core.Internal InitializeInternal(call, true); } + public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + { + using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) + { + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + + lock (myLock) + { + Initialize(channel, cq, methodName); + started = true; + halfcloseRequested = true; + readingDone = true; + } + call.BlockingUnary(cq, payload, unaryResponseHandler); + + try + { + // Once the blocking call returns, the result should be available synchronously. + return unaryResponseTcs.Task.Result; + } + catch (AggregateException ae) + { + throw ExceptionHelper.UnwrapRpcException(ae); + } + } + } + public Task<TRead> UnaryCallAsync(TWrite msg) { lock (myLock) @@ -150,6 +181,7 @@ namespace Grpc.Core.Internal { started = true; halfcloseRequested = true; + halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. this.readObserver = readObserver; @@ -513,6 +545,8 @@ namespace Grpc.Core.Internal } observer = readObserver; status = finishedStatus; + + ReleaseResourcesIfPossible(); } // TODO: wrap deserialization... diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 659a383b4b..1c0bc98f06 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -63,6 +63,11 @@ namespace Grpc.Core.Internal byte[] send_buffer, UIntPtr send_buffer_len); [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); @@ -113,6 +118,11 @@ namespace Grpc.Core.Internal AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); } + public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) + { + grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong) payload.Length)); + } + public void StartClientStreaming(CompletionCallbackDelegate callback) { AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); diff --git a/src/csharp/Grpc.Core/Internal/GrpcLog.cs b/src/csharp/Grpc.Core/Internal/GrpcLog.cs new file mode 100644 index 0000000000..98768d05c6 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/GrpcLog.cs @@ -0,0 +1,94 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Runtime.InteropServices; +using System.Threading; + +namespace Grpc.Core.Internal +{ + internal delegate void GprLogDelegate(IntPtr fileStringPtr, Int32 line, UInt64 threadId, IntPtr severityStringPtr, IntPtr msgPtr); + + /// <summary> + /// Logs from gRPC C core library can get lost if your application is not a console app. + /// This class allows redirection of logs to arbitrary destination. + /// </summary> + internal static class GrpcLog + { + static object staticLock = new object(); + static GprLogDelegate writeCallback; + static TextWriter dest; + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_redirect_log(GprLogDelegate callback); + + /// <summary> + /// Sets text writer as destination for logs from native gRPC C core library. + /// Only first invocation has effect. + /// </summary> + /// <param name="textWriter"></param> + public static void RedirectNativeLogs(TextWriter textWriter) + { + lock (staticLock) + { + if (writeCallback == null) + { + writeCallback = new GprLogDelegate(HandleWrite); + dest = textWriter; + grpcsharp_redirect_log(writeCallback); + } + } + } + + private static void HandleWrite(IntPtr fileStringPtr, Int32 line, UInt64 threadId, IntPtr severityStringPtr, IntPtr msgPtr) + { + try + { + // TODO: DateTime format used here is different than in C core. + dest.WriteLine(string.Format("{0}{1} {2} {3}:{4}: {5}", + Marshal.PtrToStringAnsi(severityStringPtr), DateTime.Now, + threadId, + Marshal.PtrToStringAnsi(fileStringPtr), + line, + Marshal.PtrToStringAnsi(msgPtr))); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in native callback " + e); + } + } + } +} diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs index 5a9d0039bc..e1cf64ca56 100644 --- a/src/csharp/Grpc.Core/RpcException.cs +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -49,7 +49,8 @@ namespace Grpc.Core this.status = status; } - public Status Status { + public Status Status + { get { return status; diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs index 1296947f34..289f97aece 100644 --- a/src/csharp/Grpc.Core/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -111,6 +111,8 @@ namespace Grpc.Core var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>()); + // TODO: this makes the call finish before all reads can be done which causes trouble + // in AsyncCall.HandleReadFinished callback. Revisit this. asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); finishedTask.Wait(); diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs new file mode 100644 index 0000000000..3f0dae84cf --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs @@ -0,0 +1,68 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Grpc.Core.Utils +{ + public static class BenchmarkUtil + { + /// <summary> + /// Runs a simple benchmark preceded by warmup phase. + /// </summary> + public static void RunBenchmark(int warmupIterations, int benchmarkIterations, Action action) + { + Console.WriteLine("Warmup iterations: " + warmupIterations); + for (int i = 0; i < warmupIterations; i++) + { + action(); + } + + Console.WriteLine("Benchmark iterations: " + benchmarkIterations); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + for (int i = 0; i < benchmarkIterations; i++) + { + action(); + } + stopwatch.Stop(); + Console.WriteLine("Elapsed time: " + stopwatch.ElapsedMilliseconds + "ms"); + Console.WriteLine("Ops per second: " + (int) ((double) benchmarkIterations * 1000 / stopwatch.ElapsedMilliseconds)); + } + } +} + diff --git a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs b/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs new file mode 100644 index 0000000000..18702e1cc4 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs @@ -0,0 +1,57 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core.Utils +{ + public static class ExceptionHelper + { + /// <summary> + /// If inner exceptions contain RpcException, rethrows it. + /// Otherwise, rethrows the original aggregate exception. + /// Always throws, the exception return type is here only to make the. + /// </summary> + public static Exception UnwrapRpcException(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw ae; + } + } +} + diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 462fab4454..76a08ce518 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -127,8 +127,7 @@ namespace math public void OnCompleted() { - Task.Factory.StartNew(() => - responseObserver.OnCompleted()); + responseObserver.OnCompleted(); } public void OnError(Exception error) @@ -138,13 +137,7 @@ namespace math public void OnNext(DivArgs value) { - // TODO: currently we need this indirection because - // responseObserver waits for write to finish, this - // callback is called from grpc threadpool which - // currently only has one thread. - // Same story for OnCompleted(). - Task.Factory.StartNew(() => - responseObserver.OnNext(DivInternal(value))); + responseObserver.OnNext(DivInternal(value)); } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Client.cs b/src/csharp/Grpc.IntegrationTesting/Client.cs index bb650a112d..fa1c7cd051 100644 --- a/src/csharp/Grpc.IntegrationTesting/Client.cs +++ b/src/csharp/Grpc.IntegrationTesting/Client.cs @@ -33,7 +33,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Text.RegularExpressions; +using System.Threading.Tasks; using Google.ProtocolBuffers; using Grpc.Core; using Grpc.Core.Utils; @@ -128,12 +130,15 @@ namespace Grpc.IntegrationTesting case "empty_stream": RunEmptyStream(client); break; + case "benchmark_empty_unary": + RunBenchmarkEmptyUnary(client); + break; default: throw new ArgumentException("Unknown test case " + testCase); } } - private void RunEmptyUnary(TestServiceGrpc.ITestServiceClient client) + public static void RunEmptyUnary(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running empty_unary"); var response = client.EmptyCall(Empty.DefaultInstance); @@ -141,7 +146,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunLargeUnary(TestServiceGrpc.ITestServiceClient client) + public static void RunLargeUnary(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running large_unary"); var request = SimpleRequest.CreateBuilder() @@ -157,7 +162,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) + public static void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running client_streaming"); @@ -176,7 +181,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) + public static void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running server_streaming"); @@ -201,7 +206,7 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - private void RunPingPong(TestServiceGrpc.ITestServiceClient client) + public static void RunPingPong(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running ping_pong"); @@ -230,7 +235,7 @@ namespace Grpc.IntegrationTesting inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2635)) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) .SetPayload(CreateZerosPayload(1828)).Build()); response = recorder.Queue.Take(); @@ -247,13 +252,15 @@ namespace Grpc.IntegrationTesting Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); Assert.AreEqual(58979, response.Payload.Body.Length); + inputs.OnCompleted(); + recorder.Finished.Wait(); Assert.AreEqual(0, recorder.Queue.Count); Console.WriteLine("Passed!"); } - private void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) + public static void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) { Console.WriteLine("running empty_stream"); @@ -267,8 +274,14 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } + // This is not an official interop test, but it's useful. + public static void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client) + { + BenchmarkUtil.RunBenchmark(10000, 10000, + () => { client.EmptyCall(Empty.DefaultInstance);}); + } - private Payload CreateZerosPayload(int size) { + private static Payload CreateZerosPayload(int size) { return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 9b46a644bc..e66f708a94 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -47,6 +47,8 @@ <Compile Include="TestServiceGrpc.cs" /> <Compile Include="Empty.cs" /> <Compile Include="Messages.cs" /> + <Compile Include="InteropClientServerTest.cs" /> + <Compile Include="TestServiceImpl.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs new file mode 100644 index 0000000000..87d25b0a98 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -0,0 +1,119 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using grpc.testing; + +namespace Grpc.IntegrationTesting +{ + /// <summary> + /// Runs interop tests in-process. + /// </summary> + public class InteropClientServerTest + { + string host = "localhost"; + Server server; + Channel channel; + TestServiceGrpc.ITestServiceClient client; + + [TestFixtureSetUp] + public void Init() + { + GrpcEnvironment.Initialize(); + + server = new Server(); + server.AddServiceDefinition(TestServiceGrpc.BindService(new TestServiceImpl())); + int port = server.AddPort(host + ":0"); + server.Start(); + channel = new Channel(host + ":" + port); + client = TestServiceGrpc.NewStub(channel); + } + + [TestFixtureTearDown] + public void Cleanup() + { + channel.Dispose(); + + server.ShutdownAsync().Wait(); + GrpcEnvironment.Shutdown(); + } + + [Test] + public void EmptyUnary() + { + Client.RunEmptyUnary(client); + } + + [Test] + public void LargeUnary() + { + Client.RunEmptyUnary(client); + } + + [Test] + public void ClientStreaming() + { + Client.RunClientStreaming(client); + } + + [Test] + public void ServerStreaming() + { + Client.RunServerStreaming(client); + } + + [Test] + public void PingPong() + { + Client.RunPingPong(client); + } + + [Test] + public void EmptyStream() + { + Client.RunEmptyStream(client); + } + + // TODO: add cancel_after_begin + + // TODO: add cancel_after_first_response + + } +} + diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs new file mode 100644 index 0000000000..176843b130 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -0,0 +1,140 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Google.ProtocolBuffers; +using Grpc.Core.Utils; + +namespace grpc.testing +{ + /// <summary> + /// Implementation of TestService server + /// </summary> + public class TestServiceImpl : TestServiceGrpc.ITestService + { + public void EmptyCall(Empty request, IObserver<Empty> responseObserver) + { + responseObserver.OnNext(Empty.DefaultInstance); + responseObserver.OnCompleted(); + } + + public void UnaryCall(SimpleRequest request, IObserver<SimpleResponse> responseObserver) + { + var response = SimpleResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(request.ResponseSize)).Build(); + //TODO: check we support ReponseType + responseObserver.OnNext(response); + responseObserver.OnCompleted(); + } + + public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver) + { + foreach(var responseParam in request.ResponseParametersList) + { + var response = StreamingOutputCallResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); + responseObserver.OnNext(response); + } + responseObserver.OnCompleted(); + } + + public IObserver<StreamingInputCallRequest> StreamingInputCall(IObserver<StreamingInputCallResponse> responseObserver) + { + var recorder = new RecordingObserver<StreamingInputCallRequest>(); + Task.Run(() => { + int sum = 0; + foreach(var req in recorder.ToList().Result) + { + sum += req.Payload.Body.Length; + } + var response = StreamingInputCallResponse.CreateBuilder() + .SetAggregatedPayloadSize(sum).Build(); + responseObserver.OnNext(response); + responseObserver.OnCompleted(); + }); + return recorder; + } + + public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver) + { + return new FullDuplexObserver(responseObserver); + } + + public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver) + { + throw new NotImplementedException(); + } + + private class FullDuplexObserver : IObserver<StreamingOutputCallRequest> { + + readonly IObserver<StreamingOutputCallResponse> responseObserver; + + public FullDuplexObserver(IObserver<StreamingOutputCallResponse> responseObserver) + { + this.responseObserver = responseObserver; + } + + public void OnCompleted() + { + responseObserver.OnCompleted(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnNext(StreamingOutputCallRequest value) + { + // TODO: this is not in order!!! + //Task.Factory.StartNew(() => { + + foreach(var responseParam in value.ResponseParametersList) + { + var response = StreamingOutputCallResponse.CreateBuilder() + .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); + responseObserver.OnNext(response); + } + //}); + } + } + + private static Payload CreateZerosPayload(int size) { + return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build(); + } + } +} + diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 5f9f22cab1..8f5a414187 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -35,9 +35,10 @@ #include <grpc/support/port_platform.h> #include <grpc/support/alloc.h> -#include <grpc/grpc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/thd.h> +#include <grpc/grpc.h> #include <string.h> @@ -343,6 +344,23 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } +/* Synchronous unary call */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_call_blocking_unary(grpc_call *call, + grpc_completion_queue *dedicated_cq, + callback_funcptr callback, + const char *send_buffer, size_t send_buffer_len) { + GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, + send_buffer_len) == GRPC_CALL_OK); + + /* TODO: we would like to use pluck, but we don't know the tag */ + GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == + GRPC_OP_COMPLETE); + grpc_completion_queue_shutdown(dedicated_cq); + GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == + GRPC_QUEUE_SHUTDOWN); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, callback_funcptr callback) { @@ -566,3 +584,32 @@ grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq, server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details), &(ctx->server_rpc_new.request_metadata), cq, ctx); } + +/* Logging */ + +typedef void(GPR_CALLTYPE *grpcsharp_log_func)(const char *file, gpr_int32 line, + gpr_uint64 thd_id, + const char *severity_string, + const char *msg); +static grpcsharp_log_func log_func = NULL; + +/* Redirects gpr_log to log_func callback */ +static void grpcsharp_log_handler(gpr_log_func_args *args) { + log_func(args->file, args->line, gpr_thd_currentid(), + gpr_log_severity_string(args->severity), args->message); +} + +GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) { + GPR_ASSERT(func); + log_func = func; + gpr_set_log_function(grpcsharp_log_handler); +} + +/* For testing */ +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_test_callback(callback_funcptr callback) { + callback(GRPC_OP_OK, NULL); +} + +/* For testing */ +GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; } diff --git a/src/node/README.md b/src/node/README.md index 8880213e9a..5b3de6b4f6 100644 --- a/src/node/README.md +++ b/src/node/README.md @@ -4,6 +4,10 @@ Alpha : Ready for early adopters +## Prerequisites + +This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. + ## Installation First, clone this repository (NPM package coming soon). Then follow the instructions in the `INSTALL` file in the root of the repository to install the C core library that this package depends on. diff --git a/src/node/binding.gyp b/src/node/binding.gyp index fb4c779f8e..5c34be24ff 100644 --- a/src/node/binding.gyp +++ b/src/node/binding.gyp @@ -7,7 +7,7 @@ "targets" : [ { 'include_dirs': [ - "<!(nodejs -e \"require('nan')\")" + "<!(node -e \"require('nan')\")" ], 'cflags': [ '-std=c++11', diff --git a/src/node/examples/pubsub/pubsub_demo.js b/src/node/examples/pubsub/pubsub_demo.js index 4f7a9a92ee..26301515f0 100644 --- a/src/node/examples/pubsub/pubsub_demo.js +++ b/src/node/examples/pubsub/pubsub_demo.js @@ -35,7 +35,7 @@ var async = require('async'); var fs = require('fs'); -var GoogleAuth = require('googleauth'); +var GoogleAuth = require('google-auth-library'); var parseArgs = require('minimist'); var strftime = require('strftime'); var _ = require('underscore'); diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 6c7a89e596..bc9461d7df 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -103,11 +103,15 @@ NAN_METHOD(Channel::New) { grpc_channel *wrapped_channel; // Owned by the Channel object NanUtf8String *host = new NanUtf8String(args[0]); + NanUtf8String *host_override = NULL; if (args[1]->IsUndefined()) { wrapped_channel = grpc_channel_create(**host, NULL); } else if (args[1]->IsObject()) { grpc_credentials *creds = NULL; Handle<Object> args_hash(args[1]->ToObject()->Clone()); + if (args_hash->HasOwnProperty(NanNew(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG))) { + host_override = new NanUtf8String(args_hash->Get(NanNew(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG))); + } if (args_hash->HasOwnProperty(NanNew("credentials"))) { Handle<Value> creds_value = args_hash->Get(NanNew("credentials")); if (!Credentials::HasInstance(creds_value)) { @@ -155,7 +159,12 @@ NAN_METHOD(Channel::New) { } else { return NanThrowTypeError("Channel expects a string and an object"); } - Channel *channel = new Channel(wrapped_channel, host); + Channel *channel; + if (host_override == NULL) { + channel = new Channel(wrapped_channel, host); + } else { + channel = new Channel(wrapped_channel, host_override); + } channel->Wrap(args.This()); NanReturnValue(args.This()); } else { diff --git a/src/node/index.js b/src/node/index.js index 4b5302e438..ad3dd96af7 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -78,7 +78,7 @@ function load(filename) { /** * Get a function that a client can use to update metadata with authentication * information from a Google Auth credential object, which comes from the - * googleauth library. + * google-auth-library. * @param {Object} credential The credential object to use * @return {function(Object, callback)} Metadata updater function */ diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index eaf254bcfe..8060baf827 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -37,7 +37,7 @@ var fs = require('fs'); var path = require('path'); var grpc = require('..'); var testProto = grpc.load(__dirname + '/test.proto').grpc.testing; -var GoogleAuth = require('googleauth'); +var GoogleAuth = require('google-auth-library'); var assert = require('assert'); diff --git a/src/node/package.json b/src/node/package.json index e6ac550554..e9995e7f0c 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -1,10 +1,10 @@ { "name": "grpc", - "version": "0.2.0", + "version": "0.5.0", "description": "gRPC Library for Node", "scripts": { - "lint": "nodejs ./node_modules/jshint/bin/jshint src test examples interop index.js", - "test": "nodejs ./node_modules/mocha/bin/mocha && npm run-script lint" + "lint": "node ./node_modules/jshint/bin/jshint src test examples interop index.js", + "test": "node ./node_modules/mocha/bin/mocha && npm run-script lint" }, "dependencies": { "bindings": "^1.2.1", @@ -16,7 +16,7 @@ }, "devDependencies": { "async": "^0.9.0", - "googleauth": "google/google-auth-library-nodejs", + "google-auth-library": "^0.9.2", "minimist": "^1.1.0", "mocha": "~1.21.0", "strftime": "^0.8.2" diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 8dc933eac5..6b3aa3dd84 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -40,7 +40,7 @@ var server; var port; -var name_override = 'foo.test.google.com'; +var name_override = 'foo.test.google.fr'; describe('Interop tests', function() { before(function(done) { diff --git a/src/php/ext/grpc/credentials.c b/src/php/ext/grpc/credentials.c index f25e042dd7..6d8f59fa33 100644 --- a/src/php/ext/grpc/credentials.c +++ b/src/php/ext/grpc/credentials.c @@ -94,7 +94,7 @@ zval *grpc_php_wrap_credentials(grpc_credentials *wrapped) { * @return Credentials The new default credentials object */ PHP_METHOD(Credentials, createDefault) { - grpc_credentials *creds = grpc_default_credentials_create(); + grpc_credentials *creds = grpc_google_default_credentials_create(); zval *creds_object = grpc_php_wrap_credentials(creds); RETURN_DESTROY_ZVAL(creds_object); } diff --git a/src/php/ext/grpc/event.c b/src/php/ext/grpc/event.c index 8d398450a4..452c4b8bcb 100644 --- a/src/php/ext/grpc/event.c +++ b/src/php/ext/grpc/event.c @@ -90,10 +90,6 @@ zval *grpc_php_convert_event(grpc_event *event) { add_property_stringl(event_object, "data", read_string, read_len, true); } break; - case GRPC_INVOKE_ACCEPTED: - add_property_long(event_object, "data", - (long)event->data.invoke_accepted); - break; case GRPC_WRITE_ACCEPTED: add_property_long(event_object, "data", (long)event->data.write_accepted); break; diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 5a09fc7d78..82ca438169 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -215,7 +215,7 @@ $stub = new grpc\testing\TestServiceClient( new Grpc\BaseStub( $server_address, [ - 'grpc.ssl_target_name_override' => 'foo.test.google.com', + 'grpc.ssl_target_name_override' => 'foo.test.google.fr', 'credentials' => $credentials ])); diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index b19ac80ddd..c23dd791ac 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -47,7 +47,7 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $this->channel = new Grpc\Channel( 'localhost:' . $port, [ - 'grpc.ssl_target_name_override' => 'foo.test.google.com', + 'grpc.ssl_target_name_override' => 'foo.test.google.fr', 'credentials' => $credentials ]); } diff --git a/src/python/README.md b/src/python/README.md index be2f2bedf9..0ead86b91e 100755 --- a/src/python/README.md +++ b/src/python/README.md @@ -1,9 +1,14 @@ -GRPC Python +gRPC Python ========= -The Python facility of GRPC. +The Python facility of gRPC. +Status +------- + +Usable with limitations, Pre-Alpha + Prerequisites ----------------------- @@ -13,8 +18,8 @@ Python 2.7, virtualenv, pip, libprotobuf-dev, and libprotoc-dev. Building from source ---------------------- -- Build the GRPC core -E.g, from the root of the grpc [git repo](https://github.com/google/grpc) +- Build the gRPC core from the root of the + [gRPC git repo](https://github.com/grpc/grpc) ``` $ make shared_c static_c ``` @@ -28,7 +33,7 @@ $ tools/run_tests/build_python.sh Testing ----------------------- -- Use run_python.sh to run GRPC as it was installed into the virtual environment +- Use run_python.sh to run gRPC as it was installed into the virtual environment ``` $ tools/run_tests/run_python.sh ``` diff --git a/src/python/interop/interop/client.py b/src/python/interop/interop/client.py new file mode 100644 index 0000000000..f4a449ef9e --- /dev/null +++ b/src/python/interop/interop/client.py @@ -0,0 +1,86 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The Python implementation of the GRPC interoperability test client.""" + +import argparse + +from grpc.early_adopter import implementations + +from interop import methods +from interop import resources + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + + +def _args(): + parser = argparse.ArgumentParser() + parser.add_argument( + '--server_host', help='the host to which to connect', type=str) + parser.add_argument( + '--server_host_override', + help='the server host to which to claim to connect', type=str) + parser.add_argument( + '--server_port', help='the port to which to connect', type=int) + parser.add_argument( + '--test_case', help='the test case to execute', type=str) + parser.add_argument( + '--use_tls', help='require a secure connection', dest='use_tls', + action='store_true') + parser.add_argument( + '--use_test_ca', help='replace platform root CAs with ca.pem', + action='store_true') + return parser.parse_args() + + +def _stub(args): + if args.use_tls: + if args.use_test_ca: + root_certificates = resources.test_root_certificates() + else: + root_certificates = resources.prod_root_certificates() + # TODO(nathaniel): server host override. + + stub = implementations.secure_stub( + methods.CLIENT_METHODS, args.server_host, args.server_port, + root_certificates, None, None) + else: + stub = implementations.insecure_stub( + methods.CLIENT_METHODS, args.server_host, args.server_port) + return stub + + +def _test_interoperability(): + args = _args() + stub = _stub(args) + methods.test_interoperability(args.test_case, stub) + + +if __name__ == '__main__': + _test_interoperability() diff --git a/src/python/interop/interop/credentials/ca.pem b/src/python/interop/interop/credentials/ca.pem new file mode 100755 index 0000000000..6c8511a73c --- /dev/null +++ b/src/python/interop/interop/credentials/ca.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICSjCCAbOgAwIBAgIJAJHGGR4dGioHMA0GCSqGSIb3DQEBCwUAMFYxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQxDzANBgNVBAMTBnRlc3RjYTAeFw0xNDExMTEyMjMxMjla +Fw0yNDExMDgyMjMxMjlaMFYxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0 +YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMT +BnRlc3RjYTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAwEDfBV5MYdlHVHJ7 ++L4nxrZy7mBfAVXpOc5vMYztssUI7mL2/iYujiIXM+weZYNTEpLdjyJdu7R5gGUu +g1jSVK/EPHfc74O7AyZU34PNIP4Sh33N+/A5YexrNgJlPY+E3GdVYi4ldWJjgkAd +Qah2PH5ACLrIIC6tRka9hcaBlIECAwEAAaMgMB4wDAYDVR0TBAUwAwEB/zAOBgNV +HQ8BAf8EBAMCAgQwDQYJKoZIhvcNAQELBQADgYEAHzC7jdYlzAVmddi/gdAeKPau +sPBG/C2HCWqHzpCUHcKuvMzDVkY/MP2o6JIW2DBbY64bO/FceExhjcykgaYtCH/m +oIU63+CFOTtR7otyQAWHqXa7q4SbCDlG7DyRFxqG0txPtGvy12lgldA2+RgcigQG +Dfcog5wrJytaQ6UA0wE= +-----END CERTIFICATE----- diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py index 26c1869f93..4da28ee775 100644 --- a/src/python/interop/interop/methods.py +++ b/src/python/interop/interop/methods.py @@ -29,52 +29,57 @@ """Implementations of interoperability test methods.""" +import threading + from grpc.early_adopter import utilities from interop import empty_pb2 from interop import messages_pb2 -def _empty_call(request): +_TIMEOUT = 7 + + +def _empty_call(request, unused_context): return empty_pb2.Empty() -_CLIENT_EMPTY_CALL = utilities.unary_unary_client_rpc_method( +_CLIENT_EMPTY_CALL = utilities.unary_unary_invocation_description( empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString) -_SERVER_EMPTY_CALL = utilities.unary_unary_server_rpc_method( +_SERVER_EMPTY_CALL = utilities.unary_unary_service_description( _empty_call, empty_pb2.Empty.FromString, empty_pb2.Empty.SerializeToString) -def _unary_call(request): +def _unary_call(request, unused_context): return messages_pb2.SimpleResponse( payload=messages_pb2.Payload( type=messages_pb2.COMPRESSABLE, body=b'\x00' * request.response_size)) -_CLIENT_UNARY_CALL = utilities.unary_unary_client_rpc_method( +_CLIENT_UNARY_CALL = utilities.unary_unary_invocation_description( messages_pb2.SimpleRequest.SerializeToString, messages_pb2.SimpleResponse.FromString) -_SERVER_UNARY_CALL = utilities.unary_unary_server_rpc_method( +_SERVER_UNARY_CALL = utilities.unary_unary_service_description( _unary_call, messages_pb2.SimpleRequest.FromString, messages_pb2.SimpleResponse.SerializeToString) -def _streaming_output_call(request): +def _streaming_output_call(request, unused_context): for response_parameters in request.response_parameters: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( type=request.response_type, body=b'\x00' * response_parameters.size)) -_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_client_rpc_method( +_CLIENT_STREAMING_OUTPUT_CALL = utilities.unary_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_server_rpc_method( +_SERVER_STREAMING_OUTPUT_CALL = utilities.unary_stream_service_description( _streaming_output_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) -def _streaming_input_call(request_iterator): +def _streaming_input_call(request_iterator, unused_context): aggregate_size = 0 for request in request_iterator: if request.payload and request.payload.body: @@ -82,35 +87,35 @@ def _streaming_input_call(request_iterator): return messages_pb2.StreamingInputCallResponse( aggregated_payload_size=aggregate_size) -_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_client_rpc_method( +_CLIENT_STREAMING_INPUT_CALL = utilities.stream_unary_invocation_description( messages_pb2.StreamingInputCallRequest.SerializeToString, messages_pb2.StreamingInputCallResponse.FromString) -_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_server_rpc_method( +_SERVER_STREAMING_INPUT_CALL = utilities.stream_unary_service_description( _streaming_input_call, messages_pb2.StreamingInputCallRequest.FromString, messages_pb2.StreamingInputCallResponse.SerializeToString) -def _full_duplex_call(request_iterator): +def _full_duplex_call(request_iterator, unused_context): for request in request_iterator: yield messages_pb2.StreamingOutputCallResponse( payload=messages_pb2.Payload( type=request.payload.type, body=b'\x00' * request.response_parameters[0].size)) -_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_client_rpc_method( +_CLIENT_FULL_DUPLEX_CALL = utilities.stream_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_server_rpc_method( +_SERVER_FULL_DUPLEX_CALL = utilities.stream_stream_service_description( _full_duplex_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) # NOTE(nathaniel): Apparently this is the same as the full-duplex call? -_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_client_rpc_method( +_CLIENT_HALF_DUPLEX_CALL = utilities.stream_stream_invocation_description( messages_pb2.StreamingOutputCallRequest.SerializeToString, messages_pb2.StreamingOutputCallResponse.FromString) -_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_server_rpc_method( +_SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description( _full_duplex_call, messages_pb2.StreamingOutputCallRequest.FromString, messages_pb2.StreamingOutputCallResponse.SerializeToString) @@ -142,3 +147,134 @@ SERVER_METHODS = { FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL, HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL, } + + +def _empty_unary(stub): + with stub: + response = stub.EmptyCall(empty_pb2.Empty(), _TIMEOUT) + if not isinstance(response, empty_pb2.Empty): + raise TypeError( + 'response is of type "%s", not empty_pb2.Empty!', type(response)) + + +def _large_unary(stub): + with stub: + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, response_size=314159, + payload=messages_pb2.Payload(body=b'\x00' * 271828)) + response_future = stub.UnaryCall.async(request, _TIMEOUT) + response = response_future.result() + if response.payload.type is not messages_pb2.COMPRESSABLE: + raise ValueError( + 'response payload type is "%s"!' % type(response.payload.type)) + if len(response.payload.body) != 314159: + raise ValueError( + 'response body of incorrect size %d!' % len(response.payload.body)) + + +def _client_streaming(stub): + with stub: + payload_body_sizes = (27182, 8, 1828, 45904) + payloads = ( + messages_pb2.Payload(body=b'\x00' * size) + for size in payload_body_sizes) + requests = ( + messages_pb2.StreamingInputCallRequest(payload=payload) + for payload in payloads) + response = stub.StreamingInputCall(requests, _TIMEOUT) + if response.aggregated_payload_size != 74922: + raise ValueError( + 'incorrect size %d!' % response.aggregated_payload_size) + + +def _server_streaming(stub): + sizes = (31415, 9, 2653, 58979) + + with stub: + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=sizes[0]), + messages_pb2.ResponseParameters(size=sizes[1]), + messages_pb2.ResponseParameters(size=sizes[2]), + messages_pb2.ResponseParameters(size=sizes[3]), + )) + response_iterator = stub.StreamingOutputCall(request, _TIMEOUT) + for index, response in enumerate(response_iterator): + if response.payload.type != messages_pb2.COMPRESSABLE: + raise ValueError( + 'response body of invalid type %s!' % response.payload.type) + if len(response.payload.body) != sizes[index]: + raise ValueError( + 'response body of invalid size %d!' % len(response.payload.body)) + + +class _Pipe(object): + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True + + def __iter__(self): + return self + + def next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + +def _ping_pong(stub): + request_response_sizes = (31415, 9, 2653, 58979) + request_payload_sizes = (27182, 8, 1828, 45904) + + with stub: + pipe = _Pipe() + response_iterator = stub.FullDuplexCall(pipe, _TIMEOUT) + print 'Starting ping-pong with response iterator %s' % response_iterator + for response_size, payload_size in zip( + request_response_sizes, request_payload_sizes): + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=(messages_pb2.ResponseParameters( + size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + response = next(response_iterator) + if response.payload.type != messages_pb2.COMPRESSABLE: + raise ValueError( + 'response body of invalid type %s!' % response.payload.type) + if len(response.payload.body) != response_size: + raise ValueError( + 'response body of invalid size %d!' % len(response.payload.body)) + pipe.close() + + +def test_interoperability(test_case, stub): + if test_case == 'empty_unary': + _empty_unary(stub) + elif test_case == 'large_unary': + _large_unary(stub) + elif test_case == 'server_streaming': + _server_streaming(stub) + elif test_case == 'client_streaming': + _client_streaming(stub) + elif test_case == 'ping_pong': + _ping_pong(stub) + else: + raise NotImplementedError('Test case "%s" not implemented!') diff --git a/src/ruby/lib/grpc/auth/signet.rb b/src/python/interop/interop/resources.py index a8bce1255c..2c3045313d 100644 --- a/src/ruby/lib/grpc/auth/signet.rb +++ b/src/python/interop/interop/resources.py @@ -27,41 +27,30 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -require 'signet/oauth_2/client' +"""Constants and functions for data used in interoperability testing.""" -module Signet - # Signet::OAuth2 supports OAuth2 authentication. - module OAuth2 - AUTH_METADATA_KEY = :Authorization - # Signet::OAuth2::Client creates an OAuth2 client - # - # Here client is re-opened to add the #apply and #apply! methods which - # update a hash map with the fetched authentication token - # - # Eventually, this change may be merged into signet itself, or some other - # package that provides Google-specific auth via signet, and this extension - # will be unnecessary. - class Client - # Updates a_hash updated with the authentication token - def apply!(a_hash, opts = {}) - # fetch the access token there is currently not one, or if the client - # has expired - fetch_access_token!(opts) if access_token.nil? || expired? - a_hash[AUTH_METADATA_KEY] = "Bearer #{access_token}" - end +import os - # Returns a clone of a_hash updated with the authentication token - def apply(a_hash, opts = {}) - a_copy = a_hash.clone - apply!(a_copy, opts) - a_copy - end +import pkg_resources - # Returns a reference to the #apply method, suitable for passing as - # a closure - def updater_proc - lambda(&method(:apply)) - end - end - end -end +_ROOT_CERTIFICATES_RESOURCE_PATH = 'credentials/ca.pem' +_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' +_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' + + +def test_root_certificates(): + return pkg_resources.resource_string( + __name__, _ROOT_CERTIFICATES_RESOURCE_PATH) + + +def prod_root_certificates(): + return open(os.environ['SSL_CERT_FILE'], mode='rb').read() + + +def private_key(): + return pkg_resources.resource_string(__name__, _PRIVATE_KEY_RESOURCE_PATH) + + +def certificate_chain(): + return pkg_resources.resource_string( + __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) diff --git a/src/python/interop/interop/server.py b/src/python/interop/interop/server.py index 785d482fe5..4e4b127a9a 100644 --- a/src/python/interop/interop/server.py +++ b/src/python/interop/interop/server.py @@ -31,18 +31,15 @@ import argparse import logging -import pkg_resources import time from grpc.early_adopter import implementations from interop import methods +from interop import resources _ONE_DAY_IN_SECONDS = 60 * 60 * 24 -_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' -_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' - def serve(): parser = argparse.ArgumentParser() @@ -54,10 +51,8 @@ def serve(): args = parser.parse_args() if args.use_tls: - private_key = pkg_resources.resource_string( - __name__, _PRIVATE_KEY_RESOURCE_PATH) - certificate_chain = pkg_resources.resource_string( - __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + private_key = resources.private_key() + certificate_chain = resources.certificate_chain() server = implementations.secure_server( methods.SERVER_METHODS, args.port, private_key, certificate_chain) else: diff --git a/src/python/interop/setup.py b/src/python/interop/setup.py index 4b7709f234..6db5435090 100644 --- a/src/python/interop/setup.py +++ b/src/python/interop/setup.py @@ -40,7 +40,9 @@ _PACKAGE_DIRECTORIES = { } _PACKAGE_DATA = { - 'interop': ['credentials/server1.key', 'credentials/server1.pem',] + 'interop': [ + 'credentials/ca.pem', 'credentials/server1.key', + 'credentials/server1.pem',] } _INSTALL_REQUIRES = ['grpc-2015>=0.0.1'] diff --git a/src/python/src/grpc/_adapter/_c.c b/src/python/src/grpc/_adapter/_c.c index 55b9d0512c..f096a55b61 100644 --- a/src/python/src/grpc/_adapter/_c.c +++ b/src/python/src/grpc/_adapter/_c.c @@ -38,6 +38,7 @@ #include "grpc/_adapter/_channel.h" #include "grpc/_adapter/_call.h" #include "grpc/_adapter/_server.h" +#include "grpc/_adapter/_client_credentials.h" #include "grpc/_adapter/_server_credentials.h" static PyObject *init(PyObject *self) { @@ -76,6 +77,9 @@ PyMODINIT_FUNC init_c(void) { if (pygrpc_add_server(module) == -1) { return; } + if (pygrpc_add_client_credentials(module) == -1) { + return; + } if (pygrpc_add_server_credentials(module) == -1) { return; } diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py index d421692ec9..d81c63e346 100644 --- a/src/python/src/grpc/_adapter/_c_test.py +++ b/src/python/src/grpc/_adapter/_c_test.py @@ -70,7 +70,7 @@ class _CTest(unittest.TestCase): def testChannel(self): _c.init() - channel = _c.Channel('test host:12345') + channel = _c.Channel('test host:12345', None) del channel _c.shut_down() @@ -81,7 +81,7 @@ class _CTest(unittest.TestCase): _c.init() - channel = _c.Channel('%s:%d' % (host, 12345)) + channel = _c.Channel('%s:%d' % (host, 12345), None) call = _c.Call(channel, method, host, time.time() + _TIMEOUT) del call del channel @@ -136,6 +136,29 @@ class _CTest(unittest.TestCase): _c.shut_down() + def test_client_credentials(self): + root_certificates = b'Trust starts here. Really.' + private_key = b'This is a really bad private key, yo.' + certificate_chain = b'Trust me! Do I not look trustworty?' + + _c.init() + + client_credentials = _c.ClientCredentials( + None, None, None) + self.assertIsNotNone(client_credentials) + client_credentials = _c.ClientCredentials( + root_certificates, None, None) + self.assertIsNotNone(client_credentials) + client_credentials = _c.ClientCredentials( + None, private_key, certificate_chain) + self.assertIsNotNone(client_credentials) + client_credentials = _c.ClientCredentials( + root_certificates, private_key, certificate_chain) + self.assertIsNotNone(client_credentials) + del client_credentials + + _c.shut_down() + def test_server_credentials(self): root_certificates = b'Trust starts here. Really.' first_private_key = b'This is a really bad private key, yo.' diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c index 325d3d5bbd..dca2e49373 100644 --- a/src/python/src/grpc/_adapter/_call.c +++ b/src/python/src/grpc/_adapter/_call.c @@ -161,7 +161,7 @@ static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) { } static const PyObject *pygrpc_call_premetadata(Call *self) { - /* TODO(b/18702680): Actually support metadata. */ + /* TODO(nathaniel): Metadata support. */ return pygrpc_translate_call_error( grpc_call_server_end_initial_metadata_old(self->c_call, 0)); } diff --git a/src/python/src/grpc/_adapter/_channel.c b/src/python/src/grpc/_adapter/_channel.c index 3ba943e4b2..9cf580bcfb 100644 --- a/src/python/src/grpc/_adapter/_channel.c +++ b/src/python/src/grpc/_adapter/_channel.c @@ -35,18 +35,28 @@ #include <Python.h> #include <grpc/grpc.h> +#include <grpc/grpc_security.h> + +#include "grpc/_adapter/_client_credentials.h" static int pygrpc_channel_init(Channel *self, PyObject *args, PyObject *kwds) { const char *hostport; - static char *kwlist[] = {"hostport", NULL}; + PyObject *client_credentials; + static char *kwlist[] = {"hostport", "client_credentials", NULL}; - if (!(PyArg_ParseTupleAndKeywords(args, kwds, "s:Channel", kwlist, - &hostport))) { + if (!(PyArg_ParseTupleAndKeywords(args, kwds, "sO:Channel", kwlist, + &hostport, &client_credentials))) { return -1; } - - self->c_channel = grpc_channel_create(hostport, NULL); - return 0; + if (client_credentials == Py_None) { + self->c_channel = grpc_channel_create(hostport, NULL); + return 0; + } else { + self->c_channel = grpc_secure_channel_create( + ((ClientCredentials *)client_credentials)->c_client_credentials, + hostport, NULL); + return 0; + } } static void pygrpc_channel_dealloc(Channel *self) { diff --git a/src/python/src/grpc/_adapter/_client_credentials.c b/src/python/src/grpc/_adapter/_client_credentials.c new file mode 100644 index 0000000000..e8ccff8d17 --- /dev/null +++ b/src/python/src/grpc/_adapter/_client_credentials.c @@ -0,0 +1,121 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "grpc/_adapter/_client_credentials.h" + +#include <Python.h> +#include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> + +static int pygrpc_client_credentials_init(ClientCredentials *self, + PyObject *args, PyObject *kwds) { + char *root_certificates; + grpc_ssl_pem_key_cert_pair key_certificate_pair; + static char *kwlist[] = {"root_certificates", "private_key", + "certificate_chain", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwds, "zzz:ClientCredentials", kwlist, + &root_certificates, + &key_certificate_pair.private_key, + &key_certificate_pair.cert_chain)) { + return -1; + } + + if (key_certificate_pair.private_key != NULL && key_certificate_pair.cert_chain != NULL) { + self->c_client_credentials = + grpc_ssl_credentials_create(root_certificates, &key_certificate_pair); + } else { + self->c_client_credentials = + grpc_ssl_credentials_create(root_certificates, NULL); + } + return 0; +} + +static void pygrpc_client_credentials_dealloc(ClientCredentials *self) { + if (self->c_client_credentials != NULL) { + grpc_credentials_release(self->c_client_credentials); + } + self->ob_type->tp_free((PyObject *)self); +} + +PyTypeObject pygrpc_ClientCredentialsType = { + PyVarObject_HEAD_INIT(NULL, 0) + "_grpc.ClientCredencials", /*tp_name*/ + sizeof(ClientCredentials), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)pygrpc_client_credentials_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Wrapping of grpc_credentials.", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)pygrpc_client_credentials_init, /* tp_init */ + 0, /* tp_alloc */ + PyType_GenericNew, /* tp_new */ +}; + +int pygrpc_add_client_credentials(PyObject *module) { + if (PyType_Ready(&pygrpc_ClientCredentialsType) < 0) { + return -1; + } + if (PyModule_AddObject(module, "ClientCredentials", + (PyObject *)&pygrpc_ClientCredentialsType) == -1) { + return -1; + } + return 0; +} diff --git a/src/python/src/grpc/_adapter/_client_credentials.h b/src/python/src/grpc/_adapter/_client_credentials.h new file mode 100644 index 0000000000..664dc80d75 --- /dev/null +++ b/src/python/src/grpc/_adapter/_client_credentials.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__CLIENT_CREDENTIALS_H_ +#define _ADAPTER__CLIENT_CREDENTIALS_H_ + +#include <Python.h> +#include <grpc/grpc_security.h> + +typedef struct { + PyObject_HEAD grpc_credentials *c_client_credentials; +} ClientCredentials; + +PyTypeObject pygrpc_ClientCredentialsType; + +int pygrpc_add_client_credentials(PyObject *module); + +#endif /* _ADAPTER__CLIENT_CREDENTIALS_H_ */ diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index 8cce322d30..475d780c95 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -85,7 +85,8 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): port = fore_link.port() rear_link = rear.RearLink( 'localhost', port, pool, - serialization.request_serializers, serialization.response_deserializers) + serialization.request_serializers, + serialization.response_deserializers, False, None, None, None) rear_link.start() front = tickets_implementations.front(pool, pool, pool) back = tickets_implementations.back( diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index 6b3bcee9fa..5d7e677243 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -75,7 +75,7 @@ class RoundTripTest(unittest.TestCase): rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: None}, - {test_method: None}) + {test_method: None}, False, None, None, None) rear_link.join_fore_link(test_fore_link) test_fore_link.join_rear_link(rear_link) rear_link.start() @@ -129,7 +129,7 @@ class RoundTripTest(unittest.TestCase): rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: _IDENTITY}, - {test_method: _IDENTITY}) + {test_method: _IDENTITY}, False, None, None, None) rear_link.join_fore_link(test_fore_link) test_fore_link.join_rear_link(rear_link) rear_link.start() @@ -193,7 +193,7 @@ class RoundTripTest(unittest.TestCase): rear_link = rear.RearLink( 'localhost', port, self.rear_link_pool, {test_method: scenario.serialize_request}, - {test_method: scenario.deserialize_response}) + {test_method: scenario.deserialize_response}, False, None, None, None) rear_link.join_fore_link(test_fore_link) test_fore_link.join_rear_link(rear_link) rear_link.start() diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py index 9a13309a18..77821ba71a 100644 --- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py @@ -50,7 +50,8 @@ class LonelyRearLinkTest(unittest.TestCase): self.pool.shutdown(wait=True) def testUpAndDown(self): - rear_link = rear.RearLink('nonexistent', 54321, self.pool, {}, {}) + rear_link = rear.RearLink( + 'nonexistent', 54321, self.pool, {}, {}, False, None, None, None) rear_link.start() rear_link.stop() @@ -63,7 +64,7 @@ class LonelyRearLinkTest(unittest.TestCase): rear_link = rear.RearLink( 'nonexistent', 54321, self.pool, {test_method: None}, - {test_method: None}) + {test_method: None}, False, None, None, None) rear_link.join_fore_link(fore_link) rear_link.start() diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py index 2ef2eb879c..a24baaeb3e 100644 --- a/src/python/src/grpc/_adapter/_low.py +++ b/src/python/src/grpc/_adapter/_low.py @@ -52,5 +52,6 @@ Call = _c.Call Channel = _c.Channel CompletionQueue = _c.CompletionQueue Server = _c.Server +ClientCredentials = _c.ClientCredentials ServerCredentials = _c.ServerCredentials # pylint: enable=invalid-name diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 898c62c002..03e3f473a3 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase): finish_tag = object() completion_queue = _low.CompletionQueue() - channel = _low.Channel('%s:%d' % (host, port)) + channel = _low.Channel('%s:%d' % (host, port), None) client_call = _low.Call(channel, method, host, deadline) client_call.invoke(completion_queue, metadata_tag, finish_tag) @@ -87,7 +87,7 @@ class EchoTest(unittest.TestCase): self.server.start() self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port)) + self.channel = _low.Channel('%s:%d' % (self.host, port), None) def tearDown(self): self.server.stop() @@ -265,7 +265,7 @@ class CancellationTest(unittest.TestCase): self.server.start() self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port)) + self.channel = _low.Channel('%s:%d' % (self.host, port), None) def tearDown(self): self.server.stop() diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index 94ff66ffda..bfde5f5c57 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -92,7 +92,8 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): """An invocation-side bridge between RPC Framework and the C-ish _low code.""" def __init__( - self, host, port, pool, request_serializers, response_deserializers): + self, host, port, pool, request_serializers, response_deserializers, + secure, root_certificates, private_key, certificate_chain): """Constructor. Args: @@ -103,6 +104,13 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): serializer behaviors. response_deserializers: A dict from RPC method names to response object deserializer behaviors. + secure: A boolean indicating whether or not to use a secure connection. + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private + key should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if + no certificate chain should be used. """ self._condition = threading.Condition() self._host = host @@ -116,6 +124,14 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): self._channel = None self._rpc_states = {} self._spinning = False + if secure: + self._client_credentials = _low.ClientCredentials( + root_certificates, private_key, certificate_chain) + else: + self._client_credentials = None + self._root_certificates = root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain def _on_write_event(self, operation_id, event, rpc_state): if event.write_accepted: @@ -310,7 +326,8 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): """ with self._condition: self._completion_queue = _low.CompletionQueue() - self._channel = _low.Channel('%s:%d' % (self._host, self._port)) + self._channel = _low.Channel( + '%s:%d' % (self._host, self._port), self._client_credentials) return self def _stop(self): @@ -369,11 +386,17 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): - def __init__(self, host, port, request_serializers, response_deserializers): + def __init__( + self, host, port, request_serializers, response_deserializers, secure, + root_certificates, private_key, certificate_chain): self._host = host self._port = port self._request_serializers = request_serializers self._response_deserializers = response_deserializers + self._secure = secure + self._root_certificates = root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain self._lock = threading.Lock() self._pool = None @@ -391,7 +414,8 @@ class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): self._pool = logging_pool.pool(_THREAD_POOL_SIZE) self._rear_link = RearLink( self._host, self._port, self._pool, self._request_serializers, - self._response_deserializers) + self._response_deserializers, self._secure, self._root_certificates, + self._private_key, self._certificate_chain) self._rear_link.join_fore_link(self._fore_link) self._rear_link.start() return self @@ -422,6 +446,7 @@ class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): self._rear_link.accept_front_to_back_ticket(ticket) +# TODO(issue 726): reconcile these two creation functions. def activated_rear_link( host, port, request_serializers, response_deserializers): """Creates a RearLink that is also an activated.Activated. @@ -436,6 +461,42 @@ def activated_rear_link( serializer behavior. response_deserializers: A dictionary from RPC method name to response object deserializer behavior. + secure: A boolean indicating whether or not to use a secure connection. + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private key + should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if no + certificate chain should be used. + """ + return _ActivatedRearLink( + host, port, request_serializers, response_deserializers, False, None, + None, None) + + + +def secure_activated_rear_link( + host, port, request_serializers, response_deserializers, root_certificates, + private_key, certificate_chain): + """Creates a RearLink that is also an activated.Activated. + + The returned object is only valid for use between calls to its start and stop + methods (or in context when used as a context manager). + + Args: + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + request_serializers: A dictionary from RPC method name to request object + serializer behavior. + response_deserializers: A dictionary from RPC method name to response + object deserializer behavior. + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private key + should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if no + certificate chain should be used. """ return _ActivatedRearLink( - host, port, request_serializers, response_deserializers) + host, port, request_serializers, response_deserializers, True, + root_certificates, private_key, certificate_chain) diff --git a/src/python/src/grpc/early_adopter/_assembly_utilities.py b/src/python/src/grpc/early_adopter/_assembly_utilities.py new file mode 100644 index 0000000000..facfc2bf0e --- /dev/null +++ b/src/python/src/grpc/early_adopter/_assembly_utilities.py @@ -0,0 +1,168 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import abc +import collections + +# assembly_interfaces is referenced from specification in this module. +from grpc.framework.assembly import interfaces as assembly_interfaces # pylint: disable=unused-import +from grpc.framework.assembly import utilities as assembly_utilities +from grpc.early_adopter import _reexport +from grpc.early_adopter import interfaces + + +# TODO(issue 726): Kill the "implementations" attribute of this in favor +# of the same-information-less-bogusly-represented "cardinalities". +class InvocationBreakdown(object): + """An intermediate representation of invocation-side views of RPC methods. + + Attributes: + cardinalities: A dictionary from RPC method name to interfaces.Cardinality + value. + implementations: A dictionary from RPC method name to + assembly_interfaces.MethodImplementation describing the method. + request_serializers: A dictionary from RPC method name to callable + behavior to be used serializing request values for the RPC. + response_deserializers: A dictionary from RPC method name to callable + behavior to be used deserializing response values for the RPC. + """ + __metaclass__ = abc.ABCMeta + + +class _EasyInvocationBreakdown( + InvocationBreakdown, + collections.namedtuple( + '_EasyInvocationBreakdown', + ('cardinalities', 'implementations', 'request_serializers', + 'response_deserializers'))): + pass + + +class ServiceBreakdown(object): + """An intermediate representation of service-side views of RPC methods. + + Attributes: + implementations: A dictionary from RPC method name + assembly_interfaces.MethodImplementation implementing the RPC method. + request_deserializers: A dictionary from RPC method name to callable + behavior to be used deserializing request values for the RPC. + response_serializers: A dictionary from RPC method name to callable + behavior to be used serializing response values for the RPC. + """ + __metaclass__ = abc.ABCMeta + + +class _EasyServiceBreakdown( + ServiceBreakdown, + collections.namedtuple( + '_EasyServiceBreakdown', + ('implementations', 'request_deserializers', 'response_serializers'))): + pass + + +def break_down_invocation(method_descriptions): + """Derives an InvocationBreakdown from several RPC method descriptions. + + Args: + method_descriptions: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs. + + Returns: + An InvocationBreakdown corresponding to the given method descriptions. + """ + cardinalities = {} + implementations = {} + request_serializers = {} + response_deserializers = {} + for name, method_description in method_descriptions.iteritems(): + cardinality = method_description.cardinality() + cardinalities[name] = cardinality + if cardinality is interfaces.Cardinality.UNARY_UNARY: + implementations[name] = assembly_utilities.unary_unary_inline(None) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + implementations[name] = assembly_utilities.unary_stream_inline(None) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + implementations[name] = assembly_utilities.stream_unary_inline(None) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + implementations[name] = assembly_utilities.stream_stream_inline(None) + request_serializers[name] = method_description.serialize_request + response_deserializers[name] = method_description.deserialize_response + return _EasyInvocationBreakdown( + cardinalities, implementations, request_serializers, + response_deserializers) + + +def break_down_service(method_descriptions): + """Derives a ServiceBreakdown from several RPC method descriptions. + + Args: + method_descriptions: A dictionary from RPC method name to + interfaces.RpcMethodServiceDescription describing the RPCs. + + Returns: + A ServiceBreakdown corresponding to the given method descriptions. + """ + implementations = {} + request_deserializers = {} + response_serializers = {} + for name, method_description in method_descriptions.iteritems(): + cardinality = method_description.cardinality() + if cardinality is interfaces.Cardinality.UNARY_UNARY: + def service( + request, face_rpc_context, + service_behavior=method_description.service_unary_unary): + return service_behavior( + request, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.unary_unary_inline(service) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + def service( + request, face_rpc_context, + service_behavior=method_description.service_unary_stream): + return service_behavior( + request, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.unary_stream_inline(service) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + def service( + request_iterator, face_rpc_context, + service_behavior=method_description.service_stream_unary): + return service_behavior( + request_iterator, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.stream_unary_inline(service) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + def service( + request_iterator, face_rpc_context, + service_behavior=method_description.service_stream_stream): + return service_behavior( + request_iterator, _reexport.rpc_context(face_rpc_context)) + implementations[name] = assembly_utilities.stream_stream_inline(service) + request_deserializers[name] = method_description.deserialize_request + response_serializers[name] = method_description.serialize_response + + return _EasyServiceBreakdown( + implementations, request_deserializers, response_serializers) diff --git a/src/python/src/grpc/early_adopter/_face_utilities.py b/src/python/src/grpc/early_adopter/_face_utilities.py deleted file mode 100644 index 3e37b08752..0000000000 --- a/src/python/src/grpc/early_adopter/_face_utilities.py +++ /dev/null @@ -1,178 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import abc -import collections - -from grpc.framework.face import interfaces as face_interfaces - -from grpc.early_adopter import interfaces - - -class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): - - def __init__(self, unary_unary_server_rpc_method): - self._method = unary_unary_server_rpc_method - - def service(self, request, context): - """See face_interfaces.InlineValueInValueOutMethod.service for spec.""" - return self._method.service_unary_unary(request) - - -class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): - - def __init__(self, unary_stream_server_rpc_method): - self._method = unary_stream_server_rpc_method - - def service(self, request, context): - """See face_interfaces.InlineValueInStreamOutMethod.service for spec.""" - return self._method.service_unary_stream(request) - - -class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): - - def __init__(self, stream_unary_server_rpc_method): - self._method = stream_unary_server_rpc_method - - def service(self, request_iterator, context): - """See face_interfaces.InlineStreamInValueOutMethod.service for spec.""" - return self._method.service_stream_unary(request_iterator) - - -class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): - - def __init__(self, stream_stream_server_rpc_method): - self._method = stream_stream_server_rpc_method - - def service(self, request_iterator, context): - """See face_interfaces.InlineStreamInStreamOutMethod.service for spec.""" - return self._method.service_stream_stream(request_iterator) - - -class ClientBreakdown(object): - """An intermediate representation of invocation-side views of RPC methods. - - Attributes: - request_serializers: A dictionary from RPC method name to callable - behavior to be used serializing request values for the RPC. - response_deserializers: A dictionary from RPC method name to callable - behavior to be used deserializing response values for the RPC. - """ - __metaclass__ = abc.ABCMeta - - -class _EasyClientBreakdown( - ClientBreakdown, - collections.namedtuple( - '_EasyClientBreakdown', - ('request_serializers', 'response_deserializers'))): - pass - - -class ServerBreakdown(object): - """An intermediate representation of implementations of RPC methods. - - Attributes: - unary_unary_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for unary-unary RPC methods. - unary_stream_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for unary-stream RPC methods. - stream_unary_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for stream-unary RPC methods. - stream_stream_methods: A dictionary from RPC method name to callable - behavior implementing the RPC method for stream-stream RPC methods. - request_deserializers: A dictionary from RPC method name to callable - behavior to be used deserializing request values for the RPC. - response_serializers: A dictionary from RPC method name to callable - behavior to be used serializing response values for the RPC. - """ - __metaclass__ = abc.ABCMeta - - - -class _EasyServerBreakdown( - ServerBreakdown, - collections.namedtuple( - '_EasyServerBreakdown', - ('unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods', - 'stream_stream_methods', 'request_deserializers', - 'response_serializers'))): - pass - - -def client_break_down(methods): - """Derives a ClientBreakdown from several interfaces.ClientRpcMethods. - - Args: - methods: A dictionary from RPC mthod name to - interfaces.ClientRpcMethod object describing the RPCs. - - Returns: - A ClientBreakdown corresponding to the given methods. - """ - request_serializers = {} - response_deserializers = {} - for name, method in methods.iteritems(): - request_serializers[name] = method.serialize_request - response_deserializers[name] = method.deserialize_response - return _EasyClientBreakdown(request_serializers, response_deserializers) - - -def server_break_down(methods): - """Derives a ServerBreakdown from several interfaces.ServerRpcMethods. - - Args: - methods: A dictionary from RPC mthod name to - interfaces.ServerRpcMethod object describing the RPCs. - - Returns: - A ServerBreakdown corresponding to the given methods. - """ - unary_unary = {} - unary_stream = {} - stream_unary = {} - stream_stream = {} - request_deserializers = {} - response_serializers = {} - for name, method in methods.iteritems(): - cardinality = method.cardinality() - if cardinality is interfaces.Cardinality.UNARY_UNARY: - unary_unary[name] = _InlineUnaryUnaryMethod(method) - elif cardinality is interfaces.Cardinality.UNARY_STREAM: - unary_stream[name] = _InlineUnaryStreamMethod(method) - elif cardinality is interfaces.Cardinality.STREAM_UNARY: - stream_unary[name] = _InlineStreamUnaryMethod(method) - elif cardinality is interfaces.Cardinality.STREAM_STREAM: - stream_stream[name] = _InlineStreamStreamMethod(method) - request_deserializers[name] = method.deserialize_request - response_serializers[name] = method.serialize_response - - return _EasyServerBreakdown( - unary_unary, unary_stream, stream_unary, stream_stream, - request_deserializers, response_serializers) diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py new file mode 100644 index 0000000000..35f4e85a72 --- /dev/null +++ b/src/python/src/grpc/early_adopter/_reexport.py @@ -0,0 +1,212 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from grpc.framework.face import exceptions as face_exceptions +from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.foundation import future +from grpc.early_adopter import exceptions +from grpc.early_adopter import interfaces + +_ABORTION_REEXPORT = { + face_interfaces.Abortion.CANCELLED: interfaces.Abortion.CANCELLED, + face_interfaces.Abortion.EXPIRED: interfaces.Abortion.EXPIRED, + face_interfaces.Abortion.NETWORK_FAILURE: + interfaces.Abortion.NETWORK_FAILURE, + face_interfaces.Abortion.SERVICED_FAILURE: + interfaces.Abortion.SERVICED_FAILURE, + face_interfaces.Abortion.SERVICER_FAILURE: + interfaces.Abortion.SERVICER_FAILURE, +} + + +class _RpcError(exceptions.RpcError): + pass + + +def _reexport_error(face_rpc_error): + if isinstance(face_rpc_error, face_exceptions.CancellationError): + return exceptions.CancellationError() + elif isinstance(face_rpc_error, face_exceptions.ExpirationError): + return exceptions.ExpirationError() + else: + return _RpcError() + + +def _as_face_abortion_callback(abortion_callback): + def face_abortion_callback(face_abortion): + abortion_callback(_ABORTION_REEXPORT[face_abortion]) + return face_abortion_callback + + +class _ReexportedFuture(future.Future): + + def __init__(self, face_future): + self._face_future = face_future + + def cancel(self): + return self._face_future.cancel() + + def cancelled(self): + return self._face_future.cancelled() + + def running(self): + return self._face_future.running() + + def done(self): + return self._face_future.done() + + def result(self, timeout=None): + try: + return self._face_future.result(timeout=timeout) + except face_exceptions.RpcError as e: + raise _reexport_error(e) + + def exception(self, timeout=None): + face_error = self._face_future.exception(timeout=timeout) + return None if face_error is None else _reexport_error(face_error) + + def traceback(self, timeout=None): + return self._face_future.traceback(timeout=timeout) + + def add_done_callback(self, fn): + self._face_future.add_done_callback(lambda unused_face_future: fn(self)) + + +def _call_reexporting_errors(behavior, *args, **kwargs): + try: + return behavior(*args, **kwargs) + except face_exceptions.RpcError as e: + raise _reexport_error(e) + + +def _reexported_future(face_future): + return _ReexportedFuture(face_future) + + +class _CancellableIterator(interfaces.CancellableIterator): + + def __init__(self, face_cancellable_iterator): + self._face_cancellable_iterator = face_cancellable_iterator + + def __iter__(self): + return self + + def next(self): + return _call_reexporting_errors(self._face_cancellable_iterator.next) + + def cancel(self): + self._face_cancellable_iterator.cancel() + + +class _RpcContext(interfaces.RpcContext): + + def __init__(self, face_rpc_context): + self._face_rpc_context = face_rpc_context + + def is_active(self): + return self._face_rpc_context.is_active() + + def time_remaining(self): + return self._face_rpc_context.time_remaining() + + def add_abortion_callback(self, abortion_callback): + self._face_rpc_context.add_abortion_callback( + _as_face_abortion_callback(abortion_callback)) + + +class _UnaryUnarySyncAsync(interfaces.UnaryUnarySyncAsync): + + def __init__(self, face_unary_unary_sync_async): + self._underlying = face_unary_unary_sync_async + + def __call__(self, request, timeout): + return _call_reexporting_errors( + self._underlying, request, timeout) + + def async(self, request, timeout): + return _ReexportedFuture(self._underlying.async(request, timeout)) + + +class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): + + def __init__(self, face_stream_unary_sync_async): + self._underlying = face_stream_unary_sync_async + + def __call__(self, request_iterator, timeout): + return _call_reexporting_errors( + self._underlying, request_iterator, timeout) + + def async(self, request_iterator, timeout): + return _ReexportedFuture(self._underlying.async(request_iterator, timeout)) + + +class _Stub(interfaces.Stub): + + def __init__(self, assembly_stub, cardinalities): + self._assembly_stub = assembly_stub + self._cardinalities = cardinalities + + def __enter__(self): + self._assembly_stub.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._assembly_stub.__exit__(exc_type, exc_val, exc_tb) + return False + + def __getattr__(self, attr): + underlying_attr = self._assembly_stub.__getattr__(attr) + cardinality = self._cardinalities.get(attr) + # TODO(nathaniel): unify this trick with its other occurrence in the code. + if cardinality is None: + for name, cardinality in self._cardinalities.iteritems(): + last_slash_index = name.rfind('/') + if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: + break + else: + raise AttributeError(attr) + if cardinality is interfaces.Cardinality.UNARY_UNARY: + return _UnaryUnarySyncAsync(underlying_attr) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + return lambda request, timeout: _CancellableIterator( + underlying_attr(request, timeout)) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + return _StreamUnarySyncAsync(underlying_attr) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + return lambda request_iterator, timeout: _CancellableIterator( + underlying_attr(request_iterator, timeout)) + else: + raise AttributeError(attr) + +def rpc_context(face_rpc_context): + return _RpcContext(face_rpc_context) + + +def stub(assembly_stub, cardinalities): + return _Stub(assembly_stub, cardinalities) diff --git a/src/ruby/spec/auth/signet_spec.rb b/src/python/src/grpc/early_adopter/exceptions.py index 1712edf296..5234d3b91c 100644 --- a/src/ruby/spec/auth/signet_spec.rb +++ b/src/python/src/grpc/early_adopter/exceptions.py @@ -27,44 +27,22 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -spec_dir = File.expand_path(File.join(File.dirname(__FILE__))) -$LOAD_PATH.unshift(spec_dir) -$LOAD_PATH.uniq! +"""Exceptions raised by GRPC. -require 'apply_auth_examples' -require 'grpc/auth/signet' -require 'jwt' -require 'openssl' -require 'spec_helper' +Only GRPC should instantiate and raise these exceptions. +""" -describe Signet::OAuth2::Client do - before(:example) do - @key = OpenSSL::PKey::RSA.new(2048) - @client = Signet::OAuth2::Client.new( - token_credential_uri: 'https://accounts.google.com/o/oauth2/token', - scope: 'https://www.googleapis.com/auth/userinfo.profile', - issuer: 'app@example.com', - audience: 'https://accounts.google.com/o/oauth2/token', - signing_key: @key - ) - end +import abc - def make_auth_stubs(with_access_token: '') - Faraday::Adapter::Test::Stubs.new do |stub| - stub.post('/o/oauth2/token') do |env| - params = Addressable::URI.form_unencode(env[:body]) - _claim, _header = JWT.decode(params.assoc('assertion').last, - @key.public_key) - want = ['grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer'] - expect(params.assoc('grant_type')).to eq(want) - build_json_response( - 'access_token' => with_access_token, - 'token_type' => 'Bearer', - 'expires_in' => 3600 - ) - end - end - end - it_behaves_like 'apply/apply! are OK' -end +class RpcError(Exception): + """Common super type for all exceptions raised by GRPC.""" + __metaclass__ = abc.ABCMeta + + +class CancellationError(RpcError): + """Indicates that an RPC has been cancelled.""" + + +class ExpirationError(RpcError): + """Indicates that an RPC has expired ("timed out").""" diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index 1d76d0f9e0..6195958624 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -31,15 +31,12 @@ import threading -from grpc._adapter import fore -from grpc.framework.base.packets import implementations as _tickets_implementations -from grpc.framework.face import implementations as _face_implementations -from grpc.framework.foundation import logging_pool -from grpc.early_adopter import _face_utilities +from grpc._adapter import fore as _fore +from grpc._adapter import rear as _rear +from grpc.early_adopter import _assembly_utilities +from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces - -_MEGA_TIMEOUT = 60 * 60 * 24 -_THREAD_POOL_SIZE = 80 +from grpc.framework.assembly import implementations as _assembly_implementations class _Server(interfaces.Server): @@ -48,63 +45,122 @@ class _Server(interfaces.Server): self._lock = threading.Lock() self._breakdown = breakdown self._port = port - self._private_key = private_key - self._certificate_chain = certificate_chain + if private_key is None or certificate_chain is None: + self._key_chain_pairs = () + else: + self._key_chain_pairs = ((private_key, certificate_chain),) - self._pool = None self._fore_link = None - self._back = None + self._server = None - def start(self): - """See interfaces.Server.start for specification.""" + def _start(self): with self._lock: - if self._pool is None: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - servicer = _face_implementations.servicer( - self._pool, - inline_value_in_value_out_methods=self._breakdown.unary_unary_methods, - inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods, - inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods, - inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods) - self._fore_link = fore.ForeLink( - self._pool, self._breakdown.request_deserializers, - self._breakdown.response_serializers, None, - ((self._private_key, self._certificate_chain),), port=self._port) - self._fore_link.start() - port = self._fore_link.port() - self._back = _tickets_implementations.back( - servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT, - _MEGA_TIMEOUT) - self._fore_link.join_rear_link(self._back) - self._back.join_fore_link(self._fore_link) - return port + if self._server is None: + self._fore_link = _fore.activated_fore_link( + self._port, self._breakdown.request_deserializers, + self._breakdown.response_serializers, None, self._key_chain_pairs) + + self._server = _assembly_implementations.assemble_service( + self._breakdown.implementations, self._fore_link) + self._server.start() else: raise ValueError('Server currently running!') - def stop(self): - """See interfaces.Server.stop for specification.""" + def _stop(self): with self._lock: - if self._pool is None: + if self._server is None: raise ValueError('Server not running!') else: - self._fore_link.stop() - self._pool.shutdown(wait=True) - self._pool = None + self._server.stop() + self._server = None + self._fore_link = None + + def __enter__(self): + self._start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._stop() + return False + + def start(self): + self._start() + + def stop(self): + self._stop() + + def port(self): + with self._lock: + return self._fore_link.port() + +def _build_stub(breakdown, activated_rear_link): + assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub( + breakdown.implementations, activated_rear_link) + return _reexport.stub(assembly_stub, breakdown.cardinalities) def _build_server(methods, port, private_key, certificate_chain): - breakdown = _face_utilities.server_break_down(methods) + breakdown = _assembly_utilities.break_down_service(methods) return _Server(breakdown, port, private_key, certificate_chain) +def insecure_stub(methods, host, port): + """Constructs an insecure interfaces.Stub. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs to be + supported by the created stub. + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + + Returns: + An interfaces.Stub affording RPC invocation. + """ + breakdown = _assembly_utilities.break_down_invocation(methods) + activated_rear_link = _rear.activated_rear_link( + host, port, breakdown.request_serializers, + breakdown.response_deserializers) + return _build_stub(breakdown, activated_rear_link) + + +def secure_stub( + methods, host, port, root_certificates, private_key, certificate_chain): + """Constructs an insecure interfaces.Stub. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethodInvocationDescription describing the RPCs to be + supported by the created stub. + host: The host to which to connect for RPC service. + port: The port to which to connect for RPC service. + root_certificates: The PEM-encoded root certificates or None to ask for + them to be retrieved from a default location. + private_key: The PEM-encoded private key to use or None if no private key + should be used. + certificate_chain: The PEM-encoded certificate chain to use or None if no + certificate chain should be used. + + Returns: + An interfaces.Stub affording RPC invocation. + """ + breakdown = _assembly_utilities.break_down_invocation(methods) + activated_rear_link = _rear.secure_activated_rear_link( + host, port, breakdown.request_serializers, + breakdown.response_deserializers, root_certificates, private_key, + certificate_chain) + return _build_stub(breakdown, activated_rear_link) + + def insecure_server(methods, port): """Constructs an insecure interfaces.Server. Args: methods: A dictionary from RPC method name to - interfaces.ServerRpcMethod object describing the RPCs to + interfaces.RpcMethodServiceDescription describing the RPCs to be serviced by the created server. - port: The port on which to serve. + port: The desired port on which to serve or zero to ask for a port to + be automatically selected. Returns: An interfaces.Server that will run with no security and @@ -118,9 +174,10 @@ def secure_server(methods, port, private_key, certificate_chain): Args: methods: A dictionary from RPC method name to - interfaces.ServerRpcMethod object describing the RPCs to + interfaces.RpcMethodServiceDescription describing the RPCs to be serviced by the created server. - port: The port on which to serve. + port: The port on which to serve or zero to ask for a port to be + automatically selected. private_key: A pem-encoded private key. certificate_chain: A pem-encoded certificate chain. diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py new file mode 100644 index 0000000000..9ef06c32cb --- /dev/null +++ b/src/python/src/grpc/early_adopter/implementations_test.py @@ -0,0 +1,176 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): Expand this test coverage. + +"""Test of the GRPC-backed ForeLink and RearLink.""" + +import unittest + +from grpc.early_adopter import implementations +from grpc.early_adopter import utilities +from grpc._junkdrawer import math_pb2 + +DIV = 'Div' +DIV_MANY = 'DivMany' +FIB = 'Fib' +SUM = 'Sum' + +def _fibbonacci(limit): + left, right = 0, 1 + for _ in xrange(limit): + yield left + left, right = right, left + right + + +def _div(request, unused_context): + return math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _div_many(request_iterator, unused_context): + for request in request_iterator: + yield math_pb2.DivReply( + quotient=request.dividend / request.divisor, + remainder=request.dividend % request.divisor) + + +def _fib(request, unused_context): + for number in _fibbonacci(request.limit): + yield math_pb2.Num(num=number) + + +def _sum(request_iterator, unused_context): + accumulation = 0 + for request in request_iterator: + accumulation += request.num + return math_pb2.Num(num=accumulation) + + +_INVOCATION_DESCRIPTIONS = { + DIV: utilities.unary_unary_invocation_description( + math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString), + DIV_MANY: utilities.stream_stream_invocation_description( + math_pb2.DivArgs.SerializeToString, math_pb2.DivReply.FromString), + FIB: utilities.unary_stream_invocation_description( + math_pb2.FibArgs.SerializeToString, math_pb2.Num.FromString), + SUM: utilities.stream_unary_invocation_description( + math_pb2.Num.SerializeToString, math_pb2.Num.FromString), +} + +_SERVICE_DESCRIPTIONS = { + DIV: utilities.unary_unary_service_description( + _div, math_pb2.DivArgs.FromString, + math_pb2.DivReply.SerializeToString), + DIV_MANY: utilities.stream_stream_service_description( + _div_many, math_pb2.DivArgs.FromString, + math_pb2.DivReply.SerializeToString), + FIB: utilities.unary_stream_service_description( + _fib, math_pb2.FibArgs.FromString, math_pb2.Num.SerializeToString), + SUM: utilities.stream_unary_service_description( + _sum, math_pb2.Num.FromString, math_pb2.Num.SerializeToString), +} + +_TIMEOUT = 3 + + +class EarlyAdopterImplementationsTest(unittest.TestCase): + + def setUp(self): + self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0) + self.server.start() + port = self.server.port() + self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port) + + def tearDown(self): + self.server.stop() + + def testUpAndDown(self): + with self.stub: + pass + + def testUnaryUnary(self): + divisor = 59 + dividend = 973 + expected_quotient = dividend / divisor + expected_remainder = dividend % divisor + + with self.stub: + response = self.stub.Div( + math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT) + self.assertEqual(expected_quotient, response.quotient) + self.assertEqual(expected_remainder, response.remainder) + + def testUnaryStream(self): + stream_length = 43 + + with self.stub: + response_iterator = self.stub.Fib( + math_pb2.FibArgs(limit=stream_length), _TIMEOUT) + numbers = tuple(response.num for response in response_iterator) + for early, middle, later in zip(numbers, numbers[:1], numbers[:2]): + self.assertEqual(early + middle, later) + self.assertEqual(stream_length, len(numbers)) + + def testStreamUnary(self): + stream_length = 127 + + with self.stub: + response_future = self.stub.Sum.async( + (math_pb2.Num(num=index) for index in range(stream_length)), + _TIMEOUT) + self.assertEqual( + (stream_length * (stream_length - 1)) / 2, + response_future.result().num) + + def testStreamStream(self): + stream_length = 179 + divisor_offset = 71 + dividend_offset = 1763 + + with self.stub: + response_iterator = self.stub.DivMany( + (math_pb2.DivArgs( + divisor=divisor_offset + index, + dividend=dividend_offset + index) + for index in range(stream_length)), + _TIMEOUT) + for index, response in enumerate(response_iterator): + self.assertEqual( + (dividend_offset + index) / (divisor_offset + index), + response.quotient) + self.assertEqual( + (dividend_offset + index) % (divisor_offset + index), + response.remainder) + self.assertEqual(stream_length, index + 1) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/src/grpc/early_adopter/interfaces.py b/src/python/src/grpc/early_adopter/interfaces.py index 0ec371f8e8..b733873c1c 100644 --- a/src/python/src/grpc/early_adopter/interfaces.py +++ b/src/python/src/grpc/early_adopter/interfaces.py @@ -32,6 +32,11 @@ import abc import enum +# exceptions is referenced from specification in this module. +from grpc.early_adopter import exceptions # pylint: disable=unused-import +from grpc.framework.foundation import activated +from grpc.framework.foundation import future + @enum.unique class Cardinality(enum.Enum): @@ -43,24 +48,166 @@ class Cardinality(enum.Enum): STREAM_STREAM = 'request-streaming/response-streaming' -class RpcMethod(object): - """A type for the common aspects of RPC method specifications.""" +@enum.unique +class Abortion(enum.Enum): + """Categories of RPC abortion.""" + + CANCELLED = 'cancelled' + EXPIRED = 'expired' + NETWORK_FAILURE = 'network failure' + SERVICED_FAILURE = 'serviced failure' + SERVICER_FAILURE = 'servicer failure' + + +class CancellableIterator(object): + """Implements the Iterator protocol and affords a cancel method.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __iter__(self): + """Returns the self object in accordance with the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def next(self): + """Returns a value or raises StopIteration per the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of whatever computation underlies this iterator.""" + raise NotImplementedError() + + +class RpcContext(object): + """Provides RPC-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_abortion_callback(self, abortion_callback): + """Registers a callback to be called if the RPC is aborted. + Args: + abortion_callback: A callable to be called and passed an Abortion value + in the event of RPC abortion. + """ + raise NotImplementedError() + + +class UnaryUnarySyncAsync(object): + """Affords invoking a unary-unary RPC synchronously or asynchronously. + Values implementing this interface are directly callable and present an + "async" method. Both calls take a request value and a numeric timeout. + Direct invocation of a value of this type invokes its associated RPC and + blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request, timeout): + """Synchronously invokes the underlying RPC. + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + Returns: + The response value for the RPC. + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request, timeout): + """Asynchronously invokes the underlying RPC. + Args: + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + +class StreamUnarySyncAsync(object): + """Affords invoking a stream-unary RPC synchronously or asynchronously. + Values implementing this interface are directly callable and present an + "async" method. Both calls take an iterator of request values and a numeric + timeout. Direct invocation of a value of this type invokes its associated RPC + and blocks until the RPC's response is available. Calling the "async" method + of a value of this type invokes its associated RPC and immediately returns a + future.Future bound to the asynchronous execution of the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __call__(self, request_iterator, timeout): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def async(self, request_iterator, timeout): + """Asynchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future's result value will be the response value of the RPC. + In the event of RPC abortion, the returned Future's exception value + will be an exceptions.RpcError. + """ + raise NotImplementedError() + + +class RpcMethodDescription(object): + """A type for the common aspects of RPC method descriptions.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod def cardinality(self): - """Identifies the cardinality of this RpcMethod. + """Identifies the cardinality of this RpcMethodDescription. Returns: A Cardinality value identifying whether or not this - RpcMethod is request-unary or request-streaming and - whether or not it is response-unary or - response-streaming. + RpcMethodDescription is request-unary or request-streaming and + whether or not it is response-unary or response-streaming. """ raise NotImplementedError() -class ClientRpcMethod(RpcMethod): +class RpcMethodInvocationDescription(RpcMethodDescription): """Invocation-side description of an RPC method.""" __metaclass__ = abc.ABCMeta @@ -69,7 +216,8 @@ class ClientRpcMethod(RpcMethod): """Serializes a request value. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodInvocationDescription. Returns: The serialization of the given request value as a @@ -82,9 +230,9 @@ class ClientRpcMethod(RpcMethod): """Deserializes a response value. Args: - serialized_response: A bytestring that is the - serialization of a response value appropriate for this - RpcMethod. + serialized_response: A bytestring that is the serialization of a response + value appropriate for the RPC method described by this + RpcMethodInvocationDescription. Returns: A response value corresponding to the given bytestring. @@ -92,7 +240,7 @@ class ClientRpcMethod(RpcMethod): raise NotImplementedError() -class ServerRpcMethod(RpcMethod): +class RpcMethodServiceDescription(RpcMethodDescription): """Service-side description of an RPC method.""" __metaclass__ = abc.ABCMeta @@ -101,9 +249,9 @@ class ServerRpcMethod(RpcMethod): """Deserializes a request value. Args: - serialized_request: A bytestring that is the - serialization of a request value appropriate for this - RpcMethod. + serialized_request: A bytestring that is the serialization of a request + value appropriate for the RPC method described by this + RpcMethodServiceDescription. Returns: A request value corresponding to the given bytestring. @@ -115,7 +263,8 @@ class ServerRpcMethod(RpcMethod): """Serializes a response value. Args: - response: A response value appropriate for this RpcMethod. + response: A response value appropriate for the RPC method described by + this RpcMethodServiceDescription. Returns: The serialization of the given response value as a @@ -124,80 +273,116 @@ class ServerRpcMethod(RpcMethod): raise NotImplementedError() @abc.abstractmethod - def service_unary_unary(self, request): + def service_unary_unary(self, request, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.UNARY_UNARY. + RpcMethodServiceDescription is Cardinality.UNARY_UNARY. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Returns: - A response value appropriate for this RpcMethod. + A response value appropriate for the RPC method described by this + RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_unary_stream(self, request): + def service_unary_stream(self, request, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.UNARY_STREAM. + RpcMethodServiceDescription is Cardinality.UNARY_STREAM. Args: - request: A request value appropriate for this RpcMethod. + request: A request value appropriate for the RPC method described by this + RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Yields: - Zero or more response values appropriate for this - RpcMethod. + Zero or more response values appropriate for the RPC method described by + this RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_stream_unary(self, request_iterator): + def service_stream_unary(self, request_iterator, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.STREAM_UNARY. + RpcMethodServiceDescription is Cardinality.STREAM_UNARY. Args: - request_iterator: An iterator of request values - appropriate for this RpcMethod. + request_iterator: An iterator of request values appropriate for the RPC + method described by this RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Returns: - A response value appropriate for this RpcMethod. + A response value appropriate for the RPC method described by this + RpcMethodServiceDescription. """ raise NotImplementedError() @abc.abstractmethod - def service_stream_stream(self, request_iterator): + def service_stream_stream(self, request_iterator, context): """Carries out this RPC. This method may only be called if the cardinality of this - RpcMethod is Cardinality.STREAM_STREAM. + RpcMethodServiceDescription is Cardinality.STREAM_STREAM. Args: - request_iterator: An iterator of request values - appropriate for this RpcMethod. + request_iterator: An iterator of request values appropriate for the RPC + method described by this RpcMethodServiceDescription. + context: An RpcContext object for the RPC. Yields: - Zero or more response values appropraite for this - RpcMethod. + Zero or more response values appropriate for the RPC method described by + this RpcMethodServiceDescription. """ raise NotImplementedError() -class Server(object): +class Stub(object): + """A stub with callable RPC method names for attributes. + + Instances of this type are context managers and only afford RPC invocation + when used in context. + + Instances of this type, when used in context, respond to attribute access + as follows: if the requested attribute is the name of a unary-unary RPC + method, the value of the attribute will be a UnaryUnarySyncAsync with which + to invoke the RPC method. If the requested attribute is the name of a + unary-stream RPC method, the value of the attribute will be a callable taking + a request object and a timeout parameter and returning a CancellableIterator + that yields the response values of the RPC. If the requested attribute is the + name of a stream-unary RPC method, the value of the attribute will be a + StreamUnarySyncAsync with which to invoke the RPC method. If the requested + attribute is the name of a stream-stream RPC method, the value of the + attribute will be a callable taking an iterator of request objects and a + timeout and returning a CancellableIterator that yields the response values + of the RPC. + + In all cases indication of abortion is indicated by raising of + exceptions.RpcError, exceptions.CancellationError, + and exceptions.ExpirationError. + """ + __metaclass__ = abc.ABCMeta + + +class Server(activated.Activated): """A GRPC Server.""" __metaclass__ = abc.ABCMeta @abc.abstractmethod - def start(self): - """Instructs this server to commence service of RPCs.""" - raise NotImplementedError() + def port(self): + """Reports the port on which the server is serving. - @abc.abstractmethod - def stop(self): - """Instructs this server to halt service of RPCs.""" + This method may only be called while the server is activated. + + Returns: + The port on which the server is serving. + """ raise NotImplementedError() diff --git a/src/python/src/grpc/early_adopter/utilities.py b/src/python/src/grpc/early_adopter/utilities.py index 9277d3f6ad..da8ef825aa 100644 --- a/src/python/src/grpc/early_adopter/utilities.py +++ b/src/python/src/grpc/early_adopter/utilities.py @@ -32,7 +32,9 @@ from grpc.early_adopter import interfaces -class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod): +class _RpcMethodDescription( + interfaces.RpcMethodInvocationDescription, + interfaces.RpcMethodServiceDescription): def __init__( self, cardinality, unary_unary, unary_stream, stream_unary, @@ -49,44 +51,45 @@ class _RpcMethod(interfaces.ClientRpcMethod, interfaces.ServerRpcMethod): self._response_deserializer = response_deserializer def cardinality(self): - """See interfaces.RpcMethod.cardinality for specification.""" + """See interfaces.RpcMethodDescription.cardinality for specification.""" return self._cardinality def serialize_request(self, request): - """See interfaces.RpcMethod.serialize_request for specification.""" + """See interfaces.RpcMethodInvocationDescription.serialize_request.""" return self._request_serializer(request) def deserialize_request(self, serialized_request): - """See interfaces.RpcMethod.deserialize_request for specification.""" + """See interfaces.RpcMethodServiceDescription.deserialize_request.""" return self._request_deserializer(serialized_request) def serialize_response(self, response): - """See interfaces.RpcMethod.serialize_response for specification.""" + """See interfaces.RpcMethodServiceDescription.serialize_response.""" return self._response_serializer(response) def deserialize_response(self, serialized_response): - """See interfaces.RpcMethod.deserialize_response for specification.""" + """See interfaces.RpcMethodInvocationDescription.deserialize_response.""" return self._response_deserializer(serialized_response) - def service_unary_unary(self, request): - """See interfaces.RpcMethod.service_unary_unary for specification.""" - return self._unary_unary(request) + def service_unary_unary(self, request, context): + """See interfaces.RpcMethodServiceDescription.service_unary_unary.""" + return self._unary_unary(request, context) - def service_unary_stream(self, request): - """See interfaces.RpcMethod.service_unary_stream for specification.""" - return self._unary_stream(request) + def service_unary_stream(self, request, context): + """See interfaces.RpcMethodServiceDescription.service_unary_stream.""" + return self._unary_stream(request, context) - def service_stream_unary(self, request_iterator): - """See interfaces.RpcMethod.service_stream_unary for specification.""" - return self._stream_unary(request_iterator) + def service_stream_unary(self, request_iterator, context): + """See interfaces.RpcMethodServiceDescription.service_stream_unary.""" + return self._stream_unary(request_iterator, context) - def service_stream_stream(self, request_iterator): - """See interfaces.RpcMethod.service_stream_stream for specification.""" - return self._stream_stream(request_iterator) + def service_stream_stream(self, request_iterator, context): + """See interfaces.RpcMethodServiceDescription.service_stream_stream.""" + return self._stream_stream(request_iterator, context) -def unary_unary_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a unary-unary RPC method. +def unary_unary_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -96,17 +99,17 @@ def unary_unary_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a unary-request/unary-response RPC - method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a unary-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_UNARY, None, None, None, None, request_serializer, None, None, response_deserializer) -def unary_stream_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a unary-stream RPC method. +def unary_stream_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -116,17 +119,17 @@ def unary_stream_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a unary-request/streaming-response - RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a unary-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_STREAM, None, None, None, None, request_serializer, None, None, response_deserializer) -def stream_unary_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a stream-unary RPC method. +def stream_unary_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -136,17 +139,17 @@ def stream_unary_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a streaming-request/unary-response - RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a streaming-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_UNARY, None, None, None, None, request_serializer, None, None, response_deserializer) -def stream_stream_client_rpc_method(request_serializer, response_deserializer): - """Constructs an interfaces.ClientRpcMethod for a stream-stream RPC method. +def stream_stream_invocation_description( + request_serializer, response_deserializer): + """Creates an interfaces.RpcMethodInvocationDescription for an RPC method. Args: request_serializer: A callable that when called on a request @@ -156,23 +159,23 @@ def stream_stream_client_rpc_method(request_serializer, response_deserializer): that bytestring. Returns: - An interfaces.ClientRpcMethod constructed from the given - arguments representing a - streaming-request/streaming-response RPC method. + An interfaces.RpcMethodInvocationDescription constructed from the given + arguments representing a streaming-request/streaming-response RPC + method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_STREAM, None, None, None, None, request_serializer, None, None, response_deserializer) -def unary_unary_server_rpc_method( +def unary_unary_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a unary-unary RPC - method that accepts a single request and returns a single - response. + method that accepts a single request and an interfaces.RpcContext and + returns a single response. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -181,23 +184,23 @@ def unary_unary_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a unary-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None, None, request_deserializer, response_serializer, None) -def unary_stream_server_rpc_method( +def unary_stream_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a unary-stream RPC - method that accepts a single request and returns an - iterator of zero or more responses. + method that accepts a single request and an interfaces.RpcContext + and returns an iterator of zero or more responses. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -206,23 +209,23 @@ def unary_stream_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a unary-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None, None, request_deserializer, response_serializer, None) -def stream_unary_server_rpc_method( +def stream_unary_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a stream-unary RPC method that accepts an iterator of zero or more requests - and returns a single response. + and an interfaces.RpcContext and returns a single response. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -231,23 +234,24 @@ def stream_unary_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a streaming-request/unary-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None, None, request_deserializer, response_serializer, None) -def stream_stream_server_rpc_method( +def stream_stream_service_description( behavior, request_deserializer, response_serializer): - """Constructs an interfaces.ServerRpcMethod for the given behavior. + """Creates an interfaces.RpcMethodServiceDescription for the given behavior. Args: behavior: A callable that implements a stream-stream RPC method that accepts an iterator of zero or more requests - and returns an iterator of zero or more responses. + and an interfaces.RpcContext and returns an iterator of + zero or more responses. request_deserializer: A callable that when called on a bytestring returns the request value corresponding to that bytestring. @@ -256,10 +260,10 @@ def stream_stream_server_rpc_method( that value. Returns: - An interfaces.ServerRpcMethod constructed from the given + An interfaces.RpcMethodServiceDescription constructed from the given arguments representing a streaming-request/streaming-response RPC method. """ - return _RpcMethod( + return _RpcMethodDescription( interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior, None, request_deserializer, response_serializer, None) diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py index b9d314844c..f7166ed99d 100644 --- a/src/python/src/grpc/framework/assembly/implementations.py +++ b/src/python/src/grpc/framework/assembly/implementations.py @@ -31,16 +31,18 @@ import threading +# tickets_interfaces, face_interfaces, and activated are referenced from +# specification in this module. from grpc.framework.assembly import interfaces from grpc.framework.base import util as base_utilities from grpc.framework.base.packets import implementations as tickets_implementations -from grpc.framework.base.packets import interfaces as tickets_interfaces +from grpc.framework.base.packets import interfaces as tickets_interfaces # pylint: disable=unused-import from grpc.framework.common import cardinality from grpc.framework.common import style from grpc.framework.face import implementations as face_implementations -from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import from grpc.framework.face import utilities as face_utilities -from grpc.framework.foundation import activated +from grpc.framework.foundation import activated # pylint: disable=unused-import from grpc.framework.foundation import logging_pool _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -138,7 +140,13 @@ class _DynamicInlineStub(object): with self._lock: behavior = self._behaviors.get(attr) if behavior is None: - raise AttributeError(attr) + for name, behavior in self._behaviors.iteritems(): + last_slash_index = name.rfind('/') + if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: + return behavior + else: + raise AttributeError( + '_DynamicInlineStub instance has no attribute "%s"!' % attr) else: return behavior diff --git a/src/python/src/setup.py b/src/python/src/setup.py index e3f13fa5c8..26121dcfab 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -38,6 +38,7 @@ _EXTENSION_SOURCES = ( 'grpc/_adapter/_completion_queue.c', 'grpc/_adapter/_error.c', 'grpc/_adapter/_server.c', + 'grpc/_adapter/_client_credentials.c', 'grpc/_adapter/_server_credentials.c', ) @@ -80,6 +81,6 @@ _PACKAGE_DIRECTORIES = { } _core.setup( - name='grpc-2015', version='0.0.1', + name='grpc-2015', version='0.4.0', ext_modules=[_EXTENSION_MODULE], packages=_PACKAGES, package_dir=_PACKAGE_DIRECTORIES) diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb index 6656a56130..9bb324ff64 100755 --- a/src/ruby/bin/apis/pubsub_demo.rb +++ b/src/ruby/bin/apis/pubsub_demo.rb @@ -31,10 +31,9 @@ # pubsub_demo demos accesses the Google PubSub API via its gRPC interface # -# TODO: update the Usage once the usable auth gem is available -# $ SSL_CERT_FILE=<path/to/ssl/certs> \ +# $ GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_key_file> \ +# SSL_CERT_FILE=<path/to/ssl/certs> \ # path/to/pubsub_demo.rb \ -# --service_account_key_file=<path_to_service_account> \ # [--action=<chosen_demo_action> ] # # There are options related to the chosen action, see #parse_args below. @@ -49,6 +48,7 @@ $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'optparse' require 'grpc' +require 'googleauth' require 'google/protobuf' require 'google/protobuf/empty' @@ -59,7 +59,9 @@ require 'tech/pubsub/proto/pubsub_services' def load_prod_cert fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil? p "loading prod certs from #{ENV['SSL_CERT_FILE']}" - File.open(ENV['SSL_CERT_FILE']).read + File.open(ENV['SSL_CERT_FILE']) do |f| + return f.read + end end # creates a SSL Credentials from the production certificates. @@ -68,14 +70,9 @@ def ssl_creds end # Builds the metadata authentication update proc. -# -# TODO: replace this once the ruby usable auth repo is available. def auth_proc(opts) - if GRPC::Auth::GCECredentials.on_gce? - return GRPC::Auth::GCECredentials.new.updater_proc - end - fd = StringIO.new(File.read(opts.oauth_key_file)) - GRPC::Auth::ServiceAccountCredentials.new(opts.oauth_scope, fd).updater_proc + auth_creds = Google::Auth.get_application_default(opts.oauth_scope) + return auth_creds.updater_proc end # Creates a stub for accessing the publisher service. @@ -216,14 +213,14 @@ class NamedActions end # Args is used to hold the command line info. -Args = Struct.new(:host, :oauth_scope, :oauth_key_file, :port, :action, - :project_id, :topic_name, :sub_name) +Args = Struct.new(:host, :oauth_scope, :port, :action, :project_id, :topic_name, + :sub_name) # validates the the command line options, returning them as an Arg. def parse_args args = Args.new('pubsub-staging.googleapis.com', 'https://www.googleapis.com/auth/pubsub', - nil, 443, 'list_some_topics', 'stoked-keyword-656') + 443, 'list_some_topics', 'stoked-keyword-656') OptionParser.new do |opts| opts.on('--oauth_scope scope', 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } @@ -233,10 +230,6 @@ def parse_args opts.on('--server_port SERVER_PORT', 'server port') do |v| args.port = v end - opts.on('--service_account_key_file PATH', - 'Path to the service account json key file') do |v| - args.oauth_key_file = v - end # instance_methods(false) gives only the methods defined in that class. scenes = NamedActions.instance_methods(false).map { |t| t.to_s } @@ -257,15 +250,11 @@ def parse_args end def _check_args(args) - %w(host port action).each do |a| + %w(host port action oauth_scope).each do |a| if args[a].nil? raise OptionParser::MissingArgument.new("please specify --#{a}") end end - if args['oauth_key_file'].nil? || args['oauth_scope'].nil? - fail(OptionParser::MissingArgument, - 'please specify both of --service_account_key_file and --oauth_scope') - end args end diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index 76402b7c89..b0b24b949f 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -48,6 +48,7 @@ require 'minitest' require 'minitest/assertions' require 'grpc' +require 'googleauth' require 'google/protobuf' require 'test/cpp/interop/test_services' @@ -56,7 +57,7 @@ require 'test/cpp/interop/empty' require 'signet/ssl_config' -include GRPC::Auth +AUTH_ENV = Google::Auth::ServiceAccountCredentials::ENV_VAR # loads the certificates used to access the test server securely. def load_test_certs @@ -101,22 +102,14 @@ def create_stub(opts) } # Add service account creds if specified - if %w(all service_account_creds).include?(opts.test_case) + wants_creds = %w(all compute_engine_creds service_account_creds) + if wants_creds.include?(opts.test_case) unless opts.oauth_scope.nil? - fd = StringIO.new(File.read(opts.oauth_key_file)) - logger.info("loading oauth certs from #{opts.oauth_key_file}") - auth_creds = ServiceAccountCredentials.new(opts.oauth_scope, fd) + auth_creds = Google::Auth.get_application_default(opts.oauth_scope) stub_opts[:update_metadata] = auth_creds.updater_proc end end - # Add compute engine creds if specified - if %w(all compute_engine_creds).include?(opts.test_case) - unless opts.oauth_scope.nil? - stub_opts[:update_metadata] = GCECredentials.new.update_proc - end - end - logger.info("... connecting securely to #{address}") Grpc::Testing::TestService::Stub.new(address, **stub_opts) else @@ -193,11 +186,11 @@ class NamedTests def service_account_creds # ignore this test if the oauth options are not set - if @args.oauth_scope.nil? || @args.oauth_key_file.nil? + if @args.oauth_scope.nil? p 'NOT RUN: service_account_creds; no service_account settings' return end - json_key = File.read(@args.oauth_key_file) + json_key = File.read(ENV[AUTH_ENV]) wanted_email = MultiJson.load(json_key)['client_email'] resp = perform_large_unary(fill_username: true, fill_oauth_scope: true) @@ -285,13 +278,13 @@ end # Args is used to hold the command line info. Args = Struct.new(:default_service_account, :host, :host_override, - :oauth_scope, :oauth_key_file, :port, :secure, :test_case, + :oauth_scope, :port, :secure, :test_case, :use_test_ca) # validates the the command line options, returning them as a Hash. def parse_args args = Args.new - args.host_override = 'foo.test.google.com' + args.host_override = 'foo.test.google.fr' OptionParser.new do |opts| opts.on('--oauth_scope scope', 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } @@ -302,10 +295,6 @@ def parse_args 'email address of the default service account') do |v| args['default_service_account'] = v end - opts.on('--service_account_key_file PATH', - 'Path to the service account json key file') do |v| - args['oauth_key_file'] = v - end opts.on('--server_host_override HOST_OVERRIDE', 'override host via a HTTP header') do |v| args['host_override'] = v @@ -333,10 +322,6 @@ def _check_args(args) fail(OptionParser::MissingArgument, "please specify --#{arg}") end end - if args['oauth_key_file'].nil? ^ args['oauth_scope'].nil? - fail(OptionParser::MissingArgument, - 'please specify both of --service_account_key_file and --oauth_scope') - end args end diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb index cb085d4d42..db254efb00 100755 --- a/src/ruby/bin/math_client.rb +++ b/src/ruby/bin/math_client.rb @@ -127,7 +127,7 @@ def main if options['secure'] stub_opts = { :creds => test_creds, - GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com' + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' } p stub_opts p options['host'] diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb index 44710520d2..f3fd110347 100755 --- a/src/ruby/bin/noproto_client.rb +++ b/src/ruby/bin/noproto_client.rb @@ -89,7 +89,7 @@ def main if options['secure'] stub_opts = { :creds => test_creds, - GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com' + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' } p stub_opts p options['host'] diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index bc59c234e5..25a3ff5ce2 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -22,6 +22,7 @@ Gem::Specification.new do |s| s.add_dependency 'faraday', '~> 0.9' s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' + s.add_dependency 'googleauth', '~> 0.1' s.add_dependency 'logging', '~> 1.8' s.add_dependency 'jwt', '~> 1.2.1' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index a2a609f59e..dd02ef7666 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -27,8 +27,6 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -require 'grpc/auth/compute_engine.rb' -require 'grpc/auth/service_account.rb' require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' diff --git a/src/ruby/lib/grpc/auth/compute_engine.rb b/src/ruby/lib/grpc/auth/compute_engine.rb deleted file mode 100644 index 5cb1e1a4dc..0000000000 --- a/src/ruby/lib/grpc/auth/compute_engine.rb +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'faraday' -require 'grpc/auth/signet' - -module GRPC - # Module Auth provides classes that provide Google-specific authentication - # used to access Google gRPC services. - module Auth - # Extends Signet::OAuth2::Client so that the auth token is obtained from - # the GCE metadata server. - class GCECredentials < Signet::OAuth2::Client - COMPUTE_AUTH_TOKEN_URI = 'http://metadata/computeMetadata/v1/'\ - 'instance/service-accounts/default/token' - COMPUTE_CHECK_URI = 'http://metadata.google.internal' - - # Detect if this appear to be a GCE instance, by checking if metadata - # is available - def self.on_gce?(options = {}) - c = options[:connection] || Faraday.default_connection - resp = c.get(COMPUTE_CHECK_URI) - return false unless resp.status == 200 - return false unless resp.headers.key?('Metadata-Flavor') - return resp.headers['Metadata-Flavor'] == 'Google' - rescue Faraday::ConnectionFailed - return false - end - - # Overrides the super class method to change how access tokens are - # fetched. - def fetch_access_token(options = {}) - c = options[:connection] || Faraday.default_connection - c.headers = { 'Metadata-Flavor' => 'Google' } - resp = c.get(COMPUTE_AUTH_TOKEN_URI) - Signet::OAuth2.parse_credentials(resp.body, - resp.headers['content-type']) - end - end - end -end diff --git a/src/ruby/lib/grpc/auth/service_account.rb b/src/ruby/lib/grpc/auth/service_account.rb deleted file mode 100644 index 14b81a9e03..0000000000 --- a/src/ruby/lib/grpc/auth/service_account.rb +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'grpc/auth/signet' -require 'multi_json' -require 'openssl' - -# Reads the private key and client email fields from service account JSON key. -def read_json_key(json_key_io) - json_key = MultiJson.load(json_key_io.read) - fail 'missing client_email' unless json_key.key?('client_email') - fail 'missing private_key' unless json_key.key?('private_key') - [json_key['private_key'], json_key['client_email']] -end - -module GRPC - # Module Auth provides classes that provide Google-specific authentication - # used to access Google gRPC services. - module Auth - # Authenticates requests using Google's Service Account credentials. - # (cf https://developers.google.com/accounts/docs/OAuth2ServiceAccount) - class ServiceAccountCredentials < Signet::OAuth2::Client - TOKEN_CRED_URI = 'https://www.googleapis.com/oauth2/v3/token' - AUDIENCE = TOKEN_CRED_URI - - # Initializes a ServiceAccountCredentials. - # - # @param scope [string|array] the scope(s) to access - # @param json_key_io [IO] an IO from which the JSON key can be read - def initialize(scope, json_key_io) - private_key, client_email = read_json_key(json_key_io) - super(token_credential_uri: TOKEN_CRED_URI, - audience: AUDIENCE, - scope: scope, - issuer: client_email, - signing_key: OpenSSL::PKey::RSA.new(private_key)) - end - end - end -end diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 7fc0d83501..f234984eec 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -400,7 +400,12 @@ module GRPC # @param deadline [TimeConst] def new_active_call(ch, marshal, unmarshal, deadline = nil) absolute_deadline = Core::TimeConsts.from_relative_time(deadline) - call = @ch.create_call(ch, @host, absolute_deadline) + # It should be OK to to pass the hostname:port to create_call, but at + # the moment this fails a security check. This will be corrected. + # + # TODO: # remove this after create_call is updated + host = @host.split(':')[0] + call = @ch.create_call(ch, host, absolute_deadline) ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline, started: false) end diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index d4eb0ed24f..513a53724f 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.0.1' + VERSION = '0.5.0' end diff --git a/src/ruby/spec/auth/apply_auth_examples.rb b/src/ruby/spec/auth/apply_auth_examples.rb deleted file mode 100644 index 09b393026f..0000000000 --- a/src/ruby/spec/auth/apply_auth_examples.rb +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -spec_dir = File.expand_path(File.join(File.dirname(__FILE__))) -$LOAD_PATH.unshift(spec_dir) -$LOAD_PATH.uniq! - -require 'faraday' -require 'spec_helper' - -def build_json_response(payload) - [200, - { 'Content-Type' => 'application/json; charset=utf-8' }, - MultiJson.dump(payload)] -end - -WANTED_AUTH_KEY = :Authorization - -shared_examples 'apply/apply! are OK' do - # tests that use these examples need to define - # - # @client which should be an auth client - # - # @make_auth_stubs, which should stub out the expected http behaviour of the - # auth client - describe '#fetch_access_token' do - it 'should set access_token to the fetched value' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - @client.fetch_access_token!(connection: c) - expect(@client.access_token).to eq(token) - stubs.verify_stubbed_calls - end - end - - describe '#apply!' do - it 'should update the target hash with fetched access token' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - md = { foo: 'bar' } - @client.apply!(md, connection: c) - want = { :foo => 'bar', WANTED_AUTH_KEY => "Bearer #{token}" } - expect(md).to eq(want) - stubs.verify_stubbed_calls - end - end - - describe 'updater_proc' do - it 'should provide a proc that updates a hash with the access token' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - md = { foo: 'bar' } - the_proc = @client.updater_proc - got = the_proc.call(md, connection: c) - want = { :foo => 'bar', WANTED_AUTH_KEY => "Bearer #{token}" } - expect(got).to eq(want) - stubs.verify_stubbed_calls - end - end - - describe '#apply' do - it 'should not update the original hash with the access token' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - md = { foo: 'bar' } - @client.apply(md, connection: c) - want = { foo: 'bar' } - expect(md).to eq(want) - stubs.verify_stubbed_calls - end - - it 'should add the token to the returned hash' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - md = { foo: 'bar' } - got = @client.apply(md, connection: c) - want = { :foo => 'bar', WANTED_AUTH_KEY => "Bearer #{token}" } - expect(got).to eq(want) - stubs.verify_stubbed_calls - end - - it 'should not fetch a new token if the current is not expired' do - token = '1/abcdef1234567890' - stubs = make_auth_stubs with_access_token: token - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - - n = 5 # arbitrary - n.times do |_t| - md = { foo: 'bar' } - got = @client.apply(md, connection: c) - want = { :foo => 'bar', WANTED_AUTH_KEY => "Bearer #{token}" } - expect(got).to eq(want) - end - stubs.verify_stubbed_calls - end - - it 'should fetch a new token if the current one is expired' do - token_1 = '1/abcdef1234567890' - token_2 = '2/abcdef1234567890' - - [token_1, token_2].each do |t| - stubs = make_auth_stubs with_access_token: t - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - md = { foo: 'bar' } - got = @client.apply(md, connection: c) - want = { :foo => 'bar', WANTED_AUTH_KEY => "Bearer #{t}" } - expect(got).to eq(want) - stubs.verify_stubbed_calls - @client.expires_at -= 3601 # default is to expire in 1hr - end - end - end -end diff --git a/src/ruby/spec/auth/compute_engine_spec.rb b/src/ruby/spec/auth/compute_engine_spec.rb deleted file mode 100644 index c43214d086..0000000000 --- a/src/ruby/spec/auth/compute_engine_spec.rb +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -spec_dir = File.expand_path(File.join(File.dirname(__FILE__))) -$LOAD_PATH.unshift(spec_dir) -$LOAD_PATH.uniq! - -require 'apply_auth_examples' -require 'faraday' -require 'grpc/auth/compute_engine' -require 'spec_helper' - -describe GRPC::Auth::GCECredentials do - MD_URI = '/computeMetadata/v1/instance/service-accounts/default/token' - GCECredentials = GRPC::Auth::GCECredentials - - before(:example) do - @client = GCECredentials.new - end - - def make_auth_stubs(with_access_token: '') - Faraday::Adapter::Test::Stubs.new do |stub| - stub.get(MD_URI) do |env| - headers = env[:request_headers] - expect(headers['Metadata-Flavor']).to eq('Google') - build_json_response( - 'access_token' => with_access_token, - 'token_type' => 'Bearer', - 'expires_in' => 3600) - end - end - end - - it_behaves_like 'apply/apply! are OK' - - describe '#on_gce?' do - it 'should be true when Metadata-Flavor is Google' do - stubs = Faraday::Adapter::Test::Stubs.new do |stub| - stub.get('/') do |_env| - [200, - { 'Metadata-Flavor' => 'Google' }, - ''] - end - end - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - expect(GCECredentials.on_gce?(connection: c)).to eq(true) - stubs.verify_stubbed_calls - end - - it 'should be false when Metadata-Flavor is not Google' do - stubs = Faraday::Adapter::Test::Stubs.new do |stub| - stub.get('/') do |_env| - [200, - { 'Metadata-Flavor' => 'NotGoogle' }, - ''] - end - end - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - expect(GCECredentials.on_gce?(connection: c)).to eq(false) - stubs.verify_stubbed_calls - end - - it 'should be false if the response is not 200' do - stubs = Faraday::Adapter::Test::Stubs.new do |stub| - stub.get('/') do |_env| - [404, - { 'Metadata-Flavor' => 'Google' }, - ''] - end - end - c = Faraday.new do |b| - b.adapter(:test, stubs) - end - expect(GCECredentials.on_gce?(connection: c)).to eq(false) - stubs.verify_stubbed_calls - end - end -end diff --git a/src/ruby/spec/auth/service_account_spec.rb b/src/ruby/spec/auth/service_account_spec.rb deleted file mode 100644 index 2f14a1ae05..0000000000 --- a/src/ruby/spec/auth/service_account_spec.rb +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -spec_dir = File.expand_path(File.join(File.dirname(__FILE__))) -$LOAD_PATH.unshift(spec_dir) -$LOAD_PATH.uniq! - -require 'apply_auth_examples' -require 'grpc/auth/service_account' -require 'jwt' -require 'multi_json' -require 'openssl' -require 'spec_helper' - -describe GRPC::Auth::ServiceAccountCredentials do - before(:example) do - @key = OpenSSL::PKey::RSA.new(2048) - cred_json = { - private_key_id: 'a_private_key_id', - private_key: @key.to_pem, - client_email: 'app@developer.gserviceaccount.com', - client_id: 'app.apps.googleusercontent.com', - type: 'service_account' - } - cred_json_text = MultiJson.dump(cred_json) - @client = GRPC::Auth::ServiceAccountCredentials.new( - 'https://www.googleapis.com/auth/userinfo.profile', - StringIO.new(cred_json_text)) - end - - def make_auth_stubs(with_access_token: '') - Faraday::Adapter::Test::Stubs.new do |stub| - stub.post('/oauth2/v3/token') do |env| - params = Addressable::URI.form_unencode(env[:body]) - _claim, _header = JWT.decode(params.assoc('assertion').last, - @key.public_key) - want = ['grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer'] - expect(params.assoc('grant_type')).to eq(want) - build_json_response( - 'access_token' => with_access_token, - 'token_type' => 'Bearer', - 'expires_in' => 3600 - ) - end - end - end - - it_behaves_like 'apply/apply! are OK' -end diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 52c985786a..030ff328f2 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -353,7 +353,7 @@ describe 'the secure http client/server' do @server = GRPC::Core::Server.new(@server_queue, nil, server_creds) server_port = @server.add_http2_port(server_host, true) @server.start - args = { Channel::SSL_TARGET => 'foo.test.google.com' } + args = { Channel::SSL_TARGET => 'foo.test.google.fr' } @ch = Channel.new("0.0.0.0:#{server_port}", args, GRPC::Core::Credentials.new(certs[0], nil, nil)) end diff --git a/src/ruby/spec/credentials_spec.rb b/src/ruby/spec/credentials_spec.rb index 001fecd12b..fc97d11a87 100644 --- a/src/ruby/spec/credentials_spec.rb +++ b/src/ruby/spec/credentials_spec.rb @@ -68,10 +68,4 @@ describe Credentials do expect { cred1.compose(cred2) }.to_not raise_error end end - - describe 'Credentials#default' do - it 'is not implemented yet' do - expect { Credentials.default }.to raise_error RuntimeError - end - end end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 297a133831..adf354f4ee 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -116,7 +116,7 @@ describe 'ClientStub' do host = FAKE_HOST blk = proc do opts = { - GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.com', + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr', a_channel_arg: 'an_arg', creds: GRPC::Core::Credentials.new(certs[0], nil, nil) } |