diff options
Diffstat (limited to 'src')
95 files changed, 3624 insertions, 846 deletions
diff --git a/src/compiler/csharp_generator_helpers.h b/src/compiler/csharp_generator_helpers.h index 1370627633..5639ea058b 100644 --- a/src/compiler/csharp_generator_helpers.h +++ b/src/compiler/csharp_generator_helpers.h @@ -41,7 +41,7 @@ namespace grpc_csharp_generator { inline bool ServicesFilename(const grpc::protobuf::FileDescriptor *file, grpc::string *file_name_or_error) { - *file_name_or_error = grpc_generator::FileNameInUpperCamel(file) + "Grpc.cs"; + *file_name_or_error = grpc_generator::FileNameInUpperCamel(file, false) + "Grpc.cs"; return true; } diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h index 68b807b057..e1bb66a875 100644 --- a/src/compiler/generator_helpers.h +++ b/src/compiler/generator_helpers.h @@ -125,16 +125,23 @@ inline grpc::string LowerUnderscoreToUpperCamel(grpc::string str) { return result; } -inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file) { +inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file, + bool include_package_path) { std::vector<grpc::string> tokens = tokenize(StripProto(file->name()), "/"); grpc::string result = ""; - for (unsigned int i = 0; i < tokens.size() - 1; i++) { - result += tokens[i] + "/"; + if (include_package_path) { + for (unsigned int i = 0; i < tokens.size() - 1; i++) { + result += tokens[i] + "/"; + } } result += LowerUnderscoreToUpperCamel(tokens.back()); return result; } +inline grpc::string FileNameInUpperCamel(const grpc::protobuf::FileDescriptor *file) { + return FileNameInUpperCamel(file, true); +} + enum MethodType { METHODTYPE_NO_STREAMING, METHODTYPE_CLIENT_STREAMING, diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index 483c6573a8..a3157db0fb 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -44,7 +44,6 @@ using ::google::protobuf::compiler::objectivec::ClassName; using ::grpc::protobuf::io::Printer; using ::grpc::protobuf::MethodDescriptor; using ::grpc::protobuf::ServiceDescriptor; -using ::grpc::string; using ::std::map; namespace grpc_objective_c_generator { @@ -52,7 +51,7 @@ namespace { void PrintProtoRpcDeclarationAsPragma(Printer *printer, const MethodDescriptor *method, - map<string, string> vars) { + map< ::grpc::string, ::grpc::string> vars) { vars["client_stream"] = method->client_streaming() ? "stream " : ""; vars["server_stream"] = method->server_streaming() ? "stream " : ""; @@ -62,7 +61,7 @@ void PrintProtoRpcDeclarationAsPragma(Printer *printer, } void PrintMethodSignature(Printer *printer, const MethodDescriptor *method, - const map<string, string> &vars) { + const map< ::grpc::string, ::grpc::string> &vars) { // TODO(jcanizales): Print method comments. printer->Print(vars, "- ($return_type$)$method_name$With"); @@ -85,7 +84,7 @@ void PrintMethodSignature(Printer *printer, const MethodDescriptor *method, } void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method, - map<string, string> vars) { + map< ::grpc::string, ::grpc::string> vars) { vars["method_name"] = grpc_generator::LowercaseFirstLetter(vars["method_name"]); vars["return_type"] = "void"; @@ -93,14 +92,14 @@ void PrintSimpleSignature(Printer *printer, const MethodDescriptor *method, } void PrintAdvancedSignature(Printer *printer, const MethodDescriptor *method, - map<string, string> vars) { + map< ::grpc::string, ::grpc::string> vars) { vars["method_name"] = "RPCTo" + vars["method_name"]; vars["return_type"] = "ProtoRPC *"; PrintMethodSignature(printer, method, vars); } -inline map<string, string> GetMethodVars(const MethodDescriptor *method) { - map<string, string> res; +inline map< ::grpc::string, ::grpc::string> GetMethodVars(const MethodDescriptor *method) { + map< ::grpc::string, ::grpc::string> res; res["method_name"] = method->name(); res["request_type"] = method->input_type()->name(); res["response_type"] = method->output_type()->name(); @@ -110,7 +109,7 @@ inline map<string, string> GetMethodVars(const MethodDescriptor *method) { } void PrintMethodDeclarations(Printer *printer, const MethodDescriptor *method) { - map<string, string> vars = GetMethodVars(method); + map< ::grpc::string, ::grpc::string> vars = GetMethodVars(method); PrintProtoRpcDeclarationAsPragma(printer, method, vars); @@ -121,7 +120,7 @@ void PrintMethodDeclarations(Printer *printer, const MethodDescriptor *method) { } void PrintSimpleImplementation(Printer *printer, const MethodDescriptor *method, - map<string, string> vars) { + map< ::grpc::string, ::grpc::string> vars) { printer->Print("{\n"); printer->Print(vars, " [[self RPCTo$method_name$With"); if (method->client_streaming()) { @@ -139,7 +138,7 @@ void PrintSimpleImplementation(Printer *printer, const MethodDescriptor *method, void PrintAdvancedImplementation(Printer *printer, const MethodDescriptor *method, - map<string, string> vars) { + map< ::grpc::string, ::grpc::string> vars) { printer->Print("{\n"); printer->Print(vars, " return [self RPCToMethod:@\"$method_name$\"\n"); @@ -164,7 +163,7 @@ void PrintAdvancedImplementation(Printer *printer, void PrintMethodImplementations(Printer *printer, const MethodDescriptor *method) { - map<string, string> vars = GetMethodVars(method); + map< ::grpc::string, ::grpc::string> vars = GetMethodVars(method); PrintProtoRpcDeclarationAsPragma(printer, method, vars); @@ -179,14 +178,14 @@ void PrintMethodImplementations(Printer *printer, } // namespace -string GetHeader(const ServiceDescriptor *service) { - string output; +::grpc::string GetHeader(const ServiceDescriptor *service) { + ::grpc::string output; { // Scope the output stream so it closes and finalizes output to the string. grpc::protobuf::io::StringOutputStream output_stream(&output); Printer printer(&output_stream, '$'); - map<string, string> vars = {{"service_class", ServiceClassName(service)}}; + map< ::grpc::string, ::grpc::string> vars = {{"service_class", ServiceClassName(service)}}; printer.Print(vars, "@protocol $service_class$ <NSObject>\n\n"); @@ -209,14 +208,14 @@ string GetHeader(const ServiceDescriptor *service) { return output; } -string GetSource(const ServiceDescriptor *service) { - string output; +::grpc::string GetSource(const ServiceDescriptor *service) { + ::grpc::string output; { // Scope the output stream so it closes and finalizes output to the string. grpc::protobuf::io::StringOutputStream output_stream(&output); Printer printer(&output_stream, '$'); - map<string, string> vars = {{"service_name", service->name()}, + map< ::grpc::string,::grpc::string> vars = {{"service_name", service->name()}, {"service_class", ServiceClassName(service)}, {"package", service->file()->package()}}; diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index 10f06ad4df..17440358bb 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -39,44 +39,43 @@ #include "src/compiler/objective_c_generator.h" #include "src/compiler/objective_c_generator_helpers.h" -using ::grpc::string; - class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { public: ObjectiveCGrpcGenerator() {} virtual ~ObjectiveCGrpcGenerator() {} virtual bool Generate(const grpc::protobuf::FileDescriptor *file, - const string ¶meter, + const ::grpc::string ¶meter, grpc::protobuf::compiler::GeneratorContext *context, - string *error) const { + ::grpc::string *error) const { if (file->service_count() == 0) { // No services. Do nothing. return true; } - string file_name = grpc_generator::FileNameInUpperCamel(file); - string prefix = file->options().objc_class_prefix(); + ::grpc::string file_name = grpc_generator::FileNameInUpperCamel(file); + ::grpc::string prefix = file->options().objc_class_prefix(); { // Generate .pbrpc.h - string imports = string("#import \"") + file_name + ".pbobjc.h\"\n\n" + ::grpc::string imports = ::grpc::string("#import \"") + file_name + + ".pbobjc.h\"\n\n" "#import <ProtoRPC/ProtoService.h>\n" "#import <RxLibrary/GRXWriteable.h>\n" "#import <RxLibrary/GRXWriter.h>\n"; // TODO(jcanizales): Instead forward-declare the input and output types // and import the files in the .pbrpc.m - string proto_imports; + ::grpc::string proto_imports; for (int i = 0; i < file->dependency_count(); i++) { - string header = grpc_objective_c_generator::MessageHeaderName( + ::grpc::string header = grpc_objective_c_generator::MessageHeaderName( file->dependency(i)); - proto_imports += string("#import \"") + header + "\"\n"; + proto_imports += ::grpc::string("#import \"") + header + "\"\n"; } - string declarations; + ::grpc::string declarations; for (int i = 0; i < file->service_count(); i++) { const grpc::protobuf::ServiceDescriptor *service = file->service(i); declarations += grpc_objective_c_generator::GetHeader(service); @@ -89,11 +88,12 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { { // Generate .pbrpc.m - string imports = string("#import \"") + file_name + ".pbrpc.h\"\n\n" + ::grpc::string imports = ::grpc::string("#import \"") + file_name + + ".pbrpc.h\"\n\n" "#import <ProtoRPC/ProtoRPC.h>\n" "#import <RxLibrary/GRXWriter+Immediate.h>\n"; - string definitions; + ::grpc::string definitions; for (int i = 0; i < file->service_count(); i++) { const grpc::protobuf::ServiceDescriptor *service = file->service(i); definitions += grpc_objective_c_generator::GetSource(service); @@ -108,7 +108,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { private: // Write the given code into the given file. void Write(grpc::protobuf::compiler::GeneratorContext *context, - const string &filename, const string &code) const { + const ::grpc::string &filename, const ::grpc::string &code) const { std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output( context->Open(filename)); grpc::protobuf::io::CodedOutputStream coded_out(output.get()); diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c new file mode 100644 index 0000000000..a8397a3da1 --- /dev/null +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -0,0 +1,501 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/resolvers/zookeeper_resolver.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include <grpc/grpc_zookeeper.h> +#include <zookeeper/zookeeper.h> + +#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/resolver_registry.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/support/string.h" +#include "src/core/json/json.h" + +/** Zookeeper session expiration time in milliseconds */ +#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 + +typedef struct { + /** base class: must be first */ + grpc_resolver base; + /** refcount */ + gpr_refcount refs; + /** name to resolve */ + char *name; + /** subchannel factory */ + grpc_subchannel_factory *subchannel_factory; + /** load balancing policy factory */ + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels); + + /** mutex guarding the rest of the state */ + gpr_mu mu; + /** are we currently resolving? */ + int resolving; + /** which version of resolved_config have we published? */ + int published_version; + /** which version of resolved_config is current? */ + int resolved_version; + /** pending next completion, or NULL */ + grpc_iomgr_closure *next_completion; + /** target config address for next completion */ + grpc_client_config **target_config; + /** current (fully resolved) config */ + grpc_client_config *resolved_config; + + /** zookeeper handle */ + zhandle_t *zookeeper_handle; + /** zookeeper resolved addresses */ + grpc_resolved_addresses *resolved_addrs; + /** total number of addresses to be resolved */ + int resolved_total; + /** number of addresses resolved */ + int resolved_num; +} zookeeper_resolver; + +static void zookeeper_destroy(grpc_resolver *r); + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r); +static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r); + +static void zookeeper_shutdown(grpc_resolver *r); +static void zookeeper_channel_saw_error(grpc_resolver *r, + struct sockaddr *failing_address, + int failing_address_len); +static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, + grpc_iomgr_closure *on_complete); + +static const grpc_resolver_vtable zookeeper_resolver_vtable = { + zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, + zookeeper_next}; + +static void zookeeper_shutdown(grpc_resolver *resolver) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->next_completion != NULL) { + *r->target_config = NULL; + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + } + zookeeper_close(r->zookeeper_handle); + gpr_mu_unlock(&r->mu); +} + +static void zookeeper_channel_saw_error(grpc_resolver *resolver, + struct sockaddr *sa, int len) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +static void zookeeper_next(grpc_resolver *resolver, + grpc_client_config **target_config, + grpc_iomgr_closure *on_complete) { + zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->next_completion == NULL); + r->next_completion = on_complete; + r->target_config = target_config; + if (r->resolved_version == 0 && r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } else { + zookeeper_maybe_finish_next_locked(r); + } + gpr_mu_unlock(&r->mu); +} + +/** Zookeeper global watcher for connection management + TODO: better connection management besides logs */ +static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, + int state, const char *path, + void *watcher_ctx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_EXPIRED_SESSION_STATE) { + gpr_log(GPR_ERROR, "Zookeeper session expired"); + } else if (state == ZOO_AUTH_FAILED_STATE) { + gpr_log(GPR_ERROR, "Zookeeper authentication failed"); + } + } +} + +/** Zookeeper watcher triggered by changes to watched nodes + Once triggered, it tries to resolve again to get updated addresses */ +static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (watcher_ctx != NULL) { + zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; + if (state == ZOO_CONNECTED_STATE) { + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); + } + } +} + +/** Callback function after getting all resolved addresses + Creates a subchannel for each address */ +static void zookeeper_on_resolved(void *arg, + grpc_resolved_addresses *addresses) { + zookeeper_resolver *r = arg; + grpc_client_config *config = NULL; + grpc_subchannel **subchannels; + grpc_subchannel_args args; + grpc_lb_policy *lb_policy; + size_t i; + if (addresses != NULL) { + config = grpc_client_config_create(); + subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + for (i = 0; i < addresses->naddrs; i++) { + memset(&args, 0, sizeof(args)); + args.addr = (struct sockaddr *)(addresses->addrs[i].addr); + args.addr_len = addresses->addrs[i].len; + subchannels[i] = grpc_subchannel_factory_create_subchannel( + r->subchannel_factory, &args); + } + lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + grpc_resolved_addresses_destroy(addresses); + gpr_free(subchannels); + } + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving == 1); + r->resolving = 0; + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } + r->resolved_config = config; + r->resolved_version++; + zookeeper_maybe_finish_next_locked(r); + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); +} + +/** Callback function for each DNS resolved address */ +static void zookeeper_dns_resolved(void *arg, + grpc_resolved_addresses *addresses) { + size_t i; + zookeeper_resolver *r = arg; + int resolve_done = 0; + + gpr_mu_lock(&r->mu); + r->resolved_num++; + r->resolved_addrs->addrs = + gpr_realloc(r->resolved_addrs->addrs, + sizeof(grpc_resolved_address) * + (r->resolved_addrs->naddrs + addresses->naddrs)); + for (i = 0; i < addresses->naddrs; i++) { + memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, + addresses->addrs[i].addr, addresses->addrs[i].len); + r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = + addresses->addrs[i].len; + } + + r->resolved_addrs->naddrs += addresses->naddrs; + grpc_resolved_addresses_destroy(addresses); + + /** Wait for all addresses to be resolved */ + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); + } +} + +/** Parses JSON format address of a zookeeper node */ +static char *zookeeper_parse_address(const char *value, int value_len) { + grpc_json *json; + grpc_json *cur; + const char *host; + const char *port; + char* buffer; + char *address = NULL; + + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + json = grpc_json_parse_string_with_len(buffer, value_len); + if (json != NULL) { + host = NULL; + port = NULL; + for (cur = json->child; cur != NULL; cur = cur->next) { + if (!strcmp(cur->key, "host")) { + host = cur->value; + if (port != NULL) { + break; + } + } else if (!strcmp(cur->key, "port")) { + port = cur->value; + if (host != NULL) { + break; + } + } + } + if (host != NULL && port != NULL) { + gpr_asprintf(&address, "%s:%s", host, port); + } + grpc_json_destroy(json); + } + gpr_free(buffer); + + return address; +} + +static void zookeeper_get_children_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + int resolve_done = 0; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); + return; + } + + address = zookeeper_parse_address(value, value_len); + if (address != NULL) { + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + } else { + gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); + gpr_mu_lock(&r->mu); + r->resolved_total--; + resolve_done = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_done) { + zookeeper_on_resolved(r, r->resolved_addrs); + } + } +} + +static void zookeeper_get_children_completion( + int rc, const struct String_vector *children, const void *arg) { + char *path; + int status; + int i; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + return; + } + + if (children->count == 0) { + gpr_log(GPR_ERROR, "Error in resolving zookeeper address %s", r->name); + return; + } + + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = children->count; + + /** TODO: Replace expensive heap allocation with stack + if we can get maximum length of zookeeper path */ + for (i = 0; i < children->count; i++) { + gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); + status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, + zookeeper_get_children_node_completion, r); + gpr_free(path); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + } + } +} + +static void zookeeper_get_node_completion(int rc, const char *value, + int value_len, + const struct Stat *stat, + const void *arg) { + int status; + char *address = NULL; + zookeeper_resolver *r = (zookeeper_resolver *)arg; + r->resolved_addrs = NULL; + r->resolved_total = 0; + r->resolved_num = 0; + + if (rc != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + return; + } + + /** If zookeeper node of path r->name does not have address + (i.e. service node), get its children */ + address = zookeeper_parse_address(value, value_len); + if (address != NULL) { + r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + r->resolved_addrs->addrs = NULL; + r->resolved_addrs->naddrs = 0; + r->resolved_total = 1; + /** Further resolves address by DNS */ + grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + gpr_free(address); + return; + } + + status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, + r, zookeeper_get_children_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } +} + +static void zookeeper_resolve_address(zookeeper_resolver *r) { + int status; + status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, + zookeeper_get_node_completion, r); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } +} + +static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { + GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); + GPR_ASSERT(r->resolving == 0); + r->resolving = 1; + zookeeper_resolve_address(r); +} + +static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { + if (r->next_completion != NULL && + r->resolved_version != r->published_version) { + *r->target_config = r->resolved_config; + if (r->resolved_config != NULL) { + grpc_client_config_ref(r->resolved_config); + } + grpc_iomgr_add_callback(r->next_completion); + r->next_completion = NULL; + r->published_version = r->resolved_version; + } +} + +static void zookeeper_destroy(grpc_resolver *gr) { + zookeeper_resolver *r = (zookeeper_resolver *)gr; + gpr_mu_destroy(&r->mu); + if (r->resolved_config != NULL) { + grpc_client_config_unref(r->resolved_config); + } + grpc_subchannel_factory_unref(r->subchannel_factory); + gpr_free(r->name); + gpr_free(r); +} + +static grpc_resolver *zookeeper_create( + grpc_uri *uri, + grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, + size_t num_subchannels), + grpc_subchannel_factory *subchannel_factory) { + zookeeper_resolver *r; + size_t length; + char *path = uri->path; + + if (0 == strcmp(uri->authority, "")) { + gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); + return NULL; + } + + /** Removes the trailing slash if exists */ + length = strlen(path); + if (length > 1 && path[length - 1] == '/') { + path[length - 1] = 0; + } + + r = gpr_malloc(sizeof(zookeeper_resolver)); + memset(r, 0, sizeof(*r)); + gpr_ref_init(&r->refs, 1); + gpr_mu_init(&r->mu); + grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); + r->name = gpr_strdup(path); + + r->subchannel_factory = subchannel_factory; + r->lb_policy_factory = lb_policy_factory; + grpc_subchannel_factory_ref(subchannel_factory); + + /** Initializes zookeeper client */ + zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); + r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher, + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); + if (r->zookeeper_handle == NULL) { + gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); + return NULL; + } + + return &r->base; +} + +static void zookeeper_plugin_init() { + grpc_register_resolver_type("zookeeper", + grpc_zookeeper_resolver_factory_create()); +} + +void grpc_zookeeper_register() { + grpc_register_plugin(zookeeper_plugin_init, NULL); +} + +/* + * FACTORY + */ + +static void zookeeper_factory_ref(grpc_resolver_factory *factory) {} + +static void zookeeper_factory_unref(grpc_resolver_factory *factory) {} + +static grpc_resolver *zookeeper_factory_create_resolver( + grpc_resolver_factory *factory, grpc_uri *uri, + grpc_subchannel_factory *subchannel_factory) { + return zookeeper_create(uri, grpc_create_pick_first_lb_policy, + subchannel_factory); +} + +static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { + zookeeper_factory_ref, zookeeper_factory_unref, + zookeeper_factory_create_resolver}; +static grpc_resolver_factory zookeeper_resolver_factory = { + &zookeeper_factory_vtable}; + +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() { + return &zookeeper_resolver_factory; +} diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h new file mode 100644 index 0000000000..a6f002dd6d --- /dev/null +++ b/src/core/client_config/resolvers/zookeeper_resolver.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H + +#include "src/core/client_config/resolver_factory.h" + +/** Create a zookeeper resolver factory */ +grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void); + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */ diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index dbf4721d13..8d4f3b9c76 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -44,7 +44,7 @@ int grpc_compression_algorithm_parse(const char* name, size_t name_length, if (name_length == 0) { return 0; } - if (strncmp(name, "none", name_length) == 0) { + if (strncmp(name, "identity", name_length) == 0) { *algorithm = GRPC_COMPRESS_NONE; } else if (strncmp(name, "gzip", name_length) == 0) { *algorithm = GRPC_COMPRESS_GZIP; @@ -60,7 +60,7 @@ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, char **name) { switch (algorithm) { case GRPC_COMPRESS_NONE: - *name = "none"; + *name = "identity"; break; case GRPC_COMPRESS_DEFLATE: *name = "deflate"; diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c new file mode 100644 index 0000000000..db0aef8120 --- /dev/null +++ b/src/core/iomgr/udp_server.c @@ -0,0 +1,438 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/iomgr/udp_server.h" + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/support/string.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + +#define INIT_PORT_CAP 2 + +/* one listening port */ +typedef struct { + int fd; + grpc_fd *emfd; + grpc_udp_server *server; + union { + gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE]; + struct sockaddr sockaddr; + struct sockaddr_un un; + } addr; + int addr_len; + grpc_iomgr_closure read_closure; + grpc_iomgr_closure destroyed_closure; + grpc_udp_server_read_cb read_cb; +} server_port; + +static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) { + struct stat st; + + if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { + unlink(un->sun_path); + } +} + +/* the overall server */ +struct grpc_udp_server { + grpc_udp_server_cb cb; + void *cb_arg; + + gpr_mu mu; + gpr_cv cv; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? (boolean) */ + int shutdown; + + /* all listening ports */ + server_port *ports; + size_t nports; + size_t port_capacity; + + /* shutdown callback */ + void (*shutdown_complete)(void *); + void *shutdown_complete_arg; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; +}; + +grpc_udp_server *grpc_udp_server_create(void) { + grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); + gpr_mu_init(&s->mu); + gpr_cv_init(&s->cv); + s->active_ports = 0; + s->destroyed_ports = 0; + s->shutdown = 0; + s->cb = NULL; + s->cb_arg = NULL; + s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->nports = 0; + s->port_capacity = INIT_PORT_CAP; + + return s; +} + +static void finish_shutdown(grpc_udp_server *s) { + s->shutdown_complete(s->shutdown_complete_arg); + + gpr_mu_destroy(&s->mu); + gpr_cv_destroy(&s->cv); + + gpr_free(s->ports); + gpr_free(s); +} + +static void destroyed_port(void *server, int success) { + grpc_udp_server *s = server; + gpr_mu_lock(&s->mu); + s->destroyed_ports++; + if (s->destroyed_ports == s->nports) { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +static void dont_care_about_shutdown_completion(void *ignored) {} + +/* called when all listening endpoints have been shutdown, so no further + events will be received on them - at this point it's safe to destroy + things */ +static void deactivated_all_ports(grpc_udp_server *s) { + size_t i; + + /* delete ALL the things */ + gpr_mu_lock(&s->mu); + + if (!s->shutdown) { + gpr_mu_unlock(&s->mu); + return; + } + + if (s->nports) { + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + if (sp->addr.sockaddr.sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(&sp->addr.un); + } + sp->destroyed_closure.cb = destroyed_port; + sp->destroyed_closure.cb_arg = s; + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown"); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } +} + +void grpc_udp_server_destroy( + grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), + void *shutdown_complete_arg) { + size_t i; + gpr_mu_lock(&s->mu); + + GPR_ASSERT(!s->shutdown); + s->shutdown = 1; + + s->shutdown_complete = shutdown_complete + ? shutdown_complete + : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + + /* shutdown all fd's */ + if (s->active_ports) { + for (i = 0; i < s->nports; i++) { + grpc_fd_shutdown(s->ports[i].emfd); + } + gpr_mu_unlock(&s->mu); + } else { + gpr_mu_unlock(&s->mu); + deactivated_all_ports(s); + } +} + +/* Prepare a recently-created socket for listening. */ +static int prepare_socket(int fd, const struct sockaddr *addr, int addr_len) { + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int get_local_ip; + int rc; + + if (fd < 0) { + goto error; + } + + get_local_ip = 1; + rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, + &get_local_ip, sizeof(get_local_ip)); + if (rc == 0 && addr->sa_family == AF_INET6) { + rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, + &get_local_ip, sizeof(get_local_ip)); + } + + if (bind(fd, addr, addr_len) < 0) { + char *addr_str; + grpc_sockaddr_to_string(&addr_str, addr, 0); + gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); + gpr_free(addr_str); + goto error; + } + + sockname_len = sizeof(sockname_temp); + if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + goto error; + } + + return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +/* event manager callback when reads are ready */ +static void on_read(void *arg, int success) { + server_port *sp = arg; + + if (success == 0) { + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that data is available to read. */ + GPR_ASSERT(sp->read_cb); + sp->read_cb(sp->fd, sp->server->cb, sp->server->cb_arg); + + /* Re-arm the notification event so we get another chance to read. */ + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); +} + +static int add_socket_to_server(grpc_udp_server *s, int fd, + const struct sockaddr *addr, int addr_len, + grpc_udp_server_read_cb read_cb) { + server_port *sp; + int port; + char *addr_str; + char *name; + + port = prepare_socket(fd, addr, addr_len); + if (port >= 0) { + grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + gpr_asprintf(&name, "udp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb && "must add ports before starting server"); + /* append it to the list under a lock */ + if (s->nports == s->port_capacity) { + s->port_capacity *= 2; + s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + } + sp = &s->ports[s->nports++]; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(sp->addr.untyped, addr, addr_len); + sp->addr_len = addr_len; + sp->read_cb = read_cb; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + } + + return port; +} + +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, + int addr_len, grpc_udp_server_read_cb read_cb) { + int allocated_port1 = -1; + int allocated_port2 = -1; + unsigned i; + int fd; + grpc_dualstack_mode dsmode; + struct sockaddr_in6 addr6_v4mapped; + struct sockaddr_in wild4; + struct sockaddr_in6 wild6; + struct sockaddr_in addr4_copy; + struct sockaddr *allocated_addr = NULL; + struct sockaddr_storage sockname_temp; + socklen_t sockname_len; + int port; + + if (((struct sockaddr *)addr)->sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(addr); + } + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (i = 0; i < s->nports; i++) { + sockname_len = sizeof(sockname_temp); + if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + &sockname_len)) { + port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + if (port > 0) { + allocated_addr = malloc(addr_len); + memcpy(allocated_addr, addr, addr_len); + grpc_sockaddr_set_port(allocated_addr, port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = (const struct sockaddr *)&addr6_v4mapped; + addr_len = sizeof(addr6_v4mapped); + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, &port)) { + grpc_sockaddr_make_wildcards(port, &wild4, &wild6); + + /* Try listening on IPv6 first. */ + addr = (struct sockaddr *)&wild6; + addr_len = sizeof(wild6); + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { + goto done; + } + + /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ + if (port == 0 && allocated_port1 > 0) { + grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); + } + addr = (struct sockaddr *)&wild4; + addr_len = sizeof(wild4); + } + + fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + } + if (dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = (struct sockaddr *)&addr4_copy; + addr_len = sizeof(addr4_copy); + } + allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb); + +done: + gpr_free(allocated_addr); + return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; +} + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { + return (index < s->nports) ? s->ports[index].fd : -1; +} + +void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, + size_t pollset_count, + grpc_udp_server_cb new_transport_cb, void *cb_arg) { + size_t i, j; + GPR_ASSERT(new_transport_cb); + gpr_mu_lock(&s->mu); + GPR_ASSERT(!s->cb); + GPR_ASSERT(s->active_ports == 0); + s->cb = new_transport_cb; + s->cb_arg = cb_arg; + s->pollsets = pollsets; + for (i = 0; i < s->nports; i++) { + for (j = 0; j < pollset_count; j++) { + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + } + s->ports[i].read_closure.cb = on_read; + s->ports[i].read_closure.cb_arg = &s->ports[i]; + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + s->active_ports++; + } + gpr_mu_unlock(&s->mu); +} + +/* TODO(rjshade): Add a test for this method. */ +void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len, + const struct sockaddr *peer_address) { + int rc; + rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address)); + if (rc < 0) { + gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno)); + } +} + +#endif diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h new file mode 100644 index 0000000000..fcc4ba6e97 --- /dev/null +++ b/src/core/iomgr/udp_server.h @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H +#define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H + +#include "src/core/iomgr/endpoint.h" + +/* Forward decl of grpc_udp_server */ +typedef struct grpc_udp_server grpc_udp_server; + +/* New server callback: ep is the newly connected connection */ +typedef void (*grpc_udp_server_cb)(void *arg, grpc_endpoint *ep); + +/* Called when data is available to read from the socket. */ +typedef void (*grpc_udp_server_read_cb)(int fd, + grpc_udp_server_cb new_transport_cb, + void *cb_arg); + +/* Create a server, initially not bound to any ports */ +grpc_udp_server *grpc_udp_server_create(void); + +/* Start listening to bound ports */ +void grpc_udp_server_start(grpc_udp_server *server, grpc_pollset **pollsets, + size_t pollset_count, grpc_udp_server_cb cb, + void *cb_arg); + +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); + +/* Add a port to the server, returning port number on success, or negative + on failure. + + The :: and 0.0.0.0 wildcard addresses are treated identically, accepting + both IPv4 and IPv6 connections, but :: is the preferred style. This usually + creates one socket, but possibly two on systems which support IPv6, + but not dualstack sockets. */ + +/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle + all of the multiple socket port matching logic in one place */ +int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, int addr_len, + grpc_udp_server_read_cb read_cb); + +void grpc_udp_server_destroy(grpc_udp_server *server, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg); + +/* Write the contents of buffer to the underlying UDP socket. */ +/* +void grpc_udp_server_write(grpc_udp_server *s, + const char *buffer, + int buf_len, + const struct sockaddr* to); + */ + +#endif /* GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7486e5e2e5..f3012d0c59 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1008,7 +1008,7 @@ static void call_on_done_recv(void *pc, int success) { next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call); + grpc_call_cancel(child_call, NULL); GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0); } child_call = next_child_call; @@ -1309,18 +1309,22 @@ void grpc_call_destroy(grpc_call *c) { c->cancel_alarm |= c->have_alarm; cancel = c->read_state != READ_STATE_STREAM_CLOSED; unlock(c); - if (cancel) grpc_call_cancel(c); + if (cancel) grpc_call_cancel(c, NULL); GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1); } -grpc_call_error grpc_call_cancel(grpc_call *call) { - return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled"); +grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { + GPR_ASSERT(!reserved); + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", + NULL); } grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, - const char *description) { + const char *description, + void *reserved) { grpc_call_error r; + (void) reserved; lock(c); r = cancel_with_status(c, status, description); unlock(c); @@ -1561,13 +1565,14 @@ static int are_write_flags_valid(gpr_uint32 flags) { } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag) { + size_t nops, void *tag, void *reserved) { grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT]; size_t in; size_t out; const grpc_op *op; grpc_ioreq *req; void (*finish_func)(grpc_call *, int, void *) = finish_batch; + GPR_ASSERT(!reserved); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index b80398858d..89fe152a0e 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -172,7 +172,8 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, gpr_uint32 propagation_mask, grpc_completion_queue *cq, const char *method, const char *host, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { + GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, cq, grpc_mdelem_from_metadata_strings( @@ -186,8 +187,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel, } void *grpc_channel_register_call(grpc_channel *channel, const char *method, - const char *host) { + const char *host, void *reserved) { registered_call *rc = gpr_malloc(sizeof(registered_call)); + GPR_ASSERT(!reserved); rc->path = grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)); @@ -204,8 +206,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, grpc_call *grpc_channel_create_registered_call( grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { registered_call *rc = registered_call_handle; + GPR_ASSERT(!reserved); return grpc_channel_create_call_internal( channel, parent_call, propagation_mask, completion_queue, GRPC_MDELEM_REF(rc->path), diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 707d615688..82ddfac757 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -155,7 +155,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { - connect to it (trying alternatives as presented) - perform handshakes */ grpc_channel *grpc_insecure_channel_create(const char *target, - const grpc_channel_args *args) { + const grpc_channel_args *args, + void *reserved) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; @@ -163,6 +164,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, subchannel_factory *f; grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; + GPR_ASSERT(!reserved); /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 36d69cfe5f..378b3f71a1 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -69,8 +69,9 @@ struct grpc_completion_queue { plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; }; -grpc_completion_queue *grpc_completion_queue_create(void) { +grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); + GPR_ASSERT(!reserved); memset(cc, 0, sizeof(*cc)); /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); @@ -166,9 +167,11 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success, } grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline) { + gpr_timespec deadline, + void *reserved) { grpc_event ret; grpc_pollset_worker worker; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); @@ -232,11 +235,12 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, } grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline) { + gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker worker; + GPR_ASSERT(!reserved); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 442bc72f21..d9044549f2 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -33,8 +33,11 @@ #include <grpc/support/port_platform.h> +#include <memory.h> + #include <grpc/census.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/time.h> #include "src/core/channel/channel_stack.h" #include "src/core/client_config/resolver_registry.h" @@ -49,6 +52,8 @@ #include "src/core/transport/chttp2_transport.h" #include "src/core/transport/connectivity_state.h" +#define MAX_PLUGINS 128 + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; @@ -58,7 +63,23 @@ static void do_basic_init(void) { g_initializations = 0; } +typedef struct grpc_plugin { + void (*init)(); + void (*destroy)(); +} grpc_plugin; + +static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS]; +static int g_number_of_plugins = 0; + +void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) { + GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS); + g_all_of_the_plugins[g_number_of_plugins].init = init; + g_all_of_the_plugins[g_number_of_plugins].destroy = destroy; + g_number_of_plugins++; +} + void grpc_init(void) { + int i; gpr_once_init(&g_basic_init, do_basic_init); gpr_mu_lock(&g_init_mu); @@ -87,11 +108,17 @@ void grpc_init(void) { } } grpc_timers_global_init(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].init != NULL) { + g_all_of_the_plugins[i].init(); + } + } } gpr_mu_unlock(&g_init_mu); } void grpc_shutdown(void) { + int i; gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); @@ -99,6 +126,11 @@ void grpc_shutdown(void) { grpc_timers_global_destroy(); grpc_tracer_shutdown(); grpc_resolver_registry_shutdown(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].destroy != NULL) { + g_all_of_the_plugins[i].destroy(); + } + } } gpr_mu_unlock(&g_init_mu); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index cd1dc589e1..f883275951 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -761,8 +761,10 @@ static const grpc_channel_filter server_surface_filter = { }; void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq) { + grpc_completion_queue *cq, + void *reserved) { size_t i, n; + GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c index 1e26c67693..9237eb5a90 100644 --- a/src/core/surface/server_create.c +++ b/src/core/surface/server_create.c @@ -36,8 +36,9 @@ #include "src/core/surface/server.h" #include "src/core/channel/compress_filter.h" -grpc_server *grpc_server_create(const grpc_channel_args *args) { +grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { const grpc_channel_filter *filters[] = {&grpc_compress_filter}; + (void) reserved; return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters), args); } diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c index a5dfec9d50..0a9669b0ab 100644 --- a/src/core/transport/stream_op.c +++ b/src/core/transport/stream_op.c @@ -258,6 +258,7 @@ static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) { GPR_ASSERT(storage->md); storage->prev = list->tail; storage->next = NULL; + storage->reserved = NULL; if (list->tail != NULL) { list->tail->next = storage; } else { diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 227320cf2a..37f18b02d9 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -77,6 +77,7 @@ typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; struct grpc_linked_mdelem *prev; + void *reserved; } grpc_linked_mdelem; typedef struct grpc_mdelem_list { diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 1c2eecf786..0582b59a6d 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -67,7 +67,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, c_call = grpc_channel_create_registered_call( c_channel_, context->propagate_from_call_, context->propagation_options_.c_bitmask(), cq->cq(), - method.channel_tag(), context->raw_deadline()); + method.channel_tag(), context->raw_deadline(), nullptr); } else { const char* host_str = NULL; if (!context->authority().empty()) { @@ -78,7 +78,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, c_call = grpc_channel_create_call(c_channel_, context->propagate_from_call_, context->propagation_options_.c_bitmask(), cq->cq(), method.name(), host_str, - context->raw_deadline()); + context->raw_deadline(), nullptr); } grpc_census_call_set_context(c_call, context->census_context()); GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); @@ -93,13 +93,14 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops)); + grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } void* Channel::RegisterMethod(const char* method) { return grpc_channel_register_call(c_channel_, method, - host_.empty() ? NULL : host_.c_str()); + host_.empty() ? NULL : host_.c_str(), + nullptr); } grpc_connectivity_state Channel::GetState(bool try_to_connect) { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index dd86e7b108..b8caa1eae4 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -77,19 +77,19 @@ void ClientContext::set_call(grpc_call* call, channel_ = channel; if (creds_ && !creds_->ApplyToCall(call_)) { grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, - "Failed to set credentials to rpc."); + "Failed to set credentials to rpc.", nullptr); } } void ClientContext::set_compression_algorithm( grpc_compression_algorithm algorithm) { - char* algorithm_name = NULL; + char* algorithm_name = nullptr; if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", algorithm); abort(); } - GPR_ASSERT(algorithm_name != NULL); + GPR_ASSERT(algorithm_name != nullptr); AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); } @@ -102,7 +102,7 @@ std::shared_ptr<const AuthContext> ClientContext::auth_context() const { void ClientContext::TryCancel() { if (call_) { - grpc_call_cancel(call_); + grpc_call_cancel(call_, nullptr); } } diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc index d8dcaa1436..2f9357b568 100644 --- a/src/cpp/client/insecure_credentials.cc +++ b/src/cpp/client/insecure_credentials.cc @@ -49,7 +49,7 @@ class InsecureCredentialsImpl GRPC_FINAL : public Credentials { grpc_channel_args channel_args; args.SetChannelArgs(&channel_args); return std::shared_ptr<ChannelInterface>(new Channel( - grpc_insecure_channel_create(target.c_str(), &channel_args))); + grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr))); } // InsecureCredentials should not be applied to a call. diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 593963f672..fca33f8f54 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -40,7 +40,9 @@ namespace grpc { -CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); } +CompletionQueue::CompletionQueue() { + cq_ = grpc_completion_queue_create(nullptr); +} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} @@ -51,7 +53,7 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { for (;;) { - auto ev = grpc_completion_queue_next(cq_, deadline); + auto ev = grpc_completion_queue_next(cq_, deadline, nullptr); switch (ev.type) { case GRPC_QUEUE_TIMEOUT: return TIMEOUT; @@ -70,8 +72,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } bool CompletionQueue::Pluck(CompletionQueueTag* tag) { - auto ev = - grpc_completion_queue_pluck(cq_, tag, gpr_inf_future(GPR_CLOCK_REALTIME)); + auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); bool ok = ev.success != 0; void* ignored = tag; GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); @@ -81,8 +83,8 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) { } void CompletionQueue::TryPluck(CompletionQueueTag* tag) { - auto ev = - grpc_completion_queue_pluck(cq_, tag, gpr_time_0(GPR_CLOCK_REALTIME)); + auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); + auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); if (ev.type == GRPC_QUEUE_TIMEOUT) return; bool ok = ev.success != 0; void* ignored = tag; diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 63f4a3a0bc..94ae5ba636 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -158,14 +158,13 @@ Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) GrpcBufferWriter writer(bp); return msg.SerializeToZeroCopyStream(&writer) ? Status::OK - : Status(StatusCode::INVALID_ARGUMENT, - "Failed to serialize message"); + : Status(StatusCode::INTERNAL, "Failed to serialize message"); } Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { if (!buffer) { - return Status(StatusCode::INVALID_ARGUMENT, "No payload"); + return Status(StatusCode::INTERNAL, "No payload"); } GrpcBufferReader reader(buffer); ::grpc::protobuf::io::CodedInputStream decoder(&reader); @@ -173,11 +172,11 @@ Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, decoder.SetTotalBytesLimit(max_message_size, max_message_size); } if (!msg->ParseFromCodedStream(&decoder)) { - return Status(StatusCode::INVALID_ARGUMENT, + return Status(StatusCode::INTERNAL, msg->InitializationErrorString()); } if (!decoder.ConsumedEntireMessage()) { - return Status(StatusCode::INVALID_ARGUMENT, "Did not read entire message"); + return Status(StatusCode::INTERNAL, "Did not read entire message"); } return Status::OK; } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index ab87b22f5f..a70b555855 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), + call_details_(nullptr), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } + ~SyncRequest() { + if (call_details_) { + delete call_details_; + } + grpc_metadata_array_destroy(&request_metadata_); + } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -84,7 +90,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void SetupRequest() { cq_ = grpc_completion_queue_create(); } + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void Request(grpc_server* server, grpc_completion_queue* notify_cq) { GPR_ASSERT(cq_ && !in_flight_); in_flight_ = true; - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_registered_call( - server, tag_, &call_, &deadline_, &request_metadata_, - has_request_payload_ ? &request_payload_ : nullptr, cq_, - notify_cq, this)); + if (tag_) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_registered_call( + server, tag_, &call_, &deadline_, &request_metadata_, + has_request_payload_ ? &request_payload_ : nullptr, cq_, + notify_cq, this)); + } else { + if (!call_details_) { + call_details_ = new grpc_call_details; + grpc_call_details_init(call_details_); + } + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + server, &call_, call_details_, + &request_metadata_, cq_, notify_cq, this)); + } } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { if (!*status) { grpc_completion_queue_destroy(cq_); } + if (call_details_) { + deadline_ = call_details_->deadline; + grpc_call_details_destroy(call_details_); + grpc_call_details_init(call_details_); + } return true; } @@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { bool in_flight_; const bool has_request_payload_; grpc_call* call_; + grpc_call_details* call_details_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; grpc_byte_buffer* request_payload_; @@ -170,9 +192,9 @@ static grpc_server* CreateServer(int max_message_size) { arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH); arg.value.integer = max_message_size; grpc_channel_args args = {1, &arg}; - return grpc_server_create(&args); + return grpc_server_create(&args, nullptr); } else { - return grpc_server_create(nullptr); + return grpc_server_create(nullptr, nullptr); } } @@ -183,10 +205,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), + has_generic_service_(false), server_(CreateServer(max_message_size)), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { - grpc_server_register_completion_queue(server_, cq_.cq()); + grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } Server::~Server() { @@ -217,13 +240,13 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) { method->name()); return false; } - SyncRequest request(method, tag); - sync_methods_->emplace_back(request); + sync_methods_->emplace_back(method, tag); } return true; } -bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { +bool Server::RegisterAsyncService(const grpc::string* host, + AsynchronousService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); service->server_ = this; @@ -245,6 +268,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) { GPR_ASSERT(service->server_ == nullptr && "Can only register an async generic service against one server."); service->server_ = this; + has_generic_service_ = true; } int Server::AddListeningPort(const grpc::string& addr, @@ -258,6 +282,14 @@ bool Server::Start() { started_ = true; grpc_server_start(server_); + if (!has_generic_service_) { + unknown_method_.reset(new RpcServiceMethod( + "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler)); + // Use of emplace_back with just constructor arguments is not accepted here + // by gcc-4.4 because it can't match the anonymous nullptr with a proper + // constructor implicitly. Construct the object and use push_back. + sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr)); + } // Start processing rpcs. if (!sync_methods_->empty()) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { @@ -297,8 +329,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { size_t nops = 0; grpc_op cops[MAX_OPS]; ops->FillOps(cops, &nops); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), cops, nops, ops)); + auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); + GPR_ASSERT(GRPC_CALL_OK == result); } Server::BaseAsyncRequest::BaseAsyncRequest( diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index f723d4611a..09118879f4 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -38,6 +38,7 @@ #include <grpc++/impl/service_type.h> #include <grpc++/server.h> #include <grpc++/thread_pool_interface.h> +#include <grpc++/fixed_size_thread_pool.h> namespace grpc { @@ -100,10 +101,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { thread_pool_ = CreateDefaultThreadPool(); thread_pool_owned = true; } + // Async services only, create a thread pool to handle requests to unknown + // services. + if (!thread_pool_ && !generic_service_ && !async_services_.empty()) { + thread_pool_ = new FixedSizeThreadPool(1); + thread_pool_owned = true; + } std::unique_ptr<Server> server( new Server(thread_pool_, thread_pool_owned, max_message_size_)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { - grpc_server_register_completion_queue(server->server_, (*cq)->cq()); + grpc_server_register_completion_queue(server->server_, (*cq)->cq(), + nullptr); } for (auto service = services_.begin(); service != services_.end(); service++) { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 04373397f9..bb34040a2f 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -91,6 +91,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; ops->flags = 0; + ops->reserved = NULL; *nops = 1; } diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec index 2dc10d24c2..f1f8f7c709 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec +++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec @@ -15,7 +15,7 @@ <copyright>Copyright 2015, Google Inc.</copyright> <tags>gRPC RPC Protocol HTTP/2 Auth OAuth2</tags> <dependencies> - <dependency id="Google.Apis.Auth" version="1.9.2" /> + <dependency id="Google.Apis.Auth" version="1.9.3" /> <dependency id="Grpc.Core" version="$version$" /> </dependencies> </metadata> diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 9379ae01f1..bf2bbd873b 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -339,7 +339,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_shutdown(void) { grpc_shutdown(); } GPR_EXPORT grpc_completion_queue *GPR_CALLTYPE grpcsharp_completion_queue_create(void) { - return grpc_completion_queue_create(); + return grpc_completion_queue_create(NULL); } GPR_EXPORT void GPR_CALLTYPE @@ -354,13 +354,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) { GPR_EXPORT grpc_event GPR_CALLTYPE grpcsharp_completion_queue_next(grpc_completion_queue *cq) { - return grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME)); + return grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); } GPR_EXPORT grpc_event GPR_CALLTYPE grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) { return grpc_completion_queue_pluck(cq, tag, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); } /* Channel */ @@ -368,7 +369,7 @@ grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) { GPR_EXPORT grpc_channel *GPR_CALLTYPE grpcsharp_insecure_channel_create(const char *target, const grpc_channel_args *args) { - return grpc_insecure_channel_create(target, args); + return grpc_insecure_channel_create(target, args, NULL); } GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { @@ -382,7 +383,7 @@ grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call, const char *method, const char *host, gpr_timespec deadline) { return grpc_channel_create_call(channel, parent_call, propagation_mask, cq, - method, host, deadline); + method, host, deadline, NULL); } GPR_EXPORT grpc_connectivity_state GPR_CALLTYPE @@ -475,13 +476,13 @@ GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) { /* Call */ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) { - return grpc_call_cancel(call); + return grpc_call_cancel(call, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description) { - return grpc_call_cancel_with_status(call, status, description); + return grpc_call_cancel_with_status(call, status, description, NULL); } GPR_EXPORT char *GPR_CALLTYPE grpcsharp_call_get_peer(grpc_call *call) { @@ -538,7 +539,8 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, &(ctx->recv_status_on_client.status_details_capacity); ops[5].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -575,7 +577,8 @@ grpcsharp_call_start_client_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details_capacity); ops[3].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( @@ -615,7 +618,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( &(ctx->recv_status_on_client.status_details_capacity); ops[4].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -648,7 +652,8 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, &(ctx->recv_status_on_client.status_details_capacity); ops[2].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -668,7 +673,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, ops[1].data.send_initial_metadata.metadata = NULL; ops[1].flags = 0; - return grpc_call_start_batch(call, ops, nops, ctx); + return grpc_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -679,7 +684,8 @@ grpcsharp_call_send_close_from_client(grpc_call *call, ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; ops[0].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( @@ -705,7 +711,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ops[1].data.send_initial_metadata.metadata = NULL; ops[1].flags = 0; - return grpc_call_start_batch(call, ops, nops, ctx); + return grpc_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -715,7 +721,8 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { ops[0].op = GRPC_OP_RECV_MESSAGE; ops[0].data.recv_message = &(ctx->recv_message); ops[0].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -727,7 +734,8 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { (&ctx->recv_close_on_server_cancelled); ops[0].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -744,7 +752,8 @@ grpcsharp_call_send_initial_metadata(grpc_call *call, ctx->send_initial_metadata.metadata; ops[0].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); } /* Server */ @@ -752,8 +761,8 @@ grpcsharp_call_send_initial_metadata(grpc_call *call, GPR_EXPORT grpc_server *GPR_CALLTYPE grpcsharp_server_create(grpc_completion_queue *cq, const grpc_channel_args *args) { - grpc_server *server = grpc_server_create(args); - grpc_server_register_completion_queue(server, cq); + grpc_server *server = grpc_server_create(args, NULL); + grpc_server_register_completion_queue(server, cq, NULL); return server; } diff --git a/src/node/examples/perf_test.js b/src/node/examples/perf_test.js index 214b9384d5..ba8fbf88d2 100644 --- a/src/node/examples/perf_test.js +++ b/src/node/examples/perf_test.js @@ -40,7 +40,7 @@ var interop_server = require('../interop/interop_server.js'); function runTest(iterations, callback) { var testServer = interop_server.getServer(0, false); - testServer.server.listen(); + testServer.server.start(); var client = new testProto.TestService('localhost:' + testServer.port, grpc.Credentials.createInsecure()); diff --git a/src/node/examples/qps_test.js b/src/node/examples/qps_test.js index 1ce4dbe070..ec968b8540 100644 --- a/src/node/examples/qps_test.js +++ b/src/node/examples/qps_test.js @@ -60,7 +60,7 @@ var interop_server = require('../interop/interop_server.js'); */ function runTest(concurrent_calls, seconds, callback) { var testServer = interop_server.getServer(0, false); - testServer.server.listen(); + testServer.server.start(); var client = new testProto.TestService('localhost:' + testServer.port, grpc.Credentials.createInsecure()); diff --git a/src/node/examples/route_guide_server.js b/src/node/examples/route_guide_server.js index bb8e79b5bd..465b32f54f 100644 --- a/src/node/examples/route_guide_server.js +++ b/src/node/examples/route_guide_server.js @@ -248,7 +248,7 @@ if (require.main === module) { throw err; } feature_list = JSON.parse(data); - routeServer.listen(); + routeServer.start(); }); } diff --git a/src/node/examples/stock_server.js b/src/node/examples/stock_server.js index dfcfe30eb4..12e5479584 100644 --- a/src/node/examples/stock_server.js +++ b/src/node/examples/stock_server.js @@ -81,7 +81,7 @@ stockServer.addProtoService(examples.Stock.service, { if (require.main === module) { stockServer.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure()); - stockServer.listen(); + stockServer.start(); } module.exports = stockServer; diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index c5c8313385..705c80ffc1 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -502,6 +502,22 @@ NAN_METHOD(Call::New) { return NanThrowTypeError( "Call's third argument must be a date or a number"); } + // These arguments are at the end because they are optional + grpc_call *parent_call = NULL; + if (Call::HasInstance(args[4])) { + Call *parent_obj = ObjectWrap::Unwrap<Call>(args[4]->ToObject()); + parent_call = parent_obj->wrapped_call; + } else if (!(args[4]->IsUndefined() || args[4]->IsNull())) { + return NanThrowTypeError( + "Call's fifth argument must be another call, if provided"); + } + gpr_uint32 propagate_flags = GRPC_PROPAGATE_DEFAULTS; + if (args[5]->IsUint32()) { + propagate_flags = args[5]->Uint32Value(); + } else if (!(args[5]->IsUndefined() || args[5]->IsNull())) { + return NanThrowTypeError( + "Call's sixth argument must be propagate flags, if provided"); + } Handle<Object> channel_object = args[0]->ToObject(); Channel *channel = ObjectWrap::Unwrap<Channel>(channel_object); if (channel->GetWrappedChannel() == NULL) { @@ -514,14 +530,14 @@ NAN_METHOD(Call::New) { if (args[3]->IsString()) { NanUtf8String host_override(args[3]); wrapped_call = grpc_channel_create_call( - wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + wrapped_channel, parent_call, propagate_flags, CompletionQueueAsyncWorker::GetQueue(), *method, - *host_override, MillisecondsToTimespec(deadline)); + *host_override, MillisecondsToTimespec(deadline), NULL); } else if (args[3]->IsUndefined() || args[3]->IsNull()) { wrapped_call = grpc_channel_create_call( - wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + wrapped_channel, parent_call, propagate_flags, CompletionQueueAsyncWorker::GetQueue(), *method, - NULL, MillisecondsToTimespec(deadline)); + NULL, MillisecondsToTimespec(deadline), NULL); } else { return NanThrowTypeError("Call's fourth argument must be a string"); } @@ -601,7 +617,7 @@ NAN_METHOD(Call::StartBatch) { NanCallback *callback = new NanCallback(callback_func); grpc_call_error error = grpc_call_start_batch( call->wrapped_call, &ops[0], nops, new struct tag( - callback, op_vector.release(), resources)); + callback, op_vector.release(), resources), NULL); if (error != GRPC_CALL_OK) { return NanThrowError("startBatch failed", error); } @@ -615,7 +631,7 @@ NAN_METHOD(Call::Cancel) { return NanThrowTypeError("cancel can only be called on Call objects"); } Call *call = ObjectWrap::Unwrap<Call>(args.This()); - grpc_call_error error = grpc_call_cancel(call->wrapped_call); + grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL); if (error != GRPC_CALL_OK) { return NanThrowError("cancel failed", error); } diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 457a58c057..a61c830099 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -33,12 +33,17 @@ #include <vector> +#include "grpc/support/log.h" + #include <node.h> #include <nan.h> #include "grpc/grpc.h" #include "grpc/grpc_security.h" +#include "call.h" #include "channel.h" +#include "completion_queue_async_worker.h" #include "credentials.h" +#include "timeval.h" namespace grpc { namespace node { @@ -51,6 +56,7 @@ using v8::Handle; using v8::HandleScope; using v8::Integer; using v8::Local; +using v8::Number; using v8::Object; using v8::Persistent; using v8::String; @@ -76,6 +82,12 @@ void Channel::Init(Handle<Object> exports) { NanNew<FunctionTemplate>(Close)->GetFunction()); NanSetPrototypeTemplate(tpl, "getTarget", NanNew<FunctionTemplate>(GetTarget)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "getConnectivityState", + NanNew<FunctionTemplate>(GetConnectivityState)->GetFunction()); + NanSetPrototypeTemplate( + tpl, "watchConnectivityState", + NanNew<FunctionTemplate>(WatchConnectivityState)->GetFunction()); NanAssignPersistent(fun_tpl, tpl); Handle<Function> ctr = tpl->GetFunction(); constructor = new NanCallback(ctr); @@ -111,7 +123,7 @@ NAN_METHOD(Channel::New) { grpc_channel_args *channel_args_ptr; if (args[2]->IsUndefined()) { channel_args_ptr = NULL; - wrapped_channel = grpc_insecure_channel_create(*host, NULL); + wrapped_channel = grpc_insecure_channel_create(*host, NULL, NULL); } else if (args[2]->IsObject()) { Handle<Object> args_hash(args[2]->ToObject()->Clone()); Handle<Array> keys(args_hash->GetOwnPropertyNames()); @@ -145,7 +157,8 @@ NAN_METHOD(Channel::New) { return NanThrowTypeError("Channel expects a string and an object"); } if (creds == NULL) { - wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr); + wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr, + NULL); } else { wrapped_channel = grpc_secure_channel_create(creds, *host, channel_args_ptr); @@ -185,5 +198,52 @@ NAN_METHOD(Channel::GetTarget) { NanReturnValue(NanNew(grpc_channel_get_target(channel->wrapped_channel))); } +NAN_METHOD(Channel::GetConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "getConnectivityState can only be called on Channel objects"); + } + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + int try_to_connect = (int)args[0]->Equals(NanTrue()); + NanReturnValue(grpc_channel_check_connectivity_state(channel->wrapped_channel, + try_to_connect)); +} + +NAN_METHOD(Channel::WatchConnectivityState) { + NanScope(); + if (!HasInstance(args.This())) { + return NanThrowTypeError( + "watchConnectivityState can only be called on Channel objects"); + } + if (!args[0]->IsUint32()) { + return NanThrowTypeError( + "watchConnectivityState's first argument must be a channel state"); + } + if (!(args[1]->IsNumber() || args[1]->IsDate())) { + return NanThrowTypeError( + "watchConnectivityState's second argument must be a date or a number"); + } + if (!args[2]->IsFunction()) { + return NanThrowTypeError( + "watchConnectivityState's third argument must be a callback"); + } + grpc_connectivity_state last_state = + static_cast<grpc_connectivity_state>(args[0]->Uint32Value()); + double deadline = args[1]->NumberValue(); + Handle<Function> callback_func = args[2].As<Function>(); + NanCallback *callback = new NanCallback(callback_func); + Channel *channel = ObjectWrap::Unwrap<Channel>(args.This()); + unique_ptr<OpVec> ops(new OpVec()); + grpc_channel_watch_connectivity_state( + channel->wrapped_channel, last_state, MillisecondsToTimespec(deadline), + CompletionQueueAsyncWorker::GetQueue(), + new struct tag(callback, + ops.release(), + shared_ptr<Resources>(nullptr))); + CompletionQueueAsyncWorker::Next(); + NanReturnUndefined(); +} + } // namespace node } // namespace grpc diff --git a/src/node/ext/channel.h b/src/node/ext/channel.h index e2182cb45c..458f71d093 100644 --- a/src/node/ext/channel.h +++ b/src/node/ext/channel.h @@ -64,6 +64,8 @@ class Channel : public ::node::ObjectWrap { static NAN_METHOD(New); static NAN_METHOD(Close); static NAN_METHOD(GetTarget); + static NAN_METHOD(GetConnectivityState); + static NAN_METHOD(WatchConnectivityState); static NanCallback *constructor; static v8::Persistent<v8::FunctionTemplate> fun_tpl; diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc index 1215c97e19..bf2cd946a5 100644 --- a/src/node/ext/completion_queue_async_worker.cc +++ b/src/node/ext/completion_queue_async_worker.cc @@ -63,9 +63,9 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {} void CompletionQueueAsyncWorker::Execute() { result = - grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (!result.success) { - SetErrorMessage("The batch encountered an error"); + SetErrorMessage("The async function encountered an error"); } } @@ -85,7 +85,7 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) { NanScope(); current_threads = 0; waiting_next_calls = 0; - queue = grpc_completion_queue_create(); + queue = grpc_completion_queue_create(NULL); } void CompletionQueueAsyncWorker::HandleOKCallback() { diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc index 4e31cbaa27..d93dafda79 100644 --- a/src/node/ext/node_grpc.cc +++ b/src/node/ext/node_grpc.cc @@ -159,12 +159,51 @@ void InitOpTypeConstants(Handle<Object> exports) { op_type->Set(NanNew("RECV_CLOSE_ON_SERVER"), RECV_CLOSE_ON_SERVER); } +void InitPropagateConstants(Handle<Object> exports) { + NanScope(); + Handle<Object> propagate = NanNew<Object>(); + exports->Set(NanNew("propagate"), propagate); + Handle<Value> DEADLINE(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEADLINE)); + propagate->Set(NanNew("DEADLINE"), DEADLINE); + Handle<Value> CENSUS_STATS_CONTEXT( + NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)); + propagate->Set(NanNew("CENSUS_STATS_CONTEXT"), CENSUS_STATS_CONTEXT); + Handle<Value> CENSUS_TRACING_CONTEXT( + NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT)); + propagate->Set(NanNew("CENSUS_TRACING_CONTEXT"), CENSUS_TRACING_CONTEXT); + Handle<Value> CANCELLATION( + NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_CANCELLATION)); + propagate->Set(NanNew("CANCELLATION"), CANCELLATION); + Handle<Value> DEFAULTS(NanNew<Uint32, uint32_t>(GRPC_PROPAGATE_DEFAULTS)); + propagate->Set(NanNew("DEFAULTS"), DEFAULTS); +} + +void InitConnectivityStateConstants(Handle<Object> exports) { + NanScope(); + Handle<Object> channel_state = NanNew<Object>(); + exports->Set(NanNew("connectivityState"), channel_state); + Handle<Value> IDLE(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_IDLE)); + channel_state->Set(NanNew("IDLE"), IDLE); + Handle<Value> CONNECTING(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_CONNECTING)); + channel_state->Set(NanNew("CONNECTING"), CONNECTING); + Handle<Value> READY(NanNew<Uint32, uint32_t>(GRPC_CHANNEL_READY)); + channel_state->Set(NanNew("READY"), READY); + Handle<Value> TRANSIENT_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_TRANSIENT_FAILURE)); + channel_state->Set(NanNew("TRANSIENT_FAILURE"), TRANSIENT_FAILURE); + Handle<Value> FATAL_FAILURE( + NanNew<Uint32, uint32_t>(GRPC_CHANNEL_FATAL_FAILURE)); + channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE); +} + void init(Handle<Object> exports) { NanScope(); grpc_init(); InitStatusConstants(exports); InitCallErrorConstants(exports); InitOpTypeConstants(exports); + InitPropagateConstants(exports); + InitConnectivityStateConstants(exports); grpc::node::Call::Init(exports); grpc::node::Channel::Init(exports); diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc index 1dc179db3d..8e39644846 100644 --- a/src/node/ext/server.cc +++ b/src/node/ext/server.cc @@ -113,8 +113,8 @@ class NewCallOp : public Op { }; Server::Server(grpc_server *server) : wrapped_server(server) { - shutdown_queue = grpc_completion_queue_create(); - grpc_server_register_completion_queue(server, shutdown_queue); + shutdown_queue = grpc_completion_queue_create(NULL); + grpc_server_register_completion_queue(server, shutdown_queue, NULL); } Server::~Server() { @@ -158,7 +158,7 @@ void Server::ShutdownServer() { this->shutdown_queue, NULL); grpc_completion_queue_pluck(this->shutdown_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); this->wrapped_server = NULL; } } @@ -176,7 +176,7 @@ NAN_METHOD(Server::New) { grpc_server *wrapped_server; grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue(); if (args[0]->IsUndefined()) { - wrapped_server = grpc_server_create(NULL); + wrapped_server = grpc_server_create(NULL, NULL); } else if (args[0]->IsObject()) { Handle<Object> args_hash(args[0]->ToObject()); Handle<Array> keys(args_hash->GetOwnPropertyNames()); @@ -205,12 +205,12 @@ NAN_METHOD(Server::New) { return NanThrowTypeError("Arg values must be strings"); } } - wrapped_server = grpc_server_create(&channel_args); + wrapped_server = grpc_server_create(&channel_args, NULL); free(channel_args.args); } else { return NanThrowTypeError("Server expects an object"); } - grpc_server_register_completion_queue(wrapped_server, queue); + grpc_server_register_completion_queue(wrapped_server, queue, NULL); Server *server = new Server(wrapped_server); server->Wrap(args.This()); NanReturnValue(args.This()); diff --git a/src/node/index.js b/src/node/index.js index b26ab35f2c..93c65ac5c4 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -135,6 +135,11 @@ exports.Server = server.Server; exports.status = grpc.status; /** + * Propagate flag name to number mapping + */ +exports.propagate = grpc.propagate; + +/** * Call error name to code number mapping */ exports.callError = grpc.callError; diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index ece22cce31..1242a0f939 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -194,7 +194,7 @@ if (require.main === module) { }); var server_obj = getServer(argv.port, argv.use_tls === 'true'); console.log('Server attaching to port ' + argv.port); - server_obj.server.listen(); + server_obj.server.start(); } /** diff --git a/src/node/src/client.js b/src/node/src/client.js index 5cde438572..50cbf4a133 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -216,14 +216,19 @@ ClientDuplexStream.prototype.getPeer = getPeer; function getCall(channel, method, options) { var deadline; var host; + var parent; + var propagate_flags; if (options) { deadline = options.deadline; host = options.host; + parent = _.get(options, 'parent.call'); + propagate_flags = options.propagate_flags; } if (deadline === undefined) { deadline = Infinity; } - return new grpc.Call(channel, method, deadline, host); + return new grpc.Call(channel, method, deadline, host, + parent, propagate_flags); } /** @@ -558,6 +563,35 @@ exports.makeClientConstructor = function(methods, serviceName) { this.updateMetadata = updateMetadata; } + /** + * Wait for the client to be ready. The callback will be called when the + * client has successfully connected to the server, and it will be called + * with an error if the attempt to connect to the server has unrecoverablly + * failed or if the deadline expires. This function will make the channel + * start connecting if it has not already done so. + * @param {(Date|Number)} deadline When to stop waiting for a connection. Pass + * Infinity to wait forever. + * @param {function(Error)} callback The callback to call when done attempting + * to connect. + */ + Client.prototype.$waitForReady = function(deadline, callback) { + var self = this; + var checkState = function(err) { + if (err) { + callback(new Error('Failed to connect before the deadline')); + } + var new_state = self.channel.getConnectivityState(true); + if (new_state === grpc.connectivityState.READY) { + callback(); + } else if (new_state === grpc.connectivityState.FATAL_FAILURE) { + callback(new Error('Failed to connect to server')); + } else { + self.channel.watchConnectivityState(new_state, deadline, checkState); + } + }; + checkState(); + }; + _.each(methods, function(attrs, name) { var method_type; if (attrs.requestStream) { diff --git a/src/node/src/server.js b/src/node/src/server.js index 5c62f5990c..8b86173f08 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -432,6 +432,7 @@ function handleUnary(call, handler, metadata) { }); emitter.metadata = metadata; waitForCancel(call, emitter); + emitter.call = call; var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js index c991d7b25b..d81df2a36d 100644 --- a/src/node/test/channel_test.js +++ b/src/node/test/channel_test.js @@ -36,6 +36,26 @@ var assert = require('assert'); var grpc = require('bindings')('grpc.node'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} var insecureCreds = grpc.Credentials.createInsecure(); describe('channel', function() { @@ -86,14 +106,16 @@ describe('channel', function() { }); }); describe('close', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); it('should succeed silently', function() { - var channel = new grpc.Channel('hostname', insecureCreds, {}); assert.doesNotThrow(function() { channel.close(); }); }); it('should be idempotent', function() { - var channel = new grpc.Channel('hostname', insecureCreds, {}); assert.doesNotThrow(function() { channel.close(); channel.close(); @@ -101,9 +123,68 @@ describe('channel', function() { }); }); describe('getTarget', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); it('should return a string', function() { - var channel = new grpc.Channel('localhost', insecureCreds, {}); assert.strictEqual(typeof channel.getTarget(), 'string'); }); }); + describe('getConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('hostname', insecureCreds, {}); + }); + it('should return IDLE for a new channel', function() { + assert.strictEqual(channel.getConnectivityState(), + grpc.connectivityState.IDLE); + }); + }); + describe('watchConnectivityState', function() { + var channel; + beforeEach(function() { + channel = new grpc.Channel('localhost', insecureCreds, {}); + }); + afterEach(function() { + channel.close(); + }); + it('should time out if called alone', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert(err); + done(); + }); + }); + it('should complete if a connection attempt is forced', function(done) { + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + it('should complete twice if called twice', function(done) { + done = multiDone(done, 2); + var old_state = channel.getConnectivityState(); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.watchConnectivityState(old_state, deadline, function(err, value) { + assert.ifError(err); + assert.notEqual(value.new_state, old_state); + done(); + }); + channel.getConnectivityState(true); + }); + }); }); diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index ecc98ec443..fa06ad4e4d 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -78,6 +78,31 @@ var callErrorNames = [ 'INVALID_FLAGS' ]; +/** + * List of all propagate flag names + * @const + * @type {Array.<string>} + */ +var propagateFlagNames = [ + 'DEADLINE', + 'CENSUS_STATS_CONTEXT', + 'CENSUS_TRACING_CONTEXT', + 'CANCELLATION', + 'DEFAULTS' +]; +/* + * List of all connectivity state names + * @const + * @type {Array.<string>} + */ +var connectivityStateNames = [ + 'IDLE', + 'CONNECTING', + 'READY', + 'TRANSIENT_FAILURE', + 'FATAL_FAILURE' +]; + describe('constants', function() { it('should have all of the status constants', function() { for (var i = 0; i < statusNames.length; i++) { @@ -91,4 +116,16 @@ describe('constants', function() { 'call error missing: ' + callErrorNames[i]); } }); + it('should have all of the propagate flags', function() { + for (var i = 0; i < propagateFlagNames.length; i++) { + assert(grpc.propagate.hasOwnProperty(propagateFlagNames[i]), + 'call error missing: ' + propagateFlagNames[i]); + } + }); + it('should have all of the connectivity states', function() { + for (var i = 0; i < connectivityStateNames.length; i++) { + assert(grpc.connectivityState.hasOwnProperty(connectivityStateNames[i]), + 'connectivity status missing: ' + connectivityStateNames[i]); + } + }); }); diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js index a9df43909e..20c9a07ffa 100644 --- a/src/node/test/server_test.js +++ b/src/node/test/server_test.js @@ -83,7 +83,7 @@ describe('server', function() { server = new grpc.Server(); }); }); - describe('listen', function() { + describe('start', function() { var server; before(function() { server = new grpc.Server(); @@ -92,7 +92,7 @@ describe('server', function() { after(function() { server.shutdown(); }); - it('should listen without error', function() { + it('should start without error', function() { assert.doesNotThrow(function() { server.start(); }); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index dda2f8d127..52515cc8e7 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -47,6 +47,27 @@ var mathService = math_proto.lookup('math.Math'); var _ = require('lodash'); +/** + * This is used for testing functions with multiple asynchronous calls that + * can happen in different orders. This should be passed the number of async + * function invocations that can occur last, and each of those should call this + * function's return value + * @param {function()} done The function that should be called when a test is + * complete. + * @param {number} count The number of calls to the resulting function if the + * test passes. + * @return {function()} The function that should be called at the end of each + * sequence of asynchronous functions. + */ +function multiDone(done, count) { + return function() { + count -= 1; + if (count <= 0) { + done(); + } + }; +} + var server_insecure_creds = grpc.ServerCredentials.createInsecure(); describe('File loader', function() { @@ -112,6 +133,58 @@ describe('Server.prototype.addProtoService', function() { }); }); }); +describe('Client#$waitForReady', function() { + var server; + var port; + var Client; + var client; + before(function() { + server = new grpc.Server(); + port = server.bind('localhost:0', grpc.ServerCredentials.createInsecure()); + server.start(); + Client = surface_client.makeProtobufClientConstructor(mathService); + }); + beforeEach(function() { + client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); + }); + after(function() { + server.shutdown(); + }); + it('should complete when called alone', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + it('should complete when a call is initiated', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + var call = client.div({}, function(err, response) {}); + call.cancel(); + }); + it('should complete if called more than once', function(done) { + done = multiDone(done, 2); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + it('should complete if called when already ready', function(done) { + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + client.$waitForReady(Infinity, function(error) { + assert.ifError(error); + done(); + }); + }); + }); +}); describe('Echo service', function() { var server; var client; @@ -272,12 +345,14 @@ describe('Echo metadata', function() { }); }); describe('Other conditions', function() { + var test_service; + var Client; var client; var server; var port; before(function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); - var test_service = test_proto.lookup('TestService'); + test_service = test_proto.lookup('TestService'); server = new grpc.Server(); server.addProtoService(test_service, { unary: function(call, cb) { @@ -339,7 +414,7 @@ describe('Other conditions', function() { } }); port = server.bind('localhost:0', server_insecure_creds); - var Client = surface_client.makeProtobufClientConstructor(test_service); + Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); server.start(); }); @@ -592,6 +667,166 @@ describe('Other conditions', function() { }); }); }); + describe('Call propagation', function() { + var proxy; + var proxy_impl; + beforeEach(function() { + proxy = new grpc.Server(); + proxy_impl = { + unary: function(call) {}, + clientStream: function(stream) {}, + serverStream: function(stream) {}, + bidiStream: function(stream) {} + }; + }); + afterEach(function() { + console.log('Shutting down server'); + proxy.shutdown(); + }); + describe('Cancellation', function() { + it('With a unary call', function(done) { + done = multiDone(done, 2); + proxy_impl.unary = function(parent, callback) { + client.unary(parent.request, function(err, value) { + try { + assert(err); + assert.strictEqual(err.code, grpc.status.CANCELLED); + } finally { + callback(err, value); + done(); + } + }, null, {parent: parent}); + call.cancel(); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var call = proxy_client.unary({}, function(err, value) { + done(); + }); + }); + it('With a client stream call', function(done) { + done = multiDone(done, 2); + proxy_impl.clientStream = function(parent, callback) { + client.clientStream(function(err, value) { + try { + assert(err); + assert.strictEqual(err.code, grpc.status.CANCELLED); + } finally { + callback(err, value); + done(); + } + }, null, {parent: parent}); + call.cancel(); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var call = proxy_client.clientStream(function(err, value) { + done(); + }); + }); + it('With a server stream call', function(done) { + done = multiDone(done, 2); + proxy_impl.serverStream = function(parent) { + var child = client.serverStream(parent.request, null, + {parent: parent}); + child.on('error', function(err) { + assert(err); + assert.strictEqual(err.code, grpc.status.CANCELLED); + done(); + }); + call.cancel(); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var call = proxy_client.serverStream({}); + call.on('error', function(err) { + done(); + }); + }); + it('With a bidi stream call', function(done) { + done = multiDone(done, 2); + proxy_impl.bidiStream = function(parent) { + var child = client.bidiStream(null, {parent: parent}); + child.on('error', function(err) { + assert(err); + assert.strictEqual(err.code, grpc.status.CANCELLED); + done(); + }); + call.cancel(); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var call = proxy_client.bidiStream(); + call.on('error', function(err) { + done(); + }); + }); + }); + describe('Deadline', function() { + /* jshint bitwise:false */ + var deadline_flags = (grpc.propagate.DEFAULTS & + ~grpc.propagate.CANCELLATION); + it('With a client stream call', function(done) { + done = multiDone(done, 2); + proxy_impl.clientStream = function(parent, callback) { + client.clientStream(function(err, value) { + try { + assert(err); + assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED); + } finally { + callback(err, value); + done(); + } + }, null, {parent: parent, propagate_flags: deadline_flags}); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + proxy_client.clientStream(function(err, value) { + done(); + }, null, {deadline: deadline}); + }); + it('With a bidi stream call', function(done) { + done = multiDone(done, 2); + proxy_impl.bidiStream = function(parent) { + var child = client.bidiStream( + null, {parent: parent, propagate_flags: deadline_flags}); + child.on('error', function(err) { + assert(err); + assert.strictEqual(err.code, grpc.status.DEADLINE_EXCEEDED); + done(); + }); + }; + proxy.addProtoService(test_service, proxy_impl); + var proxy_port = proxy.bind('localhost:0', server_insecure_creds); + proxy.start(); + var proxy_client = new Client('localhost:' + proxy_port, + grpc.Credentials.createInsecure()); + var deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 1); + var call = proxy_client.bidiStream(null, {deadline: deadline}); + call.on('error', function(err) { + done(); + }); + }); + }); + }); }); describe('Cancelling surface client', function() { var client; diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index 696069c200..ea2b01ee1d 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -43,7 +43,7 @@ - (instancetype)init { if ((self = [super init])) { - _unmanagedQueue = grpc_completion_queue_create(); + _unmanagedQueue = grpc_completion_queue_create(NULL); // This is for the following block to capture the pointer by value (instead // of retaining self and doing self->_unmanagedQueue). This is essential @@ -64,7 +64,8 @@ while (YES) { // The following call blocks until an event is available. grpc_event event = grpc_completion_queue_next(unmanagedQueue, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); GRPCQueueCompletionHandler handler; switch (event.type) { case GRPC_OP_COMPLETE: diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index d902f95b51..a7142d0f00 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -97,7 +97,7 @@ queue.unmanagedQueue, path.UTF8String, self.hostName.UTF8String, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); } - (GRPCChannel *)channel { diff --git a/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m b/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m index 070a529629..15b6ffc75c 100644 --- a/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m @@ -38,7 +38,7 @@ @implementation GRPCUnsecuredChannel - (instancetype)initWithHost:(NSString *)host { - return (self = [super initWithChannel:grpc_insecure_channel_create(host.UTF8String, NULL)]); + return (self = [super initWithChannel:grpc_insecure_channel_create(host.UTF8String, NULL, NULL)]); } // TODO(jcanizales): GRPCSecureChannel and GRPCUnsecuredChannel are just convenience initializers diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 951c051036..fe3d51da53 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -282,7 +282,7 @@ for (GRPCOperation *operation in operations) { [operation finish]; } - })); + }), NULL); gpr_free(ops_array); if (error != GRPC_CALL_OK) { @@ -293,7 +293,7 @@ } - (void)cancel { - grpc_call_cancel(_call); + grpc_call_cancel(_call, NULL); } - (void)dealloc { diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 01ec909b79..1cf766c312 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -241,7 +241,7 @@ PHP_METHOD(Call, __construct) { deadline_obj TSRMLS_CC); call->wrapped = grpc_channel_create_call( channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method, - channel->target, deadline->wrapped); + channel->target, deadline->wrapped, NULL); } /** @@ -400,7 +400,8 @@ PHP_METHOD(Call, startBatch) { ops[op_num].flags = 0; op_num++; } - error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped); + error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped, + NULL); if (error != GRPC_CALL_OK) { zend_throw_exception(spl_ce_LogicException, "start_batch was called incorrectly", @@ -408,7 +409,7 @@ PHP_METHOD(Call, startBatch) { goto cleanup; } event = grpc_completion_queue_pluck(completion_queue, call->wrapped, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (!event.success) { zend_throw_exception(spl_ce_LogicException, "The batch failed for some reason", @@ -489,7 +490,7 @@ PHP_METHOD(Call, getPeer) { PHP_METHOD(Call, cancel) { wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); - grpc_call_cancel(call->wrapped); + grpc_call_cancel(call->wrapped, NULL); } static zend_function_entry call_methods[] = { diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index 447cfc15be..f8ce04d902 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -154,7 +154,7 @@ PHP_METHOD(Channel, __construct) { override = target; override_len = target_length; if (args_array == NULL) { - channel->wrapped = grpc_insecure_channel_create(target, NULL); + channel->wrapped = grpc_insecure_channel_create(target, NULL, NULL); } else { array_hash = Z_ARRVAL_P(args_array); if (zend_hash_find(array_hash, "credentials", sizeof("credentials"), @@ -184,7 +184,7 @@ PHP_METHOD(Channel, __construct) { } php_grpc_read_args_array(args_array, &args); if (creds == NULL) { - channel->wrapped = grpc_insecure_channel_create(target, &args); + channel->wrapped = grpc_insecure_channel_create(target, &args, NULL); } else { gpr_log(GPR_DEBUG, "Initialized secure channel"); channel->wrapped = @@ -255,7 +255,7 @@ PHP_METHOD(Channel, watchConnectivityState) { deadline->wrapped, completion_queue, NULL); grpc_event event = grpc_completion_queue_pluck( completion_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); RETURN_BOOL(event.success); } diff --git a/src/php/ext/grpc/completion_queue.c b/src/php/ext/grpc/completion_queue.c index c653a592ef..741204b0b1 100644 --- a/src/php/ext/grpc/completion_queue.c +++ b/src/php/ext/grpc/completion_queue.c @@ -38,14 +38,13 @@ grpc_completion_queue *completion_queue; void grpc_php_init_completion_queue(TSRMLS_D) { - completion_queue = grpc_completion_queue_create(); + completion_queue = grpc_completion_queue_create(NULL); } void grpc_php_shutdown_completion_queue(TSRMLS_D) { grpc_completion_queue_shutdown(completion_queue); while (grpc_completion_queue_next(completion_queue, - gpr_inf_future(GPR_CLOCK_REALTIME)) - .type != GRPC_QUEUE_SHUTDOWN) - ; + gpr_inf_future(GPR_CLOCK_REALTIME), + NULL).type != GRPC_QUEUE_SHUTDOWN); grpc_completion_queue_destroy(completion_queue); } diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index d58aa884ca..ca129e76ca 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -66,7 +66,7 @@ void free_wrapped_grpc_server(void *object TSRMLS_DC) { grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL); grpc_server_cancel_all_calls(server->wrapped); grpc_completion_queue_pluck(completion_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); grpc_server_destroy(server->wrapped); } efree(server); @@ -109,13 +109,14 @@ PHP_METHOD(Server, __construct) { return; } if (args_array == NULL) { - server->wrapped = grpc_server_create(NULL); + server->wrapped = grpc_server_create(NULL, NULL); } else { php_grpc_read_args_array(args_array, &args); - server->wrapped = grpc_server_create(&args); + server->wrapped = grpc_server_create(&args, NULL); efree(args.args); } - grpc_server_register_completion_queue(server->wrapped, completion_queue); + grpc_server_register_completion_queue(server->wrapped, completion_queue, + NULL); } /** @@ -146,7 +147,7 @@ PHP_METHOD(Server, requestCall) { goto cleanup; } event = grpc_completion_queue_pluck(completion_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); if (!event.success) { zend_throw_exception(spl_ce_LogicException, "Failed to request a call for some reason", diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c index 5e46605c45..42a50151f6 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/call.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c @@ -132,7 +132,7 @@ PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs) } } tag = pygrpc_produce_batch_tag(user_tag, self, ops, nops); - errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag); + errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag, NULL); gpr_free(ops); return PyInt_FromLong(errcode); } @@ -152,13 +152,13 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) { return NULL; } code = PyInt_AsLong(py_code); - errcode = grpc_call_cancel_with_status(self->c_call, code, details); + errcode = grpc_call_cancel_with_status(self->c_call, code, details, NULL); } else if (py_code != NULL || details != NULL) { PyErr_SetString(PyExc_ValueError, "if `code` is specified, so must `details`"); return NULL; } else { - errcode = grpc_call_cancel(self->c_call); + errcode = grpc_call_cancel(self->c_call, NULL); } return PyInt_FromLong(errcode); } diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c index eb9d43d154..c577ac05eb 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c @@ -108,7 +108,7 @@ Channel *pygrpc_Channel_new( if (creds) { self->c_chan = grpc_secure_channel_create(creds->c_creds, target, &c_args); } else { - self->c_chan = grpc_insecure_channel_create(target, &c_args); + self->c_chan = grpc_insecure_channel_create(target, &c_args, NULL); } pygrpc_discard_channel_args(c_args); return self; @@ -133,7 +133,7 @@ Call *pygrpc_Channel_create_call( call = pygrpc_Call_new_empty(cq); call->c_call = grpc_channel_create_call( self->c_chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq->c_cq, method, host, - pygrpc_cast_double_to_gpr_timespec(deadline)); + pygrpc_cast_double_to_gpr_timespec(deadline), NULL); return call; } diff --git a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c index 2dd44b6ddd..d8bb89ca4b 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c @@ -90,7 +90,7 @@ PyTypeObject pygrpc_CompletionQueue_type = { CompletionQueue *pygrpc_CompletionQueue_new( PyTypeObject *type, PyObject *args, PyObject *kwargs) { CompletionQueue *self = (CompletionQueue *)type->tp_alloc(type, 0); - self->c_cq = grpc_completion_queue_create(); + self->c_cq = grpc_completion_queue_create(NULL); return self; } @@ -111,7 +111,7 @@ PyObject *pygrpc_CompletionQueue_next( } Py_BEGIN_ALLOW_THREADS; event = grpc_completion_queue_next( - self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline)); + self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline), NULL); Py_END_ALLOW_THREADS; transliterated_event = pygrpc_consume_event(event); return transliterated_event; diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c index 2a11d09d21..15c98f28eb 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/server.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c @@ -104,8 +104,8 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) return NULL; } self = (Server *)type->tp_alloc(type, 0); - self->c_serv = grpc_server_create(&c_args); - grpc_server_register_completion_queue(self->c_serv, cq->c_cq); + self->c_serv = grpc_server_create(&c_args, NULL); + grpc_server_register_completion_queue(self->c_serv, cq->c_cq, NULL); pygrpc_discard_channel_args(c_args); self->cq = cq; Py_INCREF(self->cq); diff --git a/src/ruby/bin/interop/test/cpp/interop/test.rb b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py index 5948b50eaa..7086519106 100644 --- a/src/ruby/bin/interop/test/cpp/interop/test.rb +++ b/src/python/grpcio/grpc/framework/interfaces/base/__init__.py @@ -27,17 +27,4 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/test.proto -require 'google/protobuf' - -require 'test/cpp/interop/empty' -require 'test/cpp/interop/messages' -Google::Protobuf::DescriptorPool.generated_pool.build do -end - -module Grpc - module Testing - end -end diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py new file mode 100644 index 0000000000..9d1651daac --- /dev/null +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -0,0 +1,273 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The base interface of RPC Framework.""" + +import abc +import enum + +# abandonment is referenced from specification in this module. +from grpc.framework.foundation import abandonment # pylint: disable=unused-import + + +class NoSuchMethodError(Exception): + """Indicates that an unrecognized operation has been called.""" + + +@enum.unique +class Outcome(enum.Enum): + """Operation outcomes.""" + + COMPLETED = 'completed' + CANCELLED = 'cancelled' + EXPIRED = 'expired' + LOCAL_SHUTDOWN = 'local shutdown' + REMOTE_SHUTDOWN = 'remote shutdown' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + LOCAL_FAILURE = 'local failure' + REMOTE_FAILURE = 'remote failure' + + +class Completion(object): + """An aggregate of the values exchanged upon operation completion. + + Attributes: + terminal_metadata: A terminal metadata value for the operaton. + code: A code value for the operation. + message: A message value for the operation. + """ + __metaclass__ = abc.ABCMeta + + +class OperationContext(object): + """Provides operation-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def outcome(self): + """Indicates the operation's outcome (or that the operation is ongoing). + + Returns: + None if the operation is still active or the Outcome value for the + operation if it has terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_termination_callback(self, callback): + """Adds a function to be called upon operation termination. + + Args: + callback: A callable to be passed an Outcome value on operation + termination. + + Returns: + None if the operation has not yet terminated and the passed callback will + later be called when it does terminate, or if the operation has already + terminated an Outcome value describing the operation termination and the + passed callback will not be called as a result of this method call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the operation. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the operation to complete before it is considered to have + timed out. Zero is returned if the operation has terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the operation if the operation has not yet terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def fail(self, exception): + """Indicates that the operation has failed. + + Args: + exception: An exception germane to the operation failure. May be None. + """ + raise NotImplementedError() + + +class Operator(object): + """An interface through which to participate in an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + """Progresses the operation. + + Args: + initial_metadata: An initial metadata value. Only one may ever be + communicated in each direction for an operation, and they must be + communicated no later than either the first payload or the completion. + payload: A payload value. + completion: A Completion value. May only ever be non-None once in either + direction, and no payloads may be passed after it has been communicated. + allowance: A positive integer communicating the number of additional + payloads allowed to be passed by the remote side of the operation. + """ + raise NotImplementedError() + + +class Subscription(object): + """Describes customer code's interest in values from the other side. + + Attributes: + kind: A Kind value describing the overall kind of this value. + termination_callback: A callable to be passed the Outcome associated with + the operation after it has terminated. Must be non-None if kind is + Kind.TERMINATION_ONLY. Must be None otherwise. + allowance: A callable behavior that accepts positive integers representing + the number of additional payloads allowed to be passed to the other side + of the operation. Must be None if kind is Kind.FULL. Must not be None + otherwise. + operator: An Operator to be passed values from the other side of the + operation. Must be non-None if kind is Kind.FULL. Must be None otherwise. + """ + + @enum.unique + class Kind(enum.Enum): + + NONE = 'none' + TERMINATION_ONLY = 'termination only' + FULL = 'full' + + +class Servicer(object): + """Interface for service implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, group, method, context, output_operator): + """Services an operation. + + Args: + group: The group identifier of the operation to be serviced. + method: The method identifier of the operation to be serviced. + context: An OperationContext object affording contextual information and + actions. + output_operator: An Operator that will accept output values of the + operation. + + Returns: + A Subscription via which this object may or may not accept more values of + the operation. + + Raises: + NoSuchMethodError: If this Servicer does not handle operations with the + given group and method. + abandonment.Abandoned: If the operation has been aborted and there no + longer is any reason to service the operation. + """ + raise NotImplementedError() + + +class End(object): + """Common type for entry-point objects on both sides of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def start(self): + """Starts this object's service of operations.""" + raise NotImplementedError() + + @abc.abstractmethod + def stop_gracefully(self): + """Gracefully stops this object's service of operations. + + Operations in progress will be allowed to complete, and this method blocks + until all of them have. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stop_immediately(self): + """Immediately stops this object's service of operations. + + Operations in progress will not be allowed to complete. + """ + raise NotImplementedError() + + @abc.abstractmethod + def operate( + self, group, method, subscription, timeout, initial_metadata=None, + payload=None, completion=None): + """Commences an operation. + + Args: + group: The group identifier of the invoked operation. + method: The method identifier of the invoked operation. + subscription: A Subscription to which the results of the operation will be + passed. + timeout: A length of time in seconds to allow for the operation. + initial_metadata: An initial metadata value to be sent to the other side + of the operation. May be None if the initial metadata will be later + passed via the returned operator or if there will be no initial metadata + passed at all. + payload: An initial payload for the operation. + completion: A Completion value indicating the end of transmission to the + other side of the operation. + + Returns: + A pair of objects affording information about the operation and action + continuing the operation. The first element of the returned pair is an + OperationContext for the operation and the second element of the + returned pair is an Operator to which operation values not passed in + this call should later be passed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def operation_stats(self): + """Reports the number of terminated operations broken down by outcome. + + Returns: + A dictionary from Outcome value to an integer identifying the number + of operations that terminated with that outcome. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_idle_action(self, action): + """Adds an action to be called when this End has no ongoing operations. + + Args: + action: A callable that accepts no arguments. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py new file mode 100644 index 0000000000..a9ee1a0981 --- /dev/null +++ b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py @@ -0,0 +1,79 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for use with the base interface of RPC Framework.""" + +import collections + +from grpc.framework.interfaces.base import base + + +class _Completion( + base.Completion, + collections.namedtuple( + '_Completion', ('terminal_metadata', 'code', 'message',))): + """A trivial implementation of base.Completion.""" + + +class _Subscription( + base.Subscription, + collections.namedtuple( + '_Subscription', + ('kind', 'termination_callback', 'allowance', 'operator',))): + """A trivial implementation of base.Subscription.""" + +_NONE_SUBSCRIPTION = _Subscription( + base.Subscription.Kind.NONE, None, None, None) + + +def completion(terminal_metadata, code, message): + """Creates a base.Completion aggregating the given operation values. + + Args: + terminal_metadata: A terminal metadata value for an operaton. + code: A code value for an operation. + message: A message value for an operation. + + Returns: + A base.Completion aggregating the given operation values. + """ + return _Completion(terminal_metadata, code, message) + + +def full_subscription(operator): + """Creates a "full" base.Subscription for the given base.Operator. + + Args: + operator: A base.Operator to be used in an operation. + + Returns: + A base.Subscription of kind base.Subscription.Kind.FULL wrapping the given + base.Operator. + """ + return _Subscription(base.Subscription.Kind.FULL, None, None, operator) diff --git a/src/ruby/.rspec b/src/ruby/.rspec index cd7c5fb5b2..2320752db4 100755 --- a/src/ruby/.rspec +++ b/src/ruby/.rspec @@ -1,4 +1,5 @@ -I. +-Ipb --require spec_helper --format documentation --color diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index 47e382afa7..312bdca384 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -5,6 +5,7 @@ inherit_from: .rubocop_todo.yml AllCops: Exclude: - 'bin/apis/**/*' - - 'bin/interop/test/**/*' - 'bin/math.rb' - 'bin/math_services.rb' + - 'pb/grpc/health/v1alpha/*' + - 'pb/test/**/*' diff --git a/src/ruby/Rakefile b/src/ruby/Rakefile index 02af9a84b8..cc7832b12d 100755 --- a/src/ruby/Rakefile +++ b/src/ruby/Rakefile @@ -20,7 +20,8 @@ SPEC_SUITES = [ { id: :bidi, title: 'bidi tests', dir: %w(spec/generic), tag: 'bidi' }, { id: :server, title: 'rpc server thread tests', dir: %w(spec/generic), - tag: 'server' } + tag: 'server' }, + { id: :pb, title: 'protobuf service tests', dir: %w(spec/pb) } ] namespace :suite do SPEC_SUITES.each do |suite| @@ -50,7 +51,8 @@ task 'suite:wrapper' => [:compile, :rubocop] task 'suite:idiomatic' => 'suite:wrapper' task 'suite:bidi' => 'suite:wrapper' task 'suite:server' => 'suite:wrapper' +task 'suite:pb' => 'suite:server' desc 'Compiles the gRPC extension then runs all the tests' -task all: ['suite:idiomatic', 'suite:bidi', 'suite:server'] +task all: ['suite:idiomatic', 'suite:bidi', 'suite:pb', 'suite:server'] task default: :all diff --git a/src/ruby/bin/interop/test/cpp/interop/empty.rb b/src/ruby/bin/grpc_ruby_interop_client index 3579fa5ded..e79fd33aa5 100644..100755 --- a/src/ruby/bin/interop/test/cpp/interop/empty.rb +++ b/src/ruby/bin/grpc_ruby_interop_client @@ -1,3 +1,5 @@ +#!/usr/bin/env ruby + # Copyright 2015, Google Inc. # All rights reserved. # @@ -27,18 +29,5 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/empty.proto - -require 'google/protobuf' - -Google::Protobuf::DescriptorPool.generated_pool.build do - add_message "grpc.testing.Empty" do - end -end - -module Grpc - module Testing - Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass - end -end +# Provides a gem binary entry point for the interop client. +require 'test/client' diff --git a/src/ruby/bin/interop/test/cpp/interop/test_services.rb b/src/ruby/bin/grpc_ruby_interop_server index 5a3146c581..656a5f7c99 100644..100755 --- a/src/ruby/bin/interop/test/cpp/interop/test_services.rb +++ b/src/ruby/bin/grpc_ruby_interop_server @@ -1,3 +1,5 @@ +#!/usr/bin/env ruby + # Copyright 2015, Google Inc. # All rights reserved. # @@ -27,34 +29,5 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# Generated by the protocol buffer compiler. DO NOT EDIT! -# Source: test/cpp/interop/test.proto for package 'grpc.testing' - -require 'grpc' -require 'test/cpp/interop/test' - -module Grpc - module Testing - module TestService - - # TODO: add proto service documentation here - class Service - - include GRPC::GenericService - - self.marshal_class_method = :encode - self.unmarshal_class_method = :decode - self.service_name = 'grpc.testing.TestService' - - rpc :EmptyCall, Empty, Empty - rpc :UnaryCall, SimpleRequest, SimpleResponse - rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) - rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse - rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) - end - - Stub = Service.rpc_stub_class - end - end -end +# Provides a gem binary entry point for the interop server +require 'test/server' diff --git a/src/ruby/bin/interop/README.md b/src/ruby/bin/interop/README.md deleted file mode 100644 index 84fc663620..0000000000 --- a/src/ruby/bin/interop/README.md +++ /dev/null @@ -1,8 +0,0 @@ -Interop test protos -=================== - -These ruby classes were generated with protoc v3, using grpc's ruby compiler -plugin. - -- As of 2015/01 protoc v3 is available in the -[google-protobuf](https://github.com/google/protobuf) repo diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index da4caa842b..239083f37f 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -29,6 +29,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# ####################################################################### +# DEPRECATED: The behaviour in this file has been moved to pb/test/client.rb +# +# This file remains to support existing tools and scripts that use it. +# ###################################################################### +# # interop_client is a testing tool that accesses a gRPC interop testing # server and runs a test on it. # @@ -39,339 +45,7 @@ # --test_case=<testcase_name> this_dir = File.expand_path(File.dirname(__FILE__)) -lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') -$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) -$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) - -require 'optparse' -require 'minitest' -require 'minitest/assertions' - -require 'grpc' -require 'googleauth' -require 'google/protobuf' - -require 'test/cpp/interop/test_services' -require 'test/cpp/interop/messages' -require 'test/cpp/interop/empty' - -require 'signet/ssl_config' - -AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR - -# loads the certificates used to access the test server securely. -def load_test_certs - this_dir = File.expand_path(File.dirname(__FILE__)) - data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata') - files = ['ca.pem', 'server1.key', 'server1.pem'] - files.map { |f| File.open(File.join(data_dir, f)).read } -end - -# loads the certificates used to access the test server securely. -def load_prod_cert - fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil? - GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") - File.open(ENV['SSL_CERT_FILE']).read -end - -# creates SSL Credentials from the test certificates. -def test_creds - certs = load_test_certs - GRPC::Core::Credentials.new(certs[0]) -end - -# creates SSL Credentials from the production certificates. -def prod_creds - cert_text = load_prod_cert - GRPC::Core::Credentials.new(cert_text) -end - -# creates the SSL Credentials. -def ssl_creds(use_test_ca) - return test_creds if use_test_ca - prod_creds -end - -# creates a test stub that accesses host:port securely. -def create_stub(opts) - address = "#{opts.host}:#{opts.port}" - if opts.secure - stub_opts = { - :creds => ssl_creds(opts.use_test_ca), - GRPC::Core::Channel::SSL_TARGET => opts.host_override - } - - # Add service account creds if specified - wants_creds = %w(all compute_engine_creds service_account_creds) - if wants_creds.include?(opts.test_case) - unless opts.oauth_scope.nil? - auth_creds = Google::Auth.get_application_default(opts.oauth_scope) - stub_opts[:update_metadata] = auth_creds.updater_proc - end - end - - if opts.test_case == 'jwt_token_creds' # don't use a scope - auth_creds = Google::Auth.get_application_default - stub_opts[:update_metadata] = auth_creds.updater_proc - end - - GRPC.logger.info("... connecting securely to #{address}") - Grpc::Testing::TestService::Stub.new(address, **stub_opts) - else - GRPC.logger.info("... connecting insecurely to #{address}") - Grpc::Testing::TestService::Stub.new(address) - end -end - -# produces a string of null chars (\0) of length l. -def nulls(l) - fail 'requires #{l} to be +ve' if l < 0 - [].pack('x' * l).force_encoding('utf-8') -end - -# a PingPongPlayer implements the ping pong bidi test. -class PingPongPlayer - include Minitest::Assertions - include Grpc::Testing - include Grpc::Testing::PayloadType - attr_accessor :assertions # required by Minitest::Assertions - attr_accessor :queue - attr_accessor :canceller_op - - # reqs is the enumerator over the requests - def initialize(msg_sizes) - @queue = Queue.new - @msg_sizes = msg_sizes - @assertions = 0 # required by Minitest::Assertions - @canceller_op = nil # used to cancel after the first response - end - - def each_item - return enum_for(:each_item) unless block_given? - req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters # short - count = 0 - @msg_sizes.each do |m| - req_size, resp_size = m - req = req_cls.new(payload: Payload.new(body: nulls(req_size)), - response_type: :COMPRESSABLE, - response_parameters: [p_cls.new(size: resp_size)]) - yield req - resp = @queue.pop - assert_equal(:COMPRESSABLE, resp.payload.type, 'payload type is wrong') - assert_equal(resp_size, resp.payload.body.length, - "payload body #{count} has the wrong length") - p "OK: ping_pong #{count}" - count += 1 - unless @canceller_op.nil? - canceller_op.cancel - break - end - end - end -end - -# defines methods corresponding to each interop test case. -class NamedTests - include Minitest::Assertions - include Grpc::Testing - include Grpc::Testing::PayloadType - attr_accessor :assertions # required by Minitest::Assertions - - def initialize(stub, args) - @assertions = 0 # required by Minitest::Assertions - @stub = stub - @args = args - end - - def empty_unary - resp = @stub.empty_call(Empty.new) - assert resp.is_a?(Empty), 'empty_unary: invalid response' - p 'OK: empty_unary' - end - - def large_unary - perform_large_unary - p 'OK: large_unary' - end - - def service_account_creds - # ignore this test if the oauth options are not set - if @args.oauth_scope.nil? - p 'NOT RUN: service_account_creds; no service_account settings' - return - end - json_key = File.read(ENV[AUTH_ENV]) - wanted_email = MultiJson.load(json_key)['client_email'] - resp = perform_large_unary(fill_username: true, - fill_oauth_scope: true) - assert_equal(wanted_email, resp.username, - 'service_account_creds: incorrect username') - assert(@args.oauth_scope.include?(resp.oauth_scope), - 'service_account_creds: incorrect oauth_scope') - p 'OK: service_account_creds' - end - - def jwt_token_creds - json_key = File.read(ENV[AUTH_ENV]) - wanted_email = MultiJson.load(json_key)['client_email'] - resp = perform_large_unary(fill_username: true) - assert_equal(wanted_email, resp.username, - 'service_account_creds: incorrect username') - p 'OK: jwt_token_creds' - end - - def compute_engine_creds - resp = perform_large_unary(fill_username: true, - fill_oauth_scope: true) - assert_equal(@args.default_service_account, resp.username, - 'compute_engine_creds: incorrect username') - p 'OK: compute_engine_creds' - end - - def client_streaming - msg_sizes = [27_182, 8, 1828, 45_904] - wanted_aggregate_size = 74_922 - reqs = msg_sizes.map do |x| - req = Payload.new(body: nulls(x)) - StreamingInputCallRequest.new(payload: req) - end - resp = @stub.streaming_input_call(reqs) - assert_equal(wanted_aggregate_size, resp.aggregated_payload_size, - 'client_streaming: aggregate payload size is incorrect') - p 'OK: client_streaming' - end - - def server_streaming - msg_sizes = [31_415, 9, 2653, 58_979] - response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) } - req = StreamingOutputCallRequest.new(response_type: :COMPRESSABLE, - response_parameters: response_spec) - resps = @stub.streaming_output_call(req) - resps.each_with_index do |r, i| - assert i < msg_sizes.length, 'too many responses' - assert_equal(:COMPRESSABLE, r.payload.type, - 'payload type is wrong') - assert_equal(msg_sizes[i], r.payload.body.length, - 'payload body #{i} has the wrong length') - end - p 'OK: server_streaming' - end - - def ping_pong - msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] - ppp = PingPongPlayer.new(msg_sizes) - resps = @stub.full_duplex_call(ppp.each_item) - resps.each { |r| ppp.queue.push(r) } - p 'OK: ping_pong' - end - - def cancel_after_begin - msg_sizes = [27_182, 8, 1828, 45_904] - reqs = msg_sizes.map do |x| - req = Payload.new(body: nulls(x)) - StreamingInputCallRequest.new(payload: req) - end - op = @stub.streaming_input_call(reqs, return_op: true) - op.cancel - assert_raises(GRPC::Cancelled) { op.execute } - assert(op.cancelled, 'call operation should be CANCELLED') - p 'OK: cancel_after_begin' - end - - def cancel_after_first_response - msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] - ppp = PingPongPlayer.new(msg_sizes) - op = @stub.full_duplex_call(ppp.each_item, return_op: true) - ppp.canceller_op = op # causes ppp to cancel after the 1st message - op.execute.each { |r| ppp.queue.push(r) } - op.wait - assert(op.cancelled, 'call operation was not CANCELLED') - p 'OK: cancel_after_first_response' - end - - def all - all_methods = NamedTests.instance_methods(false).map(&:to_s) - all_methods.each do |m| - next if m == 'all' || m.start_with?('assert') - p "TESTCASE: #{m}" - method(m).call - end - end - - private - - def perform_large_unary(fill_username: false, fill_oauth_scope: false) - req_size, wanted_response_size = 271_828, 314_159 - payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size)) - req = SimpleRequest.new(response_type: :COMPRESSABLE, - response_size: wanted_response_size, - payload: payload) - req.fill_username = fill_username - req.fill_oauth_scope = fill_oauth_scope - resp = @stub.unary_call(req) - assert_equal(:COMPRESSABLE, resp.payload.type, - 'large_unary: payload had the wrong type') - assert_equal(wanted_response_size, resp.payload.body.length, - 'large_unary: payload had the wrong length') - assert_equal(nulls(wanted_response_size), resp.payload.body, - 'large_unary: payload content is invalid') - resp - end -end - -# Args is used to hold the command line info. -Args = Struct.new(:default_service_account, :host, :host_override, - :oauth_scope, :port, :secure, :test_case, - :use_test_ca) - -# validates the the command line options, returning them as a Hash. -def parse_args - args = Args.new - args.host_override = 'foo.test.google.fr' - OptionParser.new do |opts| - opts.on('--oauth_scope scope', - 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } - opts.on('--server_host SERVER_HOST', 'server hostname') do |v| - args['host'] = v - end - opts.on('--default_service_account email_address', - 'email address of the default service account') do |v| - args['default_service_account'] = v - end - opts.on('--server_host_override HOST_OVERRIDE', - 'override host via a HTTP header') do |v| - args['host_override'] = v - end - opts.on('--server_port SERVER_PORT', 'server port') { |v| args['port'] = v } - # instance_methods(false) gives only the methods defined in that class - test_cases = NamedTests.instance_methods(false).map(&:to_s) - test_case_list = test_cases.join(',') - opts.on('--test_case CODE', test_cases, {}, 'select a test_case', - " (#{test_case_list})") { |v| args['test_case'] = v } - opts.on('-s', '--use_tls', 'require a secure connection?') do |v| - args['secure'] = v - end - opts.on('-t', '--use_test_ca', - 'if secure, use the test certificate?') do |v| - args['use_test_ca'] = v - end - end.parse! - _check_args(args) -end - -def _check_args(args) - %w(host port test_case).each do |a| - if args[a].nil? - fail(OptionParser::MissingArgument, "please specify --#{arg}") - end - end - args -end - -def main - opts = parse_args - stub = create_stub(opts) - NamedTests.new(stub, opts).method(opts['test_case']).call -end +pb_dir = File.join(File.dirname(File.dirname(this_dir)), 'pb') +$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) -main +require 'test/client' diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 2ba8d2c19e..c6b0d00ec6 100755 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -29,6 +29,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# ####################################################################### +# DEPRECATED: The behaviour in this file has been moved to pb/test/server.rb +# +# This file remains to support existing tools and scripts that use it. +# ###################################################################### +# # interop_server is a Testing app that runs a gRPC interop testing server. # # It helps validate interoperation b/w gRPC in different environments @@ -38,157 +44,7 @@ # Usage: $ path/to/interop_server.rb --port this_dir = File.expand_path(File.dirname(__FILE__)) -lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') -$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) -$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) - -require 'forwardable' -require 'optparse' - -require 'grpc' - -require 'test/cpp/interop/test_services' -require 'test/cpp/interop/messages' -require 'test/cpp/interop/empty' - -# loads the certificates by the test server. -def load_test_certs - this_dir = File.expand_path(File.dirname(__FILE__)) - data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata') - files = ['ca.pem', 'server1.key', 'server1.pem'] - files.map { |f| File.open(File.join(data_dir, f)).read } -end - -# creates a ServerCredentials from the test certificates. -def test_server_creds - certs = load_test_certs - GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2]) -end - -# produces a string of null chars (\0) of length l. -def nulls(l) - fail 'requires #{l} to be +ve' if l < 0 - [].pack('x' * l).force_encoding('utf-8') -end - -# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - end - - def each_item - return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r - end - end -end - -# A runnable implementation of the schema-specified testing service, with each -# service method implemented as required by the interop testing spec. -class TestTarget < Grpc::Testing::TestService::Service - include Grpc::Testing - include Grpc::Testing::PayloadType - - def empty_call(_empty, _call) - Empty.new - end - - def unary_call(simple_req, _call) - req_size = simple_req.response_size - SimpleResponse.new(payload: Payload.new(type: :COMPRESSABLE, - body: nulls(req_size))) - end - - def streaming_input_call(call) - sizes = call.each_remote_read.map { |x| x.payload.body.length } - sum = sizes.inject { |s, x| s + x } - StreamingInputCallResponse.new(aggregated_payload_size: sum) - end - - def streaming_output_call(req, _call) - cls = StreamingOutputCallResponse - req.response_parameters.map do |p| - cls.new(payload: Payload.new(type: req.response_type, - body: nulls(p.size))) - end - end - - def full_duplex_call(reqs) - # reqs is a lazy Enumerator of the requests sent by the client. - q = EnumeratorQueue.new(self) - cls = StreamingOutputCallResponse - Thread.new do - begin - GRPC.logger.info('interop-server: started receiving') - reqs.each do |req| - resp_size = req.response_parameters[0].size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) - end - GRPC.logger.info('interop-server: finished receiving') - q.push(self) - rescue StandardError => e - GRPC.logger.info('interop-server: failed') - GRPC.logger.warn(e) - q.push(e) # share the exception with the enumerator - end - end - q.each_item - end - - def half_duplex_call(reqs) - # TODO: update with unique behaviour of the half_duplex_call if that's - # ever required by any of the tests. - full_duplex_call(reqs) - end -end - -# validates the the command line options, returning them as a Hash. -def parse_options - options = { - 'port' => nil, - 'secure' => false - } - OptionParser.new do |opts| - opts.banner = 'Usage: --port port' - opts.on('--port PORT', 'server port') do |v| - options['port'] = v - end - opts.on('-s', '--use_tls', 'require a secure connection?') do |v| - options['secure'] = v - end - end.parse! - - if options['port'].nil? - fail(OptionParser::MissingArgument, 'please specify --port') - end - options -end - -def main - opts = parse_options - host = "0.0.0.0:#{opts['port']}" - s = GRPC::RpcServer.new - if opts['secure'] - s.add_http2_port(host, test_server_creds) - GRPC.logger.info("... running securely on #{host}") - else - s.add_http2_port(host) - GRPC.logger.info("... running insecurely on #{host}") - end - s.handle(TestTarget) - s.run_till_terminated -end +pb_dir = File.join(File.dirname(File.dirname(this_dir)), 'pb') +$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) -main +require 'test/server' diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 88659da535..b09d4e2cd9 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -170,7 +170,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) { grpc_call *call = NULL; grpc_call_error err; TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); - err = grpc_call_cancel(call); + err = grpc_call_cancel(call, NULL); if (err != GRPC_CALL_OK) { rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)", grpc_call_error_detail_of(err), err); @@ -615,7 +615,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, /* call grpc_call_start_batch, then wait for it to complete using * pluck_event */ - err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag)); + err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag), NULL); if (err != GRPC_CALL_OK) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, @@ -629,13 +629,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); return Qnil; } - if (!ev.success) { - grpc_run_batch_stack_cleanup(&st); - rb_raise(grpc_rb_eCallError, "start_batch completion failed"); - return Qnil; - } - /* Build and return the BatchResult struct result */ + /* Build and return the BatchResult struct result, + if there is an error, it's reflected in the status */ result = grpc_run_batch_stack_build_result(&st); grpc_run_batch_stack_cleanup(&st); return result; diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 2129ba3485..6491aa4fb4 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -147,7 +147,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (credentials == Qnil) { - ch = grpc_insecure_channel_create(target_chars, &args); + ch = grpc_insecure_channel_create(target_chars, &args, NULL); } else { creds = grpc_rb_get_wrapped_credentials(credentials); ch = grpc_secure_channel_create(creds, target_chars, &args); @@ -288,7 +288,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars, host_chars, grpc_rb_time_timeval( deadline, - /* absolute time */ 0)); + /* absolute time */ 0), NULL); if (call == NULL) { rb_raise(rb_eRuntimeError, "cannot create call with method %s", method_chars); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index b6674d7682..0bc9eb2a97 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -56,7 +56,7 @@ typedef struct next_call_stack { static void *grpc_rb_completion_queue_next_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; next_call->event = - grpc_completion_queue_next(next_call->cq, next_call->timeout); + grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL); return NULL; } @@ -64,7 +64,7 @@ static void *grpc_rb_completion_queue_next_no_gil(void *param) { static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag, - next_call->timeout); + next_call->timeout, NULL); return NULL; } @@ -128,7 +128,7 @@ static rb_data_type_t grpc_rb_completion_queue_data_type = { /* Allocates a completion queue. */ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { - grpc_completion_queue *cq = grpc_completion_queue_create(); + grpc_completion_queue *cq = grpc_completion_queue_create(NULL); if (cq == NULL) { rb_raise(rb_eArgError, "could not create a completion queue: not sure why"); } diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 79a4ae8757..7e76349d2e 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -128,7 +128,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, wrapper); grpc_rb_hash_convert_to_channel_args(channel_args, &args); - srv = grpc_server_create(&args); + srv = grpc_server_create(&args, NULL); if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ @@ -136,7 +136,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { if (srv == NULL) { rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why"); } - grpc_server_register_completion_queue(srv, cq); + grpc_server_register_completion_queue(srv, cq, NULL); wrapper->wrapped = srv; /* Add the cq as the server's mark object. This ensures the ruby cq can't be diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index eb748458b9..20a6206e7e 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -24,16 +24,16 @@ Gem::Specification.new do |s| %w(math noproto).each do |b| s.executables += ["#{b}_client.rb", "#{b}_server.rb"] end - s.require_paths = %w( bin lib ) + s.executables += %w(grpc_ruby_interop_client grpc_ruby_interop_server) + s.require_paths = %w( bin lib pb ) s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' - s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests - s.add_dependency 'logging', '~> 2.0' - s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests + s.add_dependency 'googleauth', '~> 0.4' - s.add_development_dependency 'simplecov', '~> 0.9' s.add_development_dependency 'bundler', '~> 1.9' + s.add_development_dependency 'logging', '~> 2.0' + s.add_development_dependency 'simplecov', '~> 0.9' s.add_development_dependency 'rake', '~> 10.4' s.add_development_dependency 'rake-compiler', '~> 0.9' s.add_development_dependency 'rspec', '~> 3.2' diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 215c0069a3..17da401c6b 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -74,8 +74,7 @@ module GRPC # # @param call [Call] a call on which to start and invocation # @param q [CompletionQueue] the completion queue - # @param deadline [Fixnum,TimeSpec] the deadline - def self.client_invoke(call, q, _deadline, **kw) + def self.client_invoke(call, q, **kw) fail(TypeError, '!Core::Call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(TypeError, '!Core::CompletionQueue') @@ -418,7 +417,7 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) bd.run_on_client(requests, @op_notifier, &blk) end @@ -434,7 +433,7 @@ module GRPC # # @param gen_each_reply [Proc] generates the BiDi stream replies def run_server_bidi(gen_each_reply) - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) bd.run_on_server(gen_each_reply) end @@ -456,7 +455,7 @@ module GRPC # Starts the call if not already started def start_call(**kw) return if @started - @metadata_tag = ActiveCall.client_invoke(@call, @cq, @deadline, **kw) + @metadata_tag = ActiveCall.client_invoke(@call, @cq, **kw) @started = true end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 3b0c71395c..9dbbb74caf 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,15 +56,13 @@ module GRPC # the call # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param deadline [Fixnum] the deadline for the call to complete - def initialize(call, q, marshal, unmarshal, deadline) + def initialize(call, q, marshal, unmarshal) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') end @call = call @cq = q - @deadline = deadline @marshal = marshal @op_notifier = nil # signals completion on clients @readq = Queue.new @@ -99,7 +97,7 @@ module GRPC # @param gen_each_reply [Proc] generates the BiDi stream replies. def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) - @loop_th = start_read_loop + @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end @@ -127,7 +125,7 @@ module GRPC count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") - throw req if req.is_a? StandardError + fail req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end @@ -147,12 +145,9 @@ module GRPC GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, - SEND_CLOSE_FROM_CLIENT => nil, - RECV_STATUS_ON_CLIENT => nil) - @call.status = batch_result.status - batch_result.check_status - GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") + @call.run_batch(@cq, write_tag, INFINITE_FUTURE, + SEND_CLOSE_FROM_CLIENT => nil) + GRPC.logger.debug('bidi-write-loop: done') notify_done end GRPC.logger.debug('bidi-write-loop: finished') @@ -164,7 +159,7 @@ module GRPC end # starts the read loop - def start_read_loop + def start_read_loop(is_client: true) Thread.new do GRPC.logger.debug('bidi-read-loop: starting') begin @@ -177,9 +172,19 @@ module GRPC # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_MESSAGE => nil) + # handle the next message if batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") + + if is_client + batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, + RECV_STATUS_ON_CLIENT => nil) + @call.status = batch_result.status + batch_result.check_status + GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") + end + @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index cce718537c..24ec1793f6 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -161,15 +161,21 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param return_op [true|false] return an Operation if true # @return [Object] the response received from the server - def request_response(method, req, marshal, unmarshal, timeout = nil, + def request_response(method, req, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: parent, **kw) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.request_response(req, **md) unless return_op @@ -222,16 +228,22 @@ module GRPC # @param requests [Object] an Enumerable of requests to send # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param timeout [Numeric] the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param return_op [true|false] return an Operation if true # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @return [Object|Operation] the response received from the server - def client_streamer(method, requests, marshal, unmarshal, timeout = nil, + def client_streamer(method, requests, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.client_streamer(requests, **md) unless return_op @@ -292,18 +304,24 @@ module GRPC # @param req [Object] the request sent to the server # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - # @param timeout [Numeric] the max completion time in seconds + # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param return_op [true|false]return an Operation if true # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param blk [Block] when provided, is executed for each response # @return [Enumerator|Operation|nil] as discussed above - def server_streamer(method, req, marshal, unmarshal, timeout = nil, + def server_streamer(method, req, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.server_streamer(req, **md, &blk) unless return_op @@ -404,17 +422,23 @@ module GRPC # @param marshal [Function] f(obj)->string that marshals requests # @param unmarshal [Function] f(string)->obj that unmarshals responses # @param timeout [Numeric] (optional) the max completion time in seconds + # @param deadline [Time] (optional) the time the request should complete # @param parent [Core::Call] a prior call whose reserved metadata # will be propagated by this one. # @param return_op [true|false] return an Operation if true # @param blk [Block] when provided, is executed for each response # @return [Enumerator|nil|Operation] as discussed above - def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil, + def bidi_streamer(method, requests, marshal, unmarshal, + deadline: nil, + timeout: nil, return_op: false, parent: nil, **kw, &blk) - c = new_active_call(method, marshal, unmarshal, timeout, parent: parent) + c = new_active_call(method, marshal, unmarshal, + deadline: deadline, + timeout: timeout, + parent: parent) kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method) md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri) return c.bidi_streamer(requests, **md, &blk) unless return_op @@ -438,8 +462,13 @@ module GRPC # @param parent [Grpc::Call] a parent call, available when calls are # made from server # @param timeout [TimeConst] - def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil) - deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + def new_active_call(method, marshal, unmarshal, + deadline: nil, + timeout: nil, + parent: nil) + if deadline.nil? + deadline = from_relative_time(timeout.nil? ? @timeout : timeout) + end call = @ch.create_call(@queue, parent, # parent call @propagate_mask, # propagation options diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 3b9743ea66..80ff669cca 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -174,26 +174,24 @@ module GRPC unmarshal = desc.unmarshal_proc(:output) route = "/#{route_prefix}/#{name}" if desc.request_response? - define_method(mth_name) do |req, deadline = nil, **kw| + define_method(mth_name) do |req, **kw| GRPC.logger.debug("calling #{@host}:#{route}") - request_response(route, req, marshal, unmarshal, deadline, **kw) + request_response(route, req, marshal, unmarshal, **kw) end elsif desc.client_streamer? - define_method(mth_name) do |reqs, deadline = nil, **kw| + define_method(mth_name) do |reqs, **kw| GRPC.logger.debug("calling #{@host}:#{route}") - client_streamer(route, reqs, marshal, unmarshal, deadline, **kw) + client_streamer(route, reqs, marshal, unmarshal, **kw) end elsif desc.server_streamer? - define_method(mth_name) do |req, deadline = nil, **kw, &blk| + define_method(mth_name) do |req, **kw, &blk| GRPC.logger.debug("calling #{@host}:#{route}") - server_streamer(route, req, marshal, unmarshal, deadline, **kw, - &blk) + server_streamer(route, req, marshal, unmarshal, **kw, &blk) end else # is a bidi_stream - define_method(mth_name) do |reqs, deadline = nil, **kw, &blk| + define_method(mth_name) do |reqs, **kw, &blk| GRPC.logger.debug("calling #{@host}:#{route}") - bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw, - &blk) + bidi_streamer(route, reqs, marshal, unmarshal, **kw, &blk) end end end diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb index e9b4aa3c95..2bb7c86d5e 100644 --- a/src/ruby/lib/grpc/logconfig.rb +++ b/src/ruby/lib/grpc/logconfig.rb @@ -27,17 +27,32 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -require 'logging' - # GRPC contains the General RPC module. module GRPC - extend Logging.globally -end + # DefaultLogger is a module included in GRPC if no other logging is set up for + # it. See ../spec/spec_helpers an example of where other logging is added. + module DefaultLogger + def logger + LOGGER + end + + private + + # NoopLogger implements the methods of Ruby's conventional logging interface + # that are actually used internally within gRPC with a noop implementation. + class NoopLogger + def info(_ignored) + end -Logging.logger.root.appenders = Logging.appenders.stdout -Logging.logger.root.level = :info + def debug(_ignored) + end -# TODO: provide command-line configuration for logging -Logging.logger['GRPC'].level = :info -Logging.logger['GRPC::ActiveCall'].level = :info -Logging.logger['GRPC::BidiCall'].level = :info + def warn(_ignored) + end + end + + LOGGER = NoopLogger.new + end + + include DefaultLogger unless method_defined?(:logger) +end diff --git a/src/ruby/pb/README.md b/src/ruby/pb/README.md new file mode 100644 index 0000000000..84644e1098 --- /dev/null +++ b/src/ruby/pb/README.md @@ -0,0 +1,42 @@ +Protocol Buffers +================ + +This folder contains protocol buffers provided with gRPC ruby, and the generated +code to them. + +PREREQUISITES +------------- + +The code is is generated using the protoc (> 3.0.0.alpha.1) and the +grpc_ruby_plugin. These must be installed to regenerate the IDL defined +classes, but that's not necessary just to use them. + +health_check/v1alpha +-------------------- + +This package defines the surface of a simple health check service that gRPC +servers may choose to implement, and provides an implementation for it. To +re-generate the surface. + +```bash +$ # (from this directory) +$ protoc -I . grpc/health/v1alpha/health.proto \ + --grpc_out=. \ + --ruby_out=. \ + --plugin=protoc-gen-grpc=`which grpc_ruby_plugin` +``` + +test +---- + +This package defines the surface of the gRPC interop test service and client +To re-generate the surface, it's necessary to have checked-out versions of +the grpc interop test proto, e.g, by having the full gRPC repository. E.g, + +```bash +$ # (from this directory within the grpc repo) +$ protoc -I../../.. ../../../test/proto/{messages,test,empty}.proto \ + --grpc_out=. \ + --ruby_out=. \ + --plugin=protoc-gen-grpc=`which grpc_ruby_plugin` +``` diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb new file mode 100644 index 0000000000..8c692e74f9 --- /dev/null +++ b/src/ruby/pb/grpc/health/checker.rb @@ -0,0 +1,75 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' +require 'grpc/health/v1alpha/health_services' +require 'thread' + +module Grpc + # Health contains classes and modules that support providing a health check + # service. + module Health + # Checker is implementation of the schema-specified health checking service. + class Checker < V1alpha::Health::Service + StatusCodes = GRPC::Core::StatusCodes + HealthCheckResponse = V1alpha::HealthCheckResponse + + # Initializes the statuses of participating services + def initialize + @statuses = {} + @status_mutex = Mutex.new # guards access to @statuses + end + + # Implements the rpc IDL API method + def check(req, _call) + status = nil + @status_mutex.synchronize do + status = @statuses["#{req.host}/#{req.service}"] + end + fail GRPC::BadStatus, StatusCodes::NOT_FOUND if status.nil? + HealthCheckResponse.new(status: status) + end + + # Adds the health status for a given host and service. + def add_status(host, service, status) + @status_mutex.synchronize { @statuses["#{host}/#{service}"] = status } + end + + # Clears the status for the given host or service. + def clear_status(host, service) + @status_mutex.synchronize { @statuses.delete("#{host}/#{service}") } + end + + # Clears alls the statuses. + def clear_all + @status_mutex.synchronize { @statuses = {} } + end + end + end +end diff --git a/src/ruby/pb/grpc/health/v1alpha/health.proto b/src/ruby/pb/grpc/health/v1alpha/health.proto new file mode 100644 index 0000000000..d31df1e0a7 --- /dev/null +++ b/src/ruby/pb/grpc/health/v1alpha/health.proto @@ -0,0 +1,50 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package grpc.health.v1alpha; + +message HealthCheckRequest { + string host = 1; + string service = 2; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); +}
\ No newline at end of file diff --git a/src/ruby/pb/grpc/health/v1alpha/health.rb b/src/ruby/pb/grpc/health/v1alpha/health.rb new file mode 100644 index 0000000000..9c04298ea5 --- /dev/null +++ b/src/ruby/pb/grpc/health/v1alpha/health.rb @@ -0,0 +1,29 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: grpc/health/v1alpha/health.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "grpc.health.v1alpha.HealthCheckRequest" do + optional :host, :string, 1 + optional :service, :string, 2 + end + add_message "grpc.health.v1alpha.HealthCheckResponse" do + optional :status, :enum, 1, "grpc.health.v1alpha.HealthCheckResponse.ServingStatus" + end + add_enum "grpc.health.v1alpha.HealthCheckResponse.ServingStatus" do + value :UNKNOWN, 0 + value :SERVING, 1 + value :NOT_SERVING, 2 + end +end + +module Grpc + module Health + module V1alpha + HealthCheckRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.health.v1alpha.HealthCheckRequest").msgclass + HealthCheckResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.health.v1alpha.HealthCheckResponse").msgclass + HealthCheckResponse::ServingStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.health.v1alpha.HealthCheckResponse.ServingStatus").enummodule + end + end +end diff --git a/src/ruby/pb/grpc/health/v1alpha/health_services.rb b/src/ruby/pb/grpc/health/v1alpha/health_services.rb new file mode 100644 index 0000000000..d5cba2e9ec --- /dev/null +++ b/src/ruby/pb/grpc/health/v1alpha/health_services.rb @@ -0,0 +1,28 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: grpc/health/v1alpha/health.proto for package 'grpc.health.v1alpha' + +require 'grpc' +require 'grpc/health/v1alpha/health' + +module Grpc + module Health + module V1alpha + module Health + + # TODO: add proto service documentation here + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'grpc.health.v1alpha.Health' + + rpc :Check, HealthCheckRequest, HealthCheckResponse + end + + Stub = Service.rpc_stub_class + end + end + end +end diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb new file mode 100755 index 0000000000..164e304b4d --- /dev/null +++ b/src/ruby/pb/test/client.rb @@ -0,0 +1,453 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# client is a testing tool that accesses a gRPC interop testing server and runs +# a test on it. +# +# Helps validate interoperation b/w different gRPC implementations. +# +# Usage: $ path/to/client.rb --server_host=<hostname> \ +# --server_port=<port> \ +# --test_case=<testcase_name> + +this_dir = File.expand_path(File.dirname(__FILE__)) +lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') +pb_dir = File.dirname(File.dirname(this_dir)) +$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) +$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'optparse' + +require 'grpc' +require 'googleauth' +require 'google/protobuf' + +require 'test/proto/empty' +require 'test/proto/messages' +require 'test/proto/test_services' + +require 'signet/ssl_config' + +AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR + +# AssertionError is use to indicate interop test failures. +class AssertionError < RuntimeError; end + +# Fails with AssertionError if the block does evaluate to true +def assert(msg = 'unknown cause') + fail 'No assertion block provided' unless block_given? + fail AssertionError, msg unless yield +end + +# loads the certificates used to access the test server securely. +def load_test_certs + this_dir = File.expand_path(File.dirname(__FILE__)) + data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(data_dir, f)).read } +end + +# loads the certificates used to access the test server securely. +def load_prod_cert + fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil? + GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") + File.open(ENV['SSL_CERT_FILE']).read +end + +# creates SSL Credentials from the test certificates. +def test_creds + certs = load_test_certs + GRPC::Core::Credentials.new(certs[0]) +end + +# creates SSL Credentials from the production certificates. +def prod_creds + cert_text = load_prod_cert + GRPC::Core::Credentials.new(cert_text) +end + +# creates the SSL Credentials. +def ssl_creds(use_test_ca) + return test_creds if use_test_ca + prod_creds +end + +# creates a test stub that accesses host:port securely. +def create_stub(opts) + address = "#{opts.host}:#{opts.port}" + if opts.secure + stub_opts = { + :creds => ssl_creds(opts.use_test_ca), + GRPC::Core::Channel::SSL_TARGET => opts.host_override + } + + # Add service account creds if specified + wants_creds = %w(all compute_engine_creds service_account_creds) + if wants_creds.include?(opts.test_case) + unless opts.oauth_scope.nil? + auth_creds = Google::Auth.get_application_default(opts.oauth_scope) + stub_opts[:update_metadata] = auth_creds.updater_proc + end + end + + if opts.test_case == 'oauth2_auth_token' + auth_creds = Google::Auth.get_application_default(opts.oauth_scope) + kw = auth_creds.updater_proc.call({}) # gives as an auth token + + # use a metadata update proc that just adds the auth token. + stub_opts[:update_metadata] = proc { |md| md.merge(kw) } + end + + if opts.test_case == 'jwt_token_creds' # don't use a scope + auth_creds = Google::Auth.get_application_default + stub_opts[:update_metadata] = auth_creds.updater_proc + end + + GRPC.logger.info("... connecting securely to #{address}") + Grpc::Testing::TestService::Stub.new(address, **stub_opts) + else + GRPC.logger.info("... connecting insecurely to #{address}") + Grpc::Testing::TestService::Stub.new(address) + end +end + +# produces a string of null chars (\0) of length l. +def nulls(l) + fail 'requires #{l} to be +ve' if l < 0 + [].pack('x' * l).force_encoding('utf-8') +end + +# a PingPongPlayer implements the ping pong bidi test. +class PingPongPlayer + include Grpc::Testing + include Grpc::Testing::PayloadType + attr_accessor :queue + attr_accessor :canceller_op + + # reqs is the enumerator over the requests + def initialize(msg_sizes) + @queue = Queue.new + @msg_sizes = msg_sizes + @canceller_op = nil # used to cancel after the first response + end + + def each_item + return enum_for(:each_item) unless block_given? + req_cls, p_cls = StreamingOutputCallRequest, ResponseParameters # short + count = 0 + @msg_sizes.each do |m| + req_size, resp_size = m + req = req_cls.new(payload: Payload.new(body: nulls(req_size)), + response_type: :COMPRESSABLE, + response_parameters: [p_cls.new(size: resp_size)]) + yield req + resp = @queue.pop + assert('payload type is wrong') { :COMPRESSABLE == resp.payload.type } + assert("payload body #{count} has the wrong length") do + resp_size == resp.payload.body.length + end + p "OK: ping_pong #{count}" + count += 1 + unless @canceller_op.nil? + canceller_op.cancel + break + end + end + end +end + +# defines methods corresponding to each interop test case. +class NamedTests + include Grpc::Testing + include Grpc::Testing::PayloadType + + def initialize(stub, args) + @stub = stub + @args = args + end + + def empty_unary + resp = @stub.empty_call(Empty.new) + assert('empty_unary: invalid response') { resp.is_a?(Empty) } + p 'OK: empty_unary' + end + + def large_unary + perform_large_unary + p 'OK: large_unary' + end + + def service_account_creds + # ignore this test if the oauth options are not set + if @args.oauth_scope.nil? + p 'NOT RUN: service_account_creds; no service_account settings' + return + end + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + resp = perform_large_unary(fill_username: true, + fill_oauth_scope: true) + assert("#{__callee__}: bad username") { wanted_email == resp.username } + assert("#{__callee__}: bad oauth scope") do + @args.oauth_scope.include?(resp.oauth_scope) + end + p "OK: #{__callee__}" + end + + def jwt_token_creds + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + resp = perform_large_unary(fill_username: true) + assert("#{__callee__}: bad username") { wanted_email == resp.username } + p "OK: #{__callee__}" + end + + def compute_engine_creds + resp = perform_large_unary(fill_username: true, + fill_oauth_scope: true) + assert("#{__callee__}: bad username") do + @args.default_service_account == resp.username + end + p "OK: #{__callee__}" + end + + def oauth2_auth_token + resp = perform_large_unary(fill_username: true, + fill_oauth_scope: true) + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + assert("#{__callee__}: bad username") { wanted_email == resp.username } + assert("#{__callee__}: bad oauth scope") do + @args.oauth_scope.include?(resp.oauth_scope) + end + p "OK: #{__callee__}" + end + + def per_rpc_creds + auth_creds = Google::Auth.get_application_default(@args.oauth_scope) + kw = auth_creds.updater_proc.call({}) + resp = perform_large_unary(fill_username: true, + fill_oauth_scope: true, + **kw) + json_key = File.read(ENV[AUTH_ENV]) + wanted_email = MultiJson.load(json_key)['client_email'] + assert("#{__callee__}: bad username") { wanted_email == resp.username } + assert("#{__callee__}: bad oauth scope") do + @args.oauth_scope.include?(resp.oauth_scope) + end + p "OK: #{__callee__}" + end + + def client_streaming + msg_sizes = [27_182, 8, 1828, 45_904] + wanted_aggregate_size = 74_922 + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + resp = @stub.streaming_input_call(reqs) + assert("#{__callee__}: aggregate payload size is incorrect") do + wanted_aggregate_size == resp.aggregated_payload_size + end + p "OK: #{__callee__}" + end + + def server_streaming + msg_sizes = [31_415, 9, 2653, 58_979] + response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) } + req = StreamingOutputCallRequest.new(response_type: :COMPRESSABLE, + response_parameters: response_spec) + resps = @stub.streaming_output_call(req) + resps.each_with_index do |r, i| + assert("#{__callee__}: too many responses") { i < msg_sizes.length } + assert("#{__callee__}: payload body #{i} has the wrong length") do + msg_sizes[i] == r.payload.body.length + end + assert("#{__callee__}: payload type is wrong") do + :COMPRESSABLE == r.payload.type + end + end + p "OK: #{__callee__}" + end + + def ping_pong + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + resps = @stub.full_duplex_call(ppp.each_item) + resps.each { |r| ppp.queue.push(r) } + p "OK: #{__callee__}" + end + + def timeout_on_sleeping_server + msg_sizes = [[27_182, 31_415]] + ppp = PingPongPlayer.new(msg_sizes) + resps = @stub.full_duplex_call(ppp.each_item, timeout: 0.001) + resps.each { |r| ppp.queue.push(r) } + fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)' + rescue GRPC::BadStatus => e + assert("#{__callee__}: status was wrong") do + e.code == GRPC::Core::StatusCodes::DEADLINE_EXCEEDED + end + p "OK: #{__callee__}" + end + + def empty_stream + ppp = PingPongPlayer.new([]) + resps = @stub.full_duplex_call(ppp.each_item) + count = 0 + resps.each do |r| + ppp.queue.push(r) + count += 1 + end + assert("#{__callee__}: too many responses expected 0") do + count == 0 + end + p "OK: #{__callee__}" + end + + def cancel_after_begin + msg_sizes = [27_182, 8, 1828, 45_904] + reqs = msg_sizes.map do |x| + req = Payload.new(body: nulls(x)) + StreamingInputCallRequest.new(payload: req) + end + op = @stub.streaming_input_call(reqs, return_op: true) + op.cancel + op.execute + fail 'Should have raised GRPC:Cancelled' + rescue GRPC::Cancelled + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + p "OK: #{__callee__}" + end + + def cancel_after_first_response + msg_sizes = [[27_182, 31_415], [8, 9], [1828, 2653], [45_904, 58_979]] + ppp = PingPongPlayer.new(msg_sizes) + op = @stub.full_duplex_call(ppp.each_item, return_op: true) + ppp.canceller_op = op # causes ppp to cancel after the 1st message + op.execute.each { |r| ppp.queue.push(r) } + fail 'Should have raised GRPC:Cancelled' + rescue GRPC::Cancelled + assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled } + op.wait + p "OK: #{__callee__}" + end + + def all + all_methods = NamedTests.instance_methods(false).map(&:to_s) + all_methods.each do |m| + next if m == 'all' || m.start_with?('assert') + p "TESTCASE: #{m}" + method(m).call + end + end + + private + + def perform_large_unary(fill_username: false, fill_oauth_scope: false, **kw) + req_size, wanted_response_size = 271_828, 314_159 + payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size)) + req = SimpleRequest.new(response_type: :COMPRESSABLE, + response_size: wanted_response_size, + payload: payload) + req.fill_username = fill_username + req.fill_oauth_scope = fill_oauth_scope + resp = @stub.unary_call(req, **kw) + assert('payload type is wrong') do + :COMPRESSABLE == resp.payload.type + end + assert('payload body has the wrong length') do + wanted_response_size == resp.payload.body.length + end + assert('payload body is invalid') do + nulls(wanted_response_size) == resp.payload.body + end + resp + end +end + +# Args is used to hold the command line info. +Args = Struct.new(:default_service_account, :host, :host_override, + :oauth_scope, :port, :secure, :test_case, + :use_test_ca) + +# validates the the command line options, returning them as a Hash. +def parse_args + args = Args.new + args.host_override = 'foo.test.google.fr' + OptionParser.new do |opts| + opts.on('--oauth_scope scope', + 'Scope for OAuth tokens') { |v| args['oauth_scope'] = v } + opts.on('--server_host SERVER_HOST', 'server hostname') do |v| + args['host'] = v + end + opts.on('--default_service_account email_address', + 'email address of the default service account') do |v| + args['default_service_account'] = v + end + opts.on('--server_host_override HOST_OVERRIDE', + 'override host via a HTTP header') do |v| + args['host_override'] = v + end + opts.on('--server_port SERVER_PORT', 'server port') { |v| args['port'] = v } + # instance_methods(false) gives only the methods defined in that class + test_cases = NamedTests.instance_methods(false).map(&:to_s) + test_case_list = test_cases.join(',') + opts.on('--test_case CODE', test_cases, {}, 'select a test_case', + " (#{test_case_list})") { |v| args['test_case'] = v } + opts.on('-s', '--use_tls', 'require a secure connection?') do |v| + args['secure'] = v + end + opts.on('-t', '--use_test_ca', + 'if secure, use the test certificate?') do |v| + args['use_test_ca'] = v + end + end.parse! + _check_args(args) +end + +def _check_args(args) + %w(host port test_case).each do |a| + if args[a].nil? + fail(OptionParser::MissingArgument, "please specify --#{a}") + end + end + args +end + +def main + opts = parse_args + stub = create_stub(opts) + NamedTests.new(stub, opts).method(opts['test_case']).call +end + +main diff --git a/src/ruby/pb/test/proto/empty.rb b/src/ruby/pb/test/proto/empty.rb new file mode 100644 index 0000000000..559adcc85e --- /dev/null +++ b/src/ruby/pb/test/proto/empty.rb @@ -0,0 +1,15 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/proto/empty.proto + +require 'google/protobuf' + +Google::Protobuf::DescriptorPool.generated_pool.build do + add_message "grpc.testing.Empty" do + end +end + +module Grpc + module Testing + Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass + end +end diff --git a/src/ruby/bin/interop/test/cpp/interop/messages.rb b/src/ruby/pb/test/proto/messages.rb index 89c349b406..9b7f977285 100644 --- a/src/ruby/bin/interop/test/cpp/interop/messages.rb +++ b/src/ruby/pb/test/proto/messages.rb @@ -1,34 +1,5 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: test/cpp/interop/messages.proto +# source: test/proto/messages.proto require 'google/protobuf' @@ -37,12 +8,18 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :type, :enum, 1, "grpc.testing.PayloadType" optional :body, :string, 2 end + add_message "grpc.testing.EchoStatus" do + optional :code, :int32, 1 + optional :message, :string, 2 + end add_message "grpc.testing.SimpleRequest" do optional :response_type, :enum, 1, "grpc.testing.PayloadType" optional :response_size, :int32, 2 optional :payload, :message, 3, "grpc.testing.Payload" optional :fill_username, :bool, 4 optional :fill_oauth_scope, :bool, 5 + optional :response_compression, :enum, 6, "grpc.testing.CompressionType" + optional :response_status, :message, 7, "grpc.testing.EchoStatus" end add_message "grpc.testing.SimpleResponse" do optional :payload, :message, 1, "grpc.testing.Payload" @@ -63,20 +40,32 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :response_type, :enum, 1, "grpc.testing.PayloadType" repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters" optional :payload, :message, 3, "grpc.testing.Payload" + optional :response_compression, :enum, 6, "grpc.testing.CompressionType" + optional :response_status, :message, 7, "grpc.testing.EchoStatus" end add_message "grpc.testing.StreamingOutputCallResponse" do optional :payload, :message, 1, "grpc.testing.Payload" end + add_message "grpc.testing.ReconnectInfo" do + optional :passed, :bool, 1 + repeated :backoff_ms, :int32, 2 + end add_enum "grpc.testing.PayloadType" do value :COMPRESSABLE, 0 value :UNCOMPRESSABLE, 1 value :RANDOM, 2 end + add_enum "grpc.testing.CompressionType" do + value :NONE, 0 + value :GZIP, 1 + value :DEFLATE, 2 + end end module Grpc module Testing Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass + EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass @@ -84,6 +73,8 @@ module Grpc ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass + ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule + CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule end end diff --git a/src/ruby/pb/test/proto/test.rb b/src/ruby/pb/test/proto/test.rb new file mode 100644 index 0000000000..100eb6505c --- /dev/null +++ b/src/ruby/pb/test/proto/test.rb @@ -0,0 +1,14 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/proto/test.proto + +require 'google/protobuf' + +require 'test/proto/empty' +require 'test/proto/messages' +Google::Protobuf::DescriptorPool.generated_pool.build do +end + +module Grpc + module Testing + end +end diff --git a/src/ruby/pb/test/proto/test_services.rb b/src/ruby/pb/test/proto/test_services.rb new file mode 100644 index 0000000000..9df9cc5860 --- /dev/null +++ b/src/ruby/pb/test/proto/test_services.rb @@ -0,0 +1,64 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# Source: test/proto/test.proto for package 'grpc.testing' + +require 'grpc' +require 'test/proto/test' + +module Grpc + module Testing + module TestService + + # TODO: add proto service documentation here + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'grpc.testing.TestService' + + rpc :EmptyCall, Empty, Empty + rpc :UnaryCall, SimpleRequest, SimpleResponse + rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse) + rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse + rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) + rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse) + end + + Stub = Service.rpc_stub_class + end + module UnimplementedService + + # TODO: add proto service documentation here + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'grpc.testing.UnimplementedService' + + rpc :UnimplementedCall, Empty, Empty + end + + Stub = Service.rpc_stub_class + end + module ReconnectService + + # TODO: add proto service documentation here + class Service + + include GRPC::GenericService + + self.marshal_class_method = :encode + self.unmarshal_class_method = :decode + self.service_name = 'grpc.testing.ReconnectService' + + rpc :Start, Empty, Empty + rpc :Stop, Empty, ReconnectInfo + end + + Stub = Service.rpc_stub_class + end + end +end diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb new file mode 100755 index 0000000000..e2e1ecbd62 --- /dev/null +++ b/src/ruby/pb/test/server.rb @@ -0,0 +1,196 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# interop_server is a Testing app that runs a gRPC interop testing server. +# +# It helps validate interoperation b/w gRPC in different environments +# +# Helps validate interoperation b/w different gRPC implementations. +# +# Usage: $ path/to/interop_server.rb --port + +this_dir = File.expand_path(File.dirname(__FILE__)) +lib_dir = File.join(File.dirname(File.dirname(this_dir)), 'lib') +pb_dir = File.dirname(File.dirname(this_dir)) +$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) +$LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) +$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) + +require 'forwardable' +require 'optparse' + +require 'grpc' + +require 'test/proto/empty' +require 'test/proto/messages' +require 'test/proto/test_services' + +# loads the certificates by the test server. +def load_test_certs + this_dir = File.expand_path(File.dirname(__FILE__)) + data_dir = File.join(File.dirname(File.dirname(this_dir)), 'spec/testdata') + files = ['ca.pem', 'server1.key', 'server1.pem'] + files.map { |f| File.open(File.join(data_dir, f)).read } +end + +# creates a ServerCredentials from the test certificates. +def test_server_creds + certs = load_test_certs + GRPC::Core::ServerCredentials.new(nil, certs[1], certs[2]) +end + +# produces a string of null chars (\0) of length l. +def nulls(l) + fail 'requires #{l} to be +ve' if l < 0 + [].pack('x' * l).force_encoding('utf-8') +end + +# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. +class EnumeratorQueue + extend Forwardable + def_delegators :@q, :push + + def initialize(sentinel) + @q = Queue.new + @sentinel = sentinel + end + + def each_item + return enum_for(:each_item) unless block_given? + loop do + r = @q.pop + break if r.equal?(@sentinel) + fail r if r.is_a? Exception + yield r + end + end +end + +# A runnable implementation of the schema-specified testing service, with each +# service method implemented as required by the interop testing spec. +class TestTarget < Grpc::Testing::TestService::Service + include Grpc::Testing + include Grpc::Testing::PayloadType + + def empty_call(_empty, _call) + Empty.new + end + + def unary_call(simple_req, _call) + req_size = simple_req.response_size + SimpleResponse.new(payload: Payload.new(type: :COMPRESSABLE, + body: nulls(req_size))) + end + + def streaming_input_call(call) + sizes = call.each_remote_read.map { |x| x.payload.body.length } + sum = sizes.inject { |s, x| s + x } + StreamingInputCallResponse.new(aggregated_payload_size: sum) + end + + def streaming_output_call(req, _call) + cls = StreamingOutputCallResponse + req.response_parameters.map do |p| + cls.new(payload: Payload.new(type: req.response_type, + body: nulls(p.size))) + end + end + + def full_duplex_call(reqs) + # reqs is a lazy Enumerator of the requests sent by the client. + q = EnumeratorQueue.new(self) + cls = StreamingOutputCallResponse + Thread.new do + begin + GRPC.logger.info('interop-server: started receiving') + reqs.each do |req| + resp_size = req.response_parameters[0].size + GRPC.logger.info("read a req, response size is #{resp_size}") + resp = cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + q.push(resp) + end + GRPC.logger.info('interop-server: finished receiving') + q.push(self) + rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) + q.push(e) # share the exception with the enumerator + end + end + q.each_item + end + + def half_duplex_call(reqs) + # TODO: update with unique behaviour of the half_duplex_call if that's + # ever required by any of the tests. + full_duplex_call(reqs) + end +end + +# validates the the command line options, returning them as a Hash. +def parse_options + options = { + 'port' => nil, + 'secure' => false + } + OptionParser.new do |opts| + opts.banner = 'Usage: --port port' + opts.on('--port PORT', 'server port') do |v| + options['port'] = v + end + opts.on('-s', '--use_tls', 'require a secure connection?') do |v| + options['secure'] = v + end + end.parse! + + if options['port'].nil? + fail(OptionParser::MissingArgument, 'please specify --port') + end + options +end + +def main + opts = parse_options + host = "0.0.0.0:#{opts['port']}" + s = GRPC::RpcServer.new + if opts['secure'] + s.add_http2_port(host, test_server_creds) + GRPC.logger.info("... running securely on #{host}") + else + s.add_http2_port(host) + GRPC.logger.info("... running insecurely on #{host}") + end + s.handle(TestTarget) + s.run_till_terminated +end + +main diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 0bf65ba2e9..26208b714a 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -57,7 +57,7 @@ describe GRPC::ActiveCall do describe 'restricted view methods' do before(:each) do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -87,7 +87,7 @@ describe GRPC::ActiveCall do describe '#remote_send' do it 'allows a client to send a payload to the server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) @client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -111,7 +111,7 @@ describe GRPC::ActiveCall do it 'marshals the payload using the marshal func' do call = make_test_call - ActiveCall.client_invoke(call, @client_queue, deadline) + ActiveCall.client_invoke(call, @client_queue) marshal = proc { |x| 'marshalled:' + x } client_call = ActiveCall.new(call, @client_queue, marshal, @pass_through, deadline) @@ -134,8 +134,7 @@ describe GRPC::ActiveCall do describe '#client_invoke' do it 'sends keywords as metadata to the server when the are present' do call = make_test_call - ActiveCall.client_invoke(call, @client_queue, deadline, - k1: 'v1', k2: 'v2') + ActiveCall.client_invoke(call, @client_queue, k1: 'v1', k2: 'v2') recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) recvd_call = recvd_rpc.call expect(recvd_call).to_not be_nil @@ -148,7 +147,7 @@ describe GRPC::ActiveCall do describe '#remote_read' do it 'reads the response sent by a server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -161,7 +160,7 @@ describe GRPC::ActiveCall do it 'saves no metadata when the server adds no metadata' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -176,7 +175,7 @@ describe GRPC::ActiveCall do it 'saves metadata add by the server' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -192,7 +191,7 @@ describe GRPC::ActiveCall do it 'get a nil msg before a status when an OK status is sent' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -209,7 +208,7 @@ describe GRPC::ActiveCall do it 'unmarshals the response using the unmarshal func' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) unmarshal = proc { |x| 'unmarshalled:' + x } client_call = ActiveCall.new(call, @client_queue, @pass_through, unmarshal, deadline, @@ -234,7 +233,7 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that can read n responses' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -252,7 +251,7 @@ describe GRPC::ActiveCall do it 'the returns an enumerator that stops after an OK Status' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -275,7 +274,7 @@ describe GRPC::ActiveCall do describe '#writes_done' do it 'finishes ok if the server sends a status response' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -291,7 +290,7 @@ describe GRPC::ActiveCall do it 'finishes ok if the server sends an early status response' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) @@ -307,7 +306,7 @@ describe GRPC::ActiveCall do it 'finishes ok if writes_done is true' do call = make_test_call - md_tag = ActiveCall.client_invoke(call, @client_queue, deadline) + md_tag = ActiveCall.client_invoke(call, @client_queue) client_call = ActiveCall.new(call, @client_queue, @pass_through, @pass_through, deadline, metadata_tag: md_tag) diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 68d4b11790..edcc962a7d 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -408,6 +408,26 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' end + + describe 'without enough time to run' do + before(:each) do + @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } + @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } + server_port = create_test_server + @host = "localhost:#{server_port}" + end + + it 'should fail with DeadlineExceeded', bidi: true do + @server.start + stub = GRPC::ClientStub.new(@host, @cq) + blk = proc do + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + timeout: 0.001) + e.collect { |r| r } + end + expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/ + end + end end def run_server_streamer(expected_input, replys, status, **kw) diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 0326f6e894..1295fd7fdd 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -396,8 +396,9 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new stub = SlowStub.new(@host, **client_opts) - deadline = service.delay + 1.0 # wait for long enough - expect(stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2')).to be_a(EchoMsg) + timeout = service.delay + 1.0 # wait for long enough + resp = stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') + expect(resp).to be_a(EchoMsg) wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] check_md(wanted_md, service.received_md) @srv.stop @@ -411,8 +412,8 @@ describe GRPC::RpcServer do @srv.wait_till_running req = EchoMsg.new stub = SlowStub.new(@host, **client_opts) - deadline = 0.1 # too short for SlowService to respond - blk = proc { stub.an_rpc(req, deadline, k1: 'v1', k2: 'v2') } + timeout = 0.1 # too short for SlowService to respond + blk = proc { stub.an_rpc(req, timeout: timeout, k1: 'v1', k2: 'v2') } expect(&blk).to raise_error GRPC::BadStatus wanted_md = [] expect(service.received_md).to eq(wanted_md) diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb new file mode 100644 index 0000000000..6999a69105 --- /dev/null +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -0,0 +1,233 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'grpc' +require 'grpc/health/v1alpha/health' +require 'grpc/health/checker' +require 'open3' + +def can_run_codegen_check + system('which grpc_ruby_plugin') && system('which protoc') +end + +describe 'Health protobuf code generation' do + context 'the health service file used by grpc/health/checker' do + if !can_run_codegen_check + skip 'protoc || grpc_ruby_plugin missing, cannot verify health code-gen' + else + it 'should already be loaded indirectly i.e, used by the other specs' do + expect(require('grpc/health/v1alpha/health_services')).to be(false) + end + + it 'should have the same content as created by code generation' do + root_dir = File.dirname( + File.dirname(File.dirname(File.dirname(__FILE__)))) + pb_dir = File.join(root_dir, 'pb') + + # Get the current content + service_path = File.join(pb_dir, 'grpc', 'health', 'v1alpha', + 'health_services.rb') + want = nil + File.open(service_path) { |f| want = f.read } + + # Regenerate it + plugin, = Open3.capture2('which', 'grpc_ruby_plugin') + plugin = plugin.strip + got = nil + Dir.mktmpdir do |tmp_dir| + gen_out = File.join(tmp_dir, 'grpc', 'health', 'v1alpha', + 'health_services.rb') + pid = spawn( + 'protoc', + '-I.', + 'grpc/health/v1alpha/health.proto', + "--grpc_out=#{tmp_dir}", + "--plugin=protoc-gen-grpc=#{plugin}", + chdir: pb_dir) + Process.wait(pid) + File.open(gen_out) { |f| got = f.read } + end + expect(got).to eq(want) + end + end + end +end + +describe Grpc::Health::Checker do + StatusCodes = GRPC::Core::StatusCodes + ServingStatus = Grpc::Health::V1alpha::HealthCheckResponse::ServingStatus + HCResp = Grpc::Health::V1alpha::HealthCheckResponse + HCReq = Grpc::Health::V1alpha::HealthCheckRequest + success_tests = + [ + { + desc: 'neither host or service are specified', + host: '', + service: '' + }, { + desc: 'only the host is specified', + host: 'test-fake-host', + service: '' + }, { + desc: 'the host and service are specified', + host: 'test-fake-host', + service: 'fake-service-1' + }, { + desc: 'only the service is specified', + host: '', + service: 'fake-service-2' + } + ] + + context 'initialization' do + it 'can be constructed with no args' do + expect(subject).to_not be(nil) + end + end + + context 'method `add_status` and `check`' do + success_tests.each do |t| + it "should succeed when #{t[:desc]}" do + subject.add_status(t[:host], t[:service], ServingStatus::NOT_SERVING) + got = subject.check(HCReq.new(host: t[:host], service: t[:service]), + nil) + want = HCResp.new(status: ServingStatus::NOT_SERVING) + expect(got).to eq(want) + end + end + end + + context 'method `check`' do + success_tests.each do |t| + it "should fail with NOT_FOUND when #{t[:desc]}" do + blk = proc do + subject.check(HCReq.new(host: t[:host], service: t[:service]), nil) + end + expected_msg = /#{StatusCodes::NOT_FOUND}/ + expect(&blk).to raise_error GRPC::BadStatus, expected_msg + end + end + end + + context 'method `clear_status`' do + success_tests.each do |t| + it "should fail after clearing status when #{t[:desc]}" do + subject.add_status(t[:host], t[:service], ServingStatus::NOT_SERVING) + got = subject.check(HCReq.new(host: t[:host], service: t[:service]), + nil) + want = HCResp.new(status: ServingStatus::NOT_SERVING) + expect(got).to eq(want) + + subject.clear_status(t[:host], t[:service]) + blk = proc do + subject.check(HCReq.new(host: t[:host], service: t[:service]), + nil) + end + expected_msg = /#{StatusCodes::NOT_FOUND}/ + expect(&blk).to raise_error GRPC::BadStatus, expected_msg + end + end + end + + context 'method `clear_all`' do + it 'should return NOT_FOUND after being invoked' do + success_tests.each do |t| + subject.add_status(t[:host], t[:service], ServingStatus::NOT_SERVING) + got = subject.check(HCReq.new(host: t[:host], service: t[:service]), + nil) + want = HCResp.new(status: ServingStatus::NOT_SERVING) + expect(got).to eq(want) + end + + subject.clear_all + + success_tests.each do |t| + blk = proc do + subject.check(HCReq.new(host: t[:host], service: t[:service]), nil) + end + expected_msg = /#{StatusCodes::NOT_FOUND}/ + expect(&blk).to raise_error GRPC::BadStatus, expected_msg + end + end + end + + describe 'running on RpcServer' do + RpcServer = GRPC::RpcServer + StatusCodes = GRPC::Core::StatusCodes + CheckerStub = Grpc::Health::Checker.rpc_stub_class + + before(:each) do + @server_queue = GRPC::Core::CompletionQueue.new + server_host = '0.0.0.0:0' + @server = GRPC::Core::Server.new(@server_queue, nil) + server_port = @server.add_http2_port(server_host) + @host = "localhost:#{server_port}" + @ch = GRPC::Core::Channel.new(@host, nil) + @client_opts = { channel_override: @ch } + server_opts = { + server_override: @server, + completion_queue_override: @server_queue, + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + end + + after(:each) do + @srv.stop + end + + it 'should receive the correct status', server: true do + @srv.handle(subject) + subject.add_status('', '', ServingStatus::NOT_SERVING) + t = Thread.new { @srv.run } + @srv.wait_till_running + + stub = CheckerStub.new(@host, **@client_opts) + got = stub.check(HCReq.new) + want = HCResp.new(status: ServingStatus::NOT_SERVING) + expect(got).to eq(want) + @srv.stop + t.join + end + + it 'should fail on unknown services', server: true do + @srv.handle(subject) + t = Thread.new { @srv.run } + @srv.wait_till_running + blk = proc do + stub = CheckerStub.new(@host, **@client_opts) + stub.check(HCReq.new(host: 'unknown', service: 'unknown')) + end + expected_msg = /#{StatusCodes::NOT_FOUND}/ + expect(&blk).to raise_error GRPC::BadStatus, expected_msg + @srv.stop + t.join + end + end +end diff --git a/src/ruby/spec/spec_helper.rb b/src/ruby/spec/spec_helper.rb index 270d2e97d3..c891c1bf5e 100644 --- a/src/ruby/spec/spec_helper.rb +++ b/src/ruby/spec/spec_helper.rb @@ -47,11 +47,23 @@ require 'rspec' require 'logging' require 'rspec/logging_helper' +# GRPC is the general RPC module +# +# Configure its logging for fine-grained log control during test runs +module GRPC + extend Logging.globally +end +Logging.logger.root.appenders = Logging.appenders.stdout +Logging.logger.root.level = :info +Logging.logger['GRPC'].level = :info +Logging.logger['GRPC::ActiveCall'].level = :info +Logging.logger['GRPC::BidiCall'].level = :info + # Configure RSpec to capture log messages for each test. The output from the # logs will be stored in the @log_output variable. It is a StringIO instance. RSpec.configure do |config| include RSpec::LoggingHelper - config.capture_log_messages + config.capture_log_messages # comment this out to see logs during test runs end RSpec::Expectations.configuration.warn_about_potential_false_positives = false |