diff options
author | 2015-05-24 15:58:14 -0700 | |
---|---|---|
committer | 2015-05-24 15:58:14 -0700 | |
commit | 8b5276c02ec7aebdf749bfd95d140406e6ea2226 (patch) | |
tree | 8389493193029a3cb5c80b11f595bc9d4d5035d7 /src | |
parent | 24d115654056aa2be3d548d80ea97937eb15b2a1 (diff) | |
parent | 031dea1df4b6213b9f9779a824fccc6d348b8648 (diff) |
Merge github.com:grpc/grpc into we-dont-need-no-backup
Conflicts:
src/core/surface/call.c
test/core/end2end/dualstack_socket_test.c
test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c
test/core/end2end/tests/early_server_shutdown_finishes_tags.c
test/core/end2end/tests/graceful_server_shutdown.c
test/core/end2end/tests/invoke_large_request.c
test/core/end2end/tests/max_concurrent_streams.c
test/core/end2end/tests/max_message_length.c
test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
test/core/end2end/tests/request_response_with_metadata_and_payload.c
test/core/end2end/tests/request_response_with_payload.c
test/core/end2end/tests/request_response_with_payload_and_call_creds.c
test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
test/core/end2end/tests/request_with_large_metadata.c
test/core/end2end/tests/request_with_payload.c
test/core/httpcli/httpcli_test.c
tools/run_tests/run_tests.py
Diffstat (limited to 'src')
106 files changed, 1329 insertions, 798 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 82dd06bcec..5dd078b303 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -51,20 +51,49 @@ using grpc_generator::METHODTYPE_NO_STREAMING; using grpc_generator::METHODTYPE_CLIENT_STREAMING; using grpc_generator::METHODTYPE_SERVER_STREAMING; using grpc_generator::METHODTYPE_BIDI_STREAMING; +using grpc_generator::StringReplace; using std::map; using std::vector; namespace grpc_csharp_generator { namespace { -std::string GetCSharpNamespace(const FileDescriptor* file) { - // TODO(jtattermusch): this should be based on csharp_namespace option +// TODO(jtattermusch): make GetFileNamespace part of libprotoc public API. +// NOTE: Implementation needs to match exactly to GetFileNamespace +// defined in csharp_helpers.h in protoc csharp plugin. +// We cannot reference it directly because google3 protobufs +// don't have a csharp protoc plugin. +std::string GetFileNamespace(const FileDescriptor* file) { + if (file->options().has_csharp_namespace()) { + return file->options().csharp_namespace(); + } return file->package(); } -std::string GetMessageType(const Descriptor* message) { - // TODO(jtattermusch): this has to match with C# protobuf generator - return message->name(); +std::string ToCSharpName(const std::string& name, const FileDescriptor* file) { + std::string result = GetFileNamespace(file); + if (result != "") { + result += '.'; + } + std::string classname; + if (file->package().empty()) { + classname = name; + } else { + // Strip the proto package from full_name since we've replaced it with + // the C# namespace. + classname = name.substr(file->package().size() + 1); + } + result += StringReplace(classname, ".", ".Types.", false); + return "global::" + result; +} + +// TODO(jtattermusch): make GetClassName part of libprotoc public API. +// NOTE: Implementation needs to match exactly to GetClassName +// defined in csharp_helpers.h in protoc csharp plugin. +// We cannot reference it directly because google3 protobufs +// don't have a csharp protoc plugin. +std::string GetClassName(const Descriptor* message) { + return ToCSharpName(message->full_name(), message->file()); } std::string GetServiceClassName(const ServiceDescriptor* service) { @@ -114,22 +143,22 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) { if (method->client_streaming()) { return ""; } - return GetMessageType(method->input_type()) + " request, "; + return GetClassName(method->input_type()) + " request, "; } std::string GetMethodReturnTypeClient(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: - return "Task<" + GetMessageType(method->output_type()) + ">"; + return "Task<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_CLIENT_STREAMING: - return "AsyncClientStreamingCall<" + GetMessageType(method->input_type()) - + ", " + GetMessageType(method->output_type()) + ">"; + return "AsyncClientStreamingCall<" + GetClassName(method->input_type()) + + ", " + GetClassName(method->output_type()) + ">"; case METHODTYPE_SERVER_STREAMING: - return "AsyncServerStreamingCall<" + GetMessageType(method->output_type()) + return "AsyncServerStreamingCall<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_BIDI_STREAMING: - return "AsyncDuplexStreamingCall<" + GetMessageType(method->input_type()) - + ", " + GetMessageType(method->output_type()) + ">"; + return "AsyncDuplexStreamingCall<" + GetClassName(method->input_type()) + + ", " + GetClassName(method->output_type()) + ">"; } GOOGLE_LOG(FATAL)<< "Can't get here."; return ""; @@ -139,10 +168,10 @@ std::string GetMethodRequestParamServer(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: case METHODTYPE_SERVER_STREAMING: - return GetMessageType(method->input_type()) + " request"; + return GetClassName(method->input_type()) + " request"; case METHODTYPE_CLIENT_STREAMING: case METHODTYPE_BIDI_STREAMING: - return "IAsyncStreamReader<" + GetMessageType(method->input_type()) + return "IAsyncStreamReader<" + GetClassName(method->input_type()) + "> requestStream"; } GOOGLE_LOG(FATAL)<< "Can't get here."; @@ -153,7 +182,7 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: case METHODTYPE_CLIENT_STREAMING: - return "Task<" + GetMessageType(method->output_type()) + ">"; + return "Task<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_SERVER_STREAMING: case METHODTYPE_BIDI_STREAMING: return "Task"; @@ -169,7 +198,7 @@ std::string GetMethodResponseStreamMaybe(const MethodDescriptor *method) { return ""; case METHODTYPE_SERVER_STREAMING: case METHODTYPE_BIDI_STREAMING: - return ", IServerStreamWriter<" + GetMessageType(method->output_type()) + return ", IServerStreamWriter<" + GetClassName(method->output_type()) + "> responseStream"; } GOOGLE_LOG(FATAL)<< "Can't get here."; @@ -202,7 +231,7 @@ void GenerateMarshallerFields(Printer* out, const ServiceDescriptor *service) { out->Print( "static readonly Marshaller<$type$> $fieldname$ = Marshallers.Create((arg) => arg.ToByteArray(), $type$.ParseFrom);\n", "fieldname", GetMarshallerFieldName(message), "type", - GetMessageType(message)); + GetClassName(message)); } out->Print("\n"); } @@ -211,8 +240,8 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) { out->Print( "static readonly Method<$request$, $response$> $fieldname$ = new Method<$request$, $response$>(\n", "fieldname", GetMethodFieldName(method), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); out->Indent(); out->Indent(); out->Print("$methodtype$,\n", "methodtype", @@ -242,8 +271,8 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) { out->Print( "$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n", "methodname", method->name(), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); } std::string method_name = method->name(); @@ -310,8 +339,8 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) { out->Print( "public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n", "methodname", method->name(), "request", - GetMessageType(method->input_type()), "response", - GetMessageType(method->output_type())); + GetClassName(method->input_type()), "response", + GetClassName(method->output_type())); out->Print("{\n"); out->Indent(); out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n", @@ -466,7 +495,7 @@ grpc::string GetServices(const FileDescriptor *file) { // TODO(jtattermusch): add using for protobuf message classes out.Print("\n"); - out.Print("namespace $namespace$ {\n", "namespace", GetCSharpNamespace(file)); + out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file)); out.Indent(); for (int i = 0; i < file->service_count(); i++) { GenerateService(&out, file->service(i)); diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h index 7ce4ec526c..7bdaff1c9b 100644 --- a/src/compiler/generator_helpers.h +++ b/src/compiler/generator_helpers.h @@ -60,21 +60,26 @@ inline grpc::string StripProto(grpc::string filename) { } inline grpc::string StringReplace(grpc::string str, const grpc::string &from, - const grpc::string &to) { + const grpc::string &to, bool replace_all) { size_t pos = 0; - for (;;) { + do { pos = str.find(from, pos); if (pos == grpc::string::npos) { break; } str.replace(pos, from.length(), to); pos += to.length(); - } + } while(replace_all); return str; } +inline grpc::string StringReplace(grpc::string str, const grpc::string &from, + const grpc::string &to) { + return StringReplace(str, from, to, true); +} + inline std::vector<grpc::string> tokenize(const grpc::string &input, const grpc::string &delimiters) { std::vector<grpc::string> tokens; @@ -103,6 +108,14 @@ inline grpc::string CapitalizeFirstLetter(grpc::string s) { return s; } +inline grpc::string LowercaseFirstLetter(grpc::string s) { + if (s.empty()) { + return s; + } + s[0] = ::tolower(s[0]); + return s; +} + inline grpc::string LowerUnderscoreToUpperCamel(grpc::string str) { std::vector<grpc::string> tokens = tokenize(str, "_"); grpc::string result = ""; diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index c68c9c37c2..8f35302bee 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -40,195 +40,200 @@ #include <sstream> +using ::grpc::protobuf::io::Printer; +using ::grpc::protobuf::MethodDescriptor; +using ::grpc::protobuf::ServiceDescriptor; +using ::std::map; +using ::grpc::string; + namespace grpc_objective_c_generator { namespace { -void PrintSimpleBlockSignature(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - (*vars)["method_name"] = method->name(); - (*vars)["request_type"] = PrefixedName(method->input_type()->name()); - (*vars)["response_type"] = PrefixedName(method->output_type()->name()); +void PrintProtoRpcDeclarationAsPragma(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + vars["client_stream"] = method->client_streaming() ? "stream " : ""; + vars["server_stream"] = method->server_streaming() ? "stream " : ""; - if (method->server_streaming()) { - printer->Print("// When the response stream finishes, the handler is " - "called with nil for both arguments.\n\n"); - } else { - printer->Print("// The handler is only called once.\n\n"); - } - printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:" - "($request_type$)request completionHandler:(void(^)" - "($response_type$ *, NSError *))handler"); + printer->Print(vars, + "#pragma mark $method_name$($client_stream$$request_type$)" + " returns ($server_stream$$response_type$)\n\n"); } -void PrintSimpleDelegateSignature(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - (*vars)["method_name"] = method->name(); - (*vars)["request_type"] = PrefixedName(method->input_type()->name()); +void PrintMethodSignature(Printer *printer, + const MethodDescriptor *method, + const map<string, string>& vars) { + // TODO(jcanizales): Print method comments. - printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:" - "($request_type$)request delegate:(id<GRXSink>)delegate"); -} + printer->Print(vars, "- ($return_type$)$method_name$With"); + if (method->client_streaming()) { + printer->Print("RequestsWriter:(id<GRXWriter>)request"); + } else { + printer->Print(vars, "Request:($prefix$$request_type$ *)request"); + } -void PrintAdvancedSignature(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - (*vars)["method_name"] = method->name(); - printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:" - "(id<GRXSource>)request"); + // TODO(jcanizales): Put this on a new line and align colons. + // TODO(jcanizales): eventHandler for server streaming? + printer->Print(" handler:(void(^)("); + if (method->server_streaming()) { + printer->Print("BOOL done, "); + } + printer->Print(vars, + "$prefix$$response_type$ *response, NSError *error))handler"); } -void PrintSourceMethodSimpleBlock(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - PrintSimpleBlockSignature(printer, method, vars); - - (*vars)["method_name"] = method->name(); - printer->Print(" {\n"); - printer->Indent(); - printer->Print(*vars, "return [[self $method_name$WithRequest:request] " - "connectHandler:^(id value, NSError *error) {\n"); - printer->Indent(); - printer->Print("handler(value, error);\n"); - printer->Outdent(); - printer->Print("}];\n"); - printer->Outdent(); - printer->Print("}\n"); +void PrintSimpleSignature(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + vars["method_name"] = + grpc_generator::LowercaseFirstLetter(vars["method_name"]); + vars["return_type"] = "void"; + PrintMethodSignature(printer, method, vars); } -void PrintSourceMethodSimpleDelegate(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - PrintSimpleDelegateSignature(printer, method, vars); - - (*vars)["method_name"] = method->name(); - printer->Print(" {\n"); - printer->Indent(); - printer->Print(*vars, "return [[self $method_name$WithRequest:request]" - "connectToSink:delegate];\n"); - printer->Outdent(); - printer->Print("}\n"); +void PrintAdvancedSignature(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + vars["method_name"] = "RPCTo" + vars["method_name"]; + vars["return_type"] = "ProtoRPC *"; + PrintMethodSignature(printer, method, vars); } -void PrintSourceMethodAdvanced(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { +void PrintMethodDeclarations(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + vars["method_name"] = method->name(); + vars["request_type"] = method->input_type()->name(); + vars["response_type"] = method->output_type()->name(); + + PrintProtoRpcDeclarationAsPragma(printer, method, vars); + + PrintSimpleSignature(printer, method, vars); + printer->Print(";\n\n"); PrintAdvancedSignature(printer, method, vars); + printer->Print(";\n\n\n"); +} - (*vars)["method_name"] = method->name(); - printer->Print(" {\n"); - printer->Indent(); - printer->Print(*vars, "return [self $method_name$WithRequest:request " - "client:[self newClient]];\n"); - printer->Outdent(); +void PrintSimpleImplementation(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + printer->Print("{\n"); + printer->Print(vars, " [[self RPCTo$method_name$With"); + if (method->client_streaming()) { + printer->Print("RequestsWriter:request"); + } else { + printer->Print("Request:request"); + } + printer->Print(" handler:handler] start];\n"); printer->Print("}\n"); } -void PrintSourceMethodHandler(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars) { - (*vars)["method_name"] = method->name(); - (*vars)["response_type"] = PrefixedName(method->output_type()->name()); - (*vars)["caps_name"] = grpc_generator::CapitalizeFirstLetter(method->name()); - - printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:" - "(id<GRXSource>)request client:(PBgRPCClient *)client {\n"); - printer->Indent(); - printer->Print(*vars, - "return [self responseWithMethod:$@\"$caps_name\"\n"); - printer->Print(*vars, - " class:[$response_type$ class]\n"); - printer->Print(" request:request\n"); - printer->Print(" client:client];\n"); - printer->Outdent(); +void PrintAdvancedImplementation(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + printer->Print("{\n"); + printer->Print(vars, " return [self RPCToMethod:@\"$method_name$\"\n"); + + printer->Print(" requestsWriter:"); + if (method->client_streaming()) { + printer->Print("request\n"); + } else { + printer->Print("[GRXWriter writerWithValue:request]\n"); + } + + printer->Print(vars, + " responseClass:[$prefix$$response_type$ class]\n"); + + printer->Print(" responsesWriteable:[GRXWriteable "); + if (method->server_streaming()) { + printer->Print("writeableWithStreamHandler:handler]];\n"); + } else { + printer->Print("writeableWithSingleValueHandler:handler]];\n"); + } + printer->Print("}\n"); } +void PrintMethodImplementations(Printer *printer, + const MethodDescriptor *method, + map<string, string> vars) { + vars["method_name"] = method->name(); + vars["request_type"] = method->input_type()->name(); + vars["response_type"] = method->output_type()->name(); + + PrintProtoRpcDeclarationAsPragma(printer, method, vars); + + // TODO(jcanizales): Print documentation from the method. + PrintSimpleSignature(printer, method, vars); + PrintSimpleImplementation(printer, method, vars); + + printer->Print("// Returns a not-yet-started RPC object.\n"); + PrintAdvancedSignature(printer, method, vars); + PrintAdvancedImplementation(printer, method, vars); } -grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service, - const grpc::string message_header) { - grpc::string output; +} // namespace + +string GetHeader(const ServiceDescriptor *service, const string prefix) { + string output; grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - printer.Print("#import \"PBgRPCClient.h\"\n"); - printer.Print("#import \"PBStub.h\"\n"); - vars["message_header"] = message_header; - printer.Print(vars, "#import \"$message_header$\"\n\n"); - printer.Print("@protocol GRXSource\n"); - printer.Print("@class GRXSource\n\n"); - vars["service_name"] = service->name(); - printer.Print("@protocol $service_name$Stub <NSObject>\n\n"); - printer.Print("#pragma mark Simple block handlers\n\n"); - for (int i = 0; i < service->method_count(); i++) { - PrintSimpleBlockSignature(&printer, service->method(i), &vars); - printer.Print(";\n"); - } - printer.Print("\n"); - printer.Print("#pragma mark Simple delegate handlers.\n\n"); - printer.Print("# TODO(jcanizales): Use high-level snippets to remove this duplication."); - for (int i = 0; i < service->method_count(); i++) { - PrintSimpleDelegateSignature(&printer, service->method(i), &vars); - printer.Print(";\n"); - } - printer.Print("\n"); - printer.Print("#pragma mark Advanced handlers.\n\n"); + Printer printer(&output_stream, '$'); + + printer.Print("@protocol GRXWriteable;\n"); + printer.Print("@protocol GRXWriter;\n\n"); + + map<string, string> vars = {{"service_name", service->name()}, + {"prefix", prefix}}; + printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n"); + for (int i = 0; i < service->method_count(); i++) { - PrintAdvancedSignature(&printer, service->method(i), &vars); - printer.Print(";\n"); + PrintMethodDeclarations(&printer, service->method(i), vars); } - printer.Print("\n"); printer.Print("@end\n\n"); - printer.Print("// Basic stub that only does marshalling and parsing\n"); - printer.Print(vars, "@interface $service_name$Stub :" - " PBStub<$service_name$Stub>\n"); - printer.Print("- (instancetype)initWithHost:(NSString *)host;\n"); + + printer.Print("// Basic service implementation, over gRPC, that only does" + " marshalling and parsing.\n"); + printer.Print(vars, "@interface $prefix$$service_name$ :" + " ProtoService<$prefix$$service_name$>\n"); + printer.Print("- (instancetype)initWithHost:(NSString *)host" + " NS_DESIGNATED_INITIALIZER;\n"); printer.Print("@end\n"); return output; } -grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service) { - grpc::string output; +string GetSource(const ServiceDescriptor *service, const string prefix) { + string output; grpc::protobuf::io::StringOutputStream output_stream(&output); - grpc::protobuf::io::Printer printer(&output_stream, '$'); - std::map<grpc::string, grpc::string> vars; - vars["service_name"] = service->name(); - printer.Print(vars, "#import \"$service_name$Stub.pb.h\"\n"); - printer.Print("#import \"PBGeneratedMessage+GRXSource.h\"\n\n"); - vars["full_name"] = service->full_name(); + Printer printer(&output_stream, '$'); + + map<string, string> vars = {{"service_name", service->name()}, + {"package", service->file()->package()}, + {"prefix", prefix}}; + + printer.Print(vars, + "static NSString *const kPackageName = @\"$package$\";\n"); printer.Print(vars, - "static NSString *const kInterface = @\"$full_name$\";\n"); - printer.Print("@implementation $service_name$Stub\n\n"); + "static NSString *const kServiceName = @\"$service_name$\";\n\n"); + + printer.Print(vars, "@implementation $prefix$$service_name$\n\n"); + + printer.Print("// Designated initializer\n"); printer.Print("- (instancetype)initWithHost:(NSString *)host {\n"); - printer.Indent(); - printer.Print("if ((self = [super initWithHost:host " - "interface:kInterface])) {\n"); - printer.Print("}\n"); - printer.Print("return self;\n"); - printer.Outdent(); + printer.Print(" return (self = [super initWithHost:host" + " packageName:kPackageName serviceName:kServiceName]);\n"); printer.Print("}\n\n"); - printer.Print("#pragma mark Simple block handlers.\n"); - for (int i = 0; i < service->method_count(); i++) { - PrintSourceMethodSimpleBlock(&printer, service->method(i), &vars); - } - printer.Print("\n"); - printer.Print("#pragma mark Simple delegate handlers.\n"); - for (int i = 0; i < service->method_count(); i++) { - PrintSourceMethodSimpleDelegate(&printer, service->method(i), &vars); - } - printer.Print("\n"); - printer.Print("#pragma mark Advanced handlers.\n"); - for (int i = 0; i < service->method_count(); i++) { - PrintSourceMethodAdvanced(&printer, service->method(i), &vars); - } - printer.Print("\n"); - printer.Print("#pragma mark Handlers for subclasses " - "(stub wrappers) to override.\n"); + printer.Print("// Override superclass initializer to disallow different" + " package and service names.\n"); + printer.Print("- (instancetype)initWithHost:(NSString *)host\n"); + printer.Print(" packageName:(NSString *)packageName\n"); + printer.Print(" serviceName:(NSString *)serviceName {\n"); + printer.Print(" return [self initWithHost:host];\n"); + printer.Print("}\n\n\n"); + for (int i = 0; i < service->method_count(); i++) { - PrintSourceMethodHandler(&printer, service->method(i), &vars); + PrintMethodImplementations(&printer, service->method(i), vars); } + printer.Print("@end\n"); return output; } diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h index 93c730b34e..548e96fcf1 100644 --- a/src/compiler/objective_c_generator.h +++ b/src/compiler/objective_c_generator.h @@ -38,10 +38,15 @@ namespace grpc_objective_c_generator { +// Returns the content to be included in the "global_scope" insertion point of +// the generated header file. grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service, - const grpc::string message_header); + const grpc::string prefix); -grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service); +// Returns the content to be included in the "global_scope" insertion point of +// the generated implementation file. +grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service, + const grpc::string prefix); } // namespace grpc_objective_c_generator diff --git a/src/compiler/objective_c_generator_helpers.h b/src/compiler/objective_c_generator_helpers.h index 6a7c13991f..d92a2b5e9a 100644 --- a/src/compiler/objective_c_generator_helpers.h +++ b/src/compiler/objective_c_generator_helpers.h @@ -40,18 +40,8 @@ namespace grpc_objective_c_generator { -const grpc::string prefix = "PBG"; - inline grpc::string MessageHeaderName(const grpc::protobuf::FileDescriptor *file) { - return grpc_generator::FileNameInUpperCamel(file) + ".pb.h"; -} - -inline grpc::string StubFileName(grpc::string service_name) { - return prefix + service_name + "Stub"; -} - -inline grpc::string PrefixedName(grpc::string name) { - return prefix + name; + return grpc_generator::FileNameInUpperCamel(file) + ".pbobjc.h"; } } diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index eebce0cd20..3cb170e95c 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -39,54 +39,77 @@ #include "src/compiler/objective_c_generator.h" #include "src/compiler/objective_c_generator_helpers.h" +using ::grpc::string; + class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { public: ObjectiveCGrpcGenerator() {} virtual ~ObjectiveCGrpcGenerator() {} virtual bool Generate(const grpc::protobuf::FileDescriptor *file, - const grpc::string ¶meter, + const string ¶meter, grpc::protobuf::compiler::GeneratorContext *context, - grpc::string *error) const { + string *error) const { if (file->service_count() == 0) { // No services. Do nothing. return true; } - for (int i = 0; i < file->service_count(); i++) { - const grpc::protobuf::ServiceDescriptor *service = file->service(i); - grpc::string file_name = grpc_objective_c_generator::StubFileName( - service->name()); - - // Generate .pb.h - grpc::string header_code = grpc_objective_c_generator::GetHeader( - service, grpc_objective_c_generator::MessageHeaderName(file)); - std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> header_output( - context->Open(file_name + ".pb.h")); - grpc::protobuf::io::CodedOutputStream header_coded_out( - header_output.get()); - header_coded_out.WriteRaw(header_code.data(), header_code.size()); - - // Generate .pb.m - grpc::string source_code = grpc_objective_c_generator::GetSource(service); - std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> source_output( - context->Open(file_name + ".pb.m")); - grpc::protobuf::io::CodedOutputStream source_coded_out( - source_output.get()); - source_coded_out.WriteRaw(source_code.data(), source_code.size()); + string file_name = grpc_generator::FileNameInUpperCamel(file); + string prefix = file->options().objc_class_prefix(); + + { + // Generate .pbrpc.h + + string imports = string("#import \"") + file_name + ".pbobjc.h\"\n" + "#import <gRPC/ProtoService.h>\n"; + + // TODO(jcanizales): Instead forward-declare the input and output types + // and import the files in the .pbrpc.m + string proto_imports; + for (int i = 0; i < file->dependency_count(); i++) { + string header = grpc_objective_c_generator::MessageHeaderName( + file->dependency(i)); + proto_imports += string("#import \"") + header + "\"\n"; + } + + string declarations; + for (int i = 0; i < file->service_count(); i++) { + const grpc::protobuf::ServiceDescriptor *service = file->service(i); + declarations += grpc_objective_c_generator::GetHeader(service, prefix); + } + + Write(context, file_name + ".pbrpc.h", + imports + '\n' + proto_imports + '\n' + declarations); + } + + { + // Generate .pbrpc.m + + string imports = string("#import \"") + file_name + ".pbrpc.h\"\n" + "#import <gRPC/GRXWriteable.h>\n" + "#import <gRPC/GRXWriter+Immediate.h>\n" + "#import <gRPC/ProtoRPC.h>\n"; + + string definitions; + for (int i = 0; i < file->service_count(); i++) { + const grpc::protobuf::ServiceDescriptor *service = file->service(i); + definitions += grpc_objective_c_generator::GetSource(service, prefix); + } + + Write(context, file_name + ".pbrpc.m", imports + '\n' + definitions); } return true; } private: - // Insert the given code into the given file at the given insertion point. - void Insert(grpc::protobuf::compiler::GeneratorContext *context, - const grpc::string &filename, const grpc::string &insertion_point, - const grpc::string &code) const { + // Write the given code into the given file. + void Write(grpc::protobuf::compiler::GeneratorContext *context, + const string &filename, const string &code) const { std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output( - context->OpenForInsert(filename, insertion_point)); + context->Open(filename)); grpc::protobuf::io::CodedOutputStream coded_out(output.get()); coded_out.WriteRaw(code.data(), code.size()); } diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 1fa6c97cfb..7c8c80d162 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -68,7 +68,6 @@ static grpc_httpcli_post_override g_post_override = NULL; static void next_address(internal_request *req); static void finish(internal_request *req, int success) { - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); req->on_response(req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); if (req->addresses != NULL) { @@ -87,8 +86,6 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, internal_request *req = user_data; size_t i; - gpr_log(GPR_DEBUG, "%s nslices=%d status=%d", __FUNCTION__, nslices, status); - for (i = 0; i < nslices; i++) { if (GPR_SLICE_LENGTH(slices[i])) { req->have_read_byte = 1; @@ -121,13 +118,11 @@ done: } static void on_written(internal_request *req) { - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); grpc_endpoint_notify_on_read(req->ep, on_read, req); } static void done_write(void *arg, grpc_endpoint_cb_status status) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); switch (status) { case GRPC_ENDPOINT_CB_OK: on_written(req); @@ -142,7 +137,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); switch ( grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { case GRPC_ENDPOINT_WRITE_DONE: @@ -160,7 +154,6 @@ static void on_secure_transport_setup_done(void *rp, grpc_security_status status, grpc_endpoint *secure_endpoint) { internal_request *req = rp; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); finish(req, 0); @@ -173,7 +166,6 @@ static void on_secure_transport_setup_done(void *rp, static void on_connected(void *arg, grpc_endpoint *tcp) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (!tcp) { next_address(req); return; @@ -201,7 +193,6 @@ static void on_connected(void *arg, grpc_endpoint *tcp) { static void next_address(internal_request *req) { grpc_resolved_address *addr; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (req->next_address == req->addresses->naddrs) { finish(req, 0); return; @@ -214,7 +205,6 @@ static void next_address(internal_request *req) { static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (!addresses) { finish(req, 0); return; diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 740bbe716e..3d202a5cc8 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -169,8 +169,7 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { case AF_UNIX: return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family, - __FUNCTION__); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family); return 0; } } @@ -184,8 +183,7 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) { ((struct sockaddr_in6 *)addr)->sin6_port = htons(port); return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family, - __FUNCTION__); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family); return 0; } } diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c index 642520bb47..b4631fa0ed 100644 --- a/src/core/support/subprocess_posix.c +++ b/src/core/support/subprocess_posix.c @@ -57,7 +57,7 @@ struct gpr_subprocess { const char *gpr_subprocess_binary_extension() { return ""; } -gpr_subprocess *gpr_subprocess_create(int argc, char **argv) { +gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { gpr_subprocess *r; int pid; char **exec_args; @@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) { int gpr_subprocess_join(gpr_subprocess *p) { int status; +retry: if (waitpid(p->pid, &status, 0) == -1) { + if (errno == EINTR) { + goto retry; + } gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); return -1; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index d403eeede6..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { + if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || @@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) { is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client && - call->read_state < READ_STATE_GOT_INITIAL_METADATA); + (call->write_state == WRITE_STATE_INITIAL && !call->is_client); } static void unlock(grpc_call *call) { @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,6 +582,23 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); @@ -601,7 +617,9 @@ static void call_on_done_send(void *pc, int success) { } if (!success) { call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } + call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; unlock(call); @@ -921,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1178,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1188,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1271,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1280,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 351ed5b758..24a23ae5c4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_add_callback(kill_zombie, elem); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); } gpr_mu_unlock(&chand->server->mu); break; @@ -663,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, const char *host) { registered_method *m; if (!method) { - gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); + gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); return NULL; } for (m = server->registered_methods; m; m = m->next) { diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index ac76c4cc9c..c9e3e13042 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -53,12 +53,14 @@ typedef struct { gpr_uint8 send_ping_ack; gpr_uint8 process_ping_reply; gpr_uint8 goaway; + gpr_uint8 rst_stream; gpr_int64 initial_window_update; gpr_uint32 window_update; gpr_uint32 goaway_last_stream_index; gpr_uint32 goaway_error; gpr_slice goaway_text; + gpr_uint32 rst_stream_reason; } grpc_chttp2_parse_state; #define GRPC_CHTTP2_FRAME_DATA 0 diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 368ca86481..3016aac7a2 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/chttp2/frame_rst_stream.h" + +#include <grpc/support/log.h> + #include "src/core/transport/chttp2/frame.h" gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) { @@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) { return slice; } + +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( + grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) { + if (length != 4) { + gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + parser->byte = 0; + return GRPC_CHTTP2_PARSE_OK; +} + +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( + void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + int is_last) { + gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); + gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); + gpr_uint8 *cur = beg; + grpc_chttp2_rst_stream_parser *p = parser; + + while (p->byte != 4 && cur != end) { + p->reason_bytes[p->byte] = *cur; + cur++; + p->byte++; + } + + if (p->byte == 4) { + GPR_ASSERT(is_last); + state->rst_stream = 1; + state->rst_stream_reason = + (((gpr_uint32)p->reason_bytes[0]) << 24) | + (((gpr_uint32)p->reason_bytes[1]) << 16) | + (((gpr_uint32)p->reason_bytes[2]) << 8) | + (((gpr_uint32)p->reason_bytes[3])); + } + + return GRPC_CHTTP2_PARSE_OK; +} diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index 2d3ee18637..07a3c98d03 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -35,7 +35,18 @@ #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H #include <grpc/support/slice.h> +#include "src/core/transport/chttp2/frame.h" + +typedef struct { + gpr_uint8 byte; + gpr_uint8 reason_bytes[4]; +} grpc_chttp2_rst_stream_parser; gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code); +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( + grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags); +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( + void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 3fd8f67226..a489543868 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p, return 1; } - return parse_begin(p, cur + 1, end); + return p->after_prioritization(p, cur + 1, end); } static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, @@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p, } void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) { - GPR_ASSERT(p->state == parse_begin); + p->after_prioritization = p->state; p->state = parse_stream_dep0; } diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index bb4c1a1f49..bfc06b3980 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser { grpc_chttp2_hpack_parser_state state; /* future states dependent on the opening op code */ const grpc_chttp2_hpack_parser_state *next_state; + /* what to do after skipping prioritization data */ + grpc_chttp2_hpack_parser_state after_prioritization; /* the value we're currently parsing */ union { gpr_uint32 *value; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a6f9f782a1..9dc5f23389 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -154,7 +154,13 @@ typedef enum { WRITE_STATE_OPEN, WRITE_STATE_QUEUED_CLOSE, WRITE_STATE_SENT_CLOSE -} WRITE_STATE; +} write_state; + +typedef enum { + DONT_SEND_CLOSED = 0, + SEND_CLOSED, + SEND_CLOSED_WITH_RST_STREAM +} send_closed; typedef struct { stream *head; @@ -267,6 +273,7 @@ struct transport { grpc_chttp2_window_update_parser window_update; grpc_chttp2_settings_parser settings; grpc_chttp2_ping_parser ping; + grpc_chttp2_rst_stream_parser rst_stream; } simple_parsers; /* goaway */ @@ -312,8 +319,8 @@ struct stream { /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - WRITE_STATE write_state; - gpr_uint8 send_closed; + write_state write_state; + send_closed send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; @@ -937,7 +944,11 @@ static int prepare_write(transport *t) { if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { - s->send_closed = 1; + if (!t->is_client && !s->read_closed) { + s->send_closed = SEND_CLOSED_WITH_RST_STREAM; + } else { + s->send_closed = SEND_CLOSED; + } } if (s->writing_sopb.nops > 0 || s->send_closed) { stream_list_join(t, s, WRITING); @@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); + s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; - if (s->send_closed) { + if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) { + gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR)); + } + if (s->send_closed != DONT_SEND_CLOSED) { stream_list_join(t, s, WRITTEN_CLOSED); } } @@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) { } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { s->write_state = WRITE_STATE_SENT_CLOSE; - if (1||!s->cancelled) { - maybe_finish_read(t, s); + if (!t->is_client) { + s->read_closed = 1; } + maybe_finish_read(t, s); } t->outbuf.count = 0; t->outbuf.length = 0; @@ -1127,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->recv_ops) { GPR_ASSERT(s->incoming_sopb == NULL); + GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); s->recv_done_closure.cb = op->on_done_recv; s->recv_done_closure.user_data = op->recv_user_data; s->incoming_sopb = op->recv_ops; @@ -1214,12 +1230,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; - schedule_nuke_sopb(t, &s->parser.incoming_sopb); - if (s->outgoing_sopb) { - schedule_nuke_sopb(t, s->outgoing_sopb); - s->outgoing_sopb = NULL; - stream_list_remove(t, s, WRITABLE); - schedule_cb(t, s->send_done_closure, 0); + if (error_code != GRPC_CHTTP2_NO_ERROR) { + schedule_nuke_sopb(t, &s->parser.incoming_sopb); + if (s->outgoing_sopb) { + schedule_nuke_sopb(t, s->outgoing_sopb); + s->outgoing_sopb = NULL; + stream_list_remove(t, s, WRITABLE); + schedule_cb(t, s->send_done_closure, 0); + } } if (s->cancelled) { send_rst = 0; @@ -1228,31 +1246,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, s->cancelled = 1; stream_list_join(t, s, CANCELLED); - gpr_ltoa(local_status, buffer); - add_incoming_metadata( - t, s, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); - if (!optional_message) { - switch (local_status) { - case GRPC_STATUS_CANCELLED: - add_incoming_metadata( - t, s, grpc_mdelem_from_strings(t->metadata_context, - "grpc-message", "Cancelled")); - break; - default: - break; - } - } else { + if (error_code != GRPC_CHTTP2_NO_ERROR) { + /* synthesize a status if we don't believe we'll get one */ + gpr_ltoa(local_status, buffer); add_incoming_metadata( t, s, - grpc_mdelem_from_metadata_strings( - t->metadata_context, - grpc_mdstr_from_string(t->metadata_context, "grpc-message"), - grpc_mdstr_ref(optional_message))); + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + if (!optional_message) { + switch (local_status) { + case GRPC_STATUS_CANCELLED: + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, + "grpc-message", "Cancelled")); + break; + default: + break; + } + } else { + add_incoming_metadata( + t, s, + grpc_mdelem_from_metadata_strings( + t->metadata_context, + grpc_mdstr_from_string(t->metadata_context, "grpc-message"), + grpc_mdstr_ref(optional_message))); + } + add_metadata_batch(t, s); } - add_metadata_batch(t, s); - maybe_finish_read(t, s); } + maybe_finish_read(t, s); } if (!id) send_rst = 0; if (send_rst) { @@ -1527,6 +1548,19 @@ static int init_ping_parser(transport *t) { return ok; } +static int init_rst_stream_parser(transport *t) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream, + t->incoming_frame_size, + t->incoming_frame_flags); + if (!ok) { + drop_connection(t); + } + t->parser = grpc_chttp2_rst_stream_parser_parse; + t->parser_data = &t->simple_parsers.rst_stream; + return ok; +} + static int init_goaway_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == @@ -1581,12 +1615,7 @@ static int init_frame_parser(transport *t) { gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); return 0; case GRPC_CHTTP2_FRAME_RST_STREAM: - /* TODO(ctiller): actually parse the reason */ - cancel_stream_id( - t, t->incoming_stream_id, - grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL), - GRPC_CHTTP2_CANCEL, 0); - return init_skip_frame(t, 0); + return init_rst_stream_parser(t); case GRPC_CHTTP2_FRAME_SETTINGS: return init_settings_frame_parser(t); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: @@ -1650,6 +1679,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (st.goaway) { add_goaway(t, st.goaway_error, st.goaway_text); } + if (st.rst_stream) { + cancel_stream_id( + t, t->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason), + st.rst_stream_reason, 0); + } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { if (0 == diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 62f4020d7e..e66b4ed2d8 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + cq_.Pluck(&buf); /* status ignored */ void* ignored_tag; bool ignored_ok; cq_.Shutdown(); diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj index f7724ea643..e6abbbfdf0 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj +++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj @@ -10,6 +10,7 @@ <RootNamespace>Grpc.Auth</RootNamespace> <AssemblyName>Grpc.Auth</AssemblyName> <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <DocumentationFile>bin\$(Configuration)\Grpc.Auth.Xml</DocumentationFile> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec index 28ec93d3c5..85aee35566 100644 --- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec +++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec @@ -22,5 +22,8 @@ </metadata> <files> <file src="bin/Release/Grpc.Auth.dll" target="lib/net45" /> + <file src="bin/Release/Grpc.Auth.pdb" target="lib/net45" /> + <file src="bin/Release/Grpc.Auth.xml" target="lib/net45" /> + <file src="**\*.cs" target="src" /> </files> </package> diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index eac8d16fb1..62cb443272 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -34,6 +34,9 @@ <HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath> </Reference> <Reference Include="System" /> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> @@ -57,7 +60,5 @@ <ItemGroup> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> </ItemGroup> - <ItemGroup> - <Folder Include="Internal\" /> - </ItemGroup> + <ItemGroup /> </Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config index c714ef3a23..28af8d78c6 100644 --- a/src/csharp/Grpc.Core.Tests/packages.config +++ b/src/csharp/Grpc.Core.Tests/packages.config @@ -1,4 +1,5 @@ <?xml version="1.0" encoding="utf-8"?> <packages> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index b95776f66d..d66b0d4974 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -40,33 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for client streaming calls. /// </summary> - public sealed class AsyncClientStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> result; + readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction) { this.requestStream = requestStream; this.result = result; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +83,16 @@ namespace Grpc.Core { return result.GetAwaiter(); } + + /// <summary> + /// Provides means to provide after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ee05437416..4c0d5936ac 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -40,42 +40,17 @@ namespace Grpc.Core /// <summary> /// Return type for bidirectional streaming calls. /// </summary> - public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> - where TRequest : class - where TResponse : class + public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; - } - - /// <summary> - /// Writes a request to RequestStream. - /// </summary> - public Task Write(TRequest message) - { - return requestStream.Write(message); - } - - /// <summary> - /// Closes the RequestStream. - /// </summary> - public Task Close() - { - return requestStream.Close(); - } - - /// <summary> - /// Reads a response from ResponseStream. - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -99,5 +74,16 @@ namespace Grpc.Core return requestStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 73b9614985..7a479b9a23 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -40,23 +40,15 @@ namespace Grpc.Core /// <summary> /// Return type for server streaming calls. /// </summary> - public sealed class AsyncServerStreamingCall<TResponse> - where TResponse : class + public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction) { this.responseStream = responseStream; - } - - /// <summary> - /// Reads the next response from ResponseStream - /// </summary> - /// <returns></returns> - public Task<TResponse> ReadNext() - { - return responseStream.ReadNext(); + this.disposeAction = disposeAction; } /// <summary> @@ -69,5 +61,16 @@ namespace Grpc.Core return responseStream; } } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (response stream has been fully read), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } } } diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs index d1ee59ff0a..37b452f020 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/Call.cs @@ -41,8 +41,6 @@ namespace Grpc.Core /// Abstraction of a call to be invoked on a client. /// </summary> public class Call<TRequest, TResponse> - where TRequest : class - where TResponse : class { readonly string name; readonly Marshaller<TRequest> requestMarshaller; diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ba42a2d4f8..9f8baac684 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel); } public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel); } public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel); } private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index f5f2cf5f22..fe2d446a35 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -13,6 +13,7 @@ <AssemblyName>Grpc.Core</AssemblyName> <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <NuGetPackageImportStamp>8bb563fb</NuGetPackageImportStamp> + <DocumentationFile>bin\$(Configuration)\Grpc.Core.Xml</DocumentationFile> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> @@ -37,6 +38,9 @@ <Reference Include="System.Collections.Immutable"> <HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index e54908cb8b..69e8497bb7 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -16,10 +16,14 @@ <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> <dependency id="Microsoft.Bcl.Immutable" version="1.0.34" /> - <dependency id="grpc.native.csharp_ext" version="0.8.0.0" /> + <dependency id="Ix-Async" version="1.2.3" /> + <dependency id="grpc.native.csharp_ext" version="0.9.0.0" /> </dependencies> </metadata> <files> <file src="bin/Release/Grpc.Core.dll" target="lib/net45" /> + <file src="bin/Release/Grpc.Core.pdb" target="lib/net45" /> + <file src="bin/Release/Grpc.Core.xml" target="lib/net45" /> + <file src="**\*.cs" target="src" /> </files> </package> diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 699741cd05..371fbf27ce 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,13 +43,8 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<T> - where T : class + public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> { - /// <summary> - /// Reads a single message. Returns null if the last message was already read. - /// A following read can only be started when the previous one finishes. - /// </summary> - Task<T> ReadNext(); + // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 4bd8bfb8df..2000210252 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -44,12 +44,11 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Writes a single message. Only one write can be pending at a time. + /// Writes a single asynchronously. Only one write can be pending at a time. /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> - Task Write(T message); + Task WriteAsync(T message); } } diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs index 0847a928e6..a3028bc374 100644 --- a/src/csharp/Grpc.Core/IClientStreamWriter.cs +++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs @@ -44,11 +44,10 @@ namespace Grpc.Core /// </summary> /// <typeparam name="T"></typeparam> public interface IClientStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { /// <summary> - /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this. + /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this. /// </summary> - Task Close(); + Task CompleteAsync(); } } diff --git a/src/csharp/Grpc.Core/IServerStreamWriter.cs b/src/csharp/Grpc.Core/IServerStreamWriter.cs index 199a585a3f..9f3af59109 100644 --- a/src/csharp/Grpc.Core/IServerStreamWriter.cs +++ b/src/csharp/Grpc.Core/IServerStreamWriter.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A writable stream of messages that is used in server-side handlers. /// </summary> public interface IServerStreamWriter<T> : IAsyncStreamWriter<T> - where T : class { + // TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface. } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 1697058732..58f493463b 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -38,8 +38,6 @@ namespace Grpc.Core.Internal /// Writes requests asynchronously to an underlying AsyncCall object. /// </summary> internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> - where TRequest : class - where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; @@ -48,14 +46,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TRequest message) + public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task Close() + public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendCloseFromClient(taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index b2378cade6..6c44521038 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCall<TRequest, TResponse> call; + TResponse current; public ClientResponseStream(AsyncCall<TRequest, TResponse> call) { this.call = call; } - public Task<TResponse> ReadNext() + public TResponse Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TResponse>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 95d8e97869..f494d9e0ff 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -71,12 +72,13 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context var result = await handler(context, request); - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (Exception e) { @@ -85,7 +87,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -122,9 +124,10 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var request = await requestStream.ReadNext(); + Preconditions.CheckArgument(await requestStream.MoveNext()); + var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - Preconditions.CheckArgument(await requestStream.ReadNext() == null); + Preconditions.CheckArgument(!await requestStream.MoveNext()); var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context await handler(context, request, responseStream); @@ -137,7 +140,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -178,7 +181,7 @@ namespace Grpc.Core.Internal var result = await handler(context, requestStream); try { - await responseStream.Write(result); + await responseStream.WriteAsync(result); } catch (OperationCanceledException) { @@ -193,7 +196,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -240,7 +243,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatus(status); + await responseStream.WriteStatusAsync(status); } catch (OperationCanceledException) { @@ -263,7 +266,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. await requestStream.ToList(); await finishedTask; diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index d9ee0c815b..3fccb88abb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -33,6 +33,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core.Internal @@ -42,17 +43,41 @@ namespace Grpc.Core.Internal where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + TRequest current; public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } - public Task<TRequest> ReadNext() + public TRequest Current { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task<bool> MoveNext(CancellationToken token) + { + if (token != CancellationToken.None) + { + throw new InvalidOperationException("Cancellation of individual reads is not supported."); + } var taskSource = new AsyncCompletionTaskSource<TRequest>(); call.StartReadMessage(taskSource.CompletionDelegate); - return taskSource.Task; + var result = await taskSource.Task; + this.current = result; + return result != null; + } + + public void Dispose() + { + // TODO(jtattermusch): implement the semantics of stream disposal. } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index da688d504f..a2d77dd5b7 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -49,14 +49,14 @@ namespace Grpc.Core.Internal this.call = call; } - public Task Write(TResponse message) + public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendMessage(message, taskSource.CompletionDelegate); return taskSource.Task; } - public Task WriteStatus(Status status) + public Task WriteStatusAsync(Status status) { var taskSource = new AsyncCompletionTaskSource<object>(); call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 731ea2be81..7a1c016ae2 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -39,9 +39,6 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { - // TODO: we need to make sure that the delegates are not collected before invoked. - //internal delegate void ServerShutdownCallbackDelegate(bool success); - /// <summary> /// grpc_server from grpc/grpc.h /// </summary> diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index e873b3e88a..bc9a499c51 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -42,7 +42,6 @@ namespace Grpc.Core /// </summary> public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken // TODO(jtattermusch): add deadline info diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index f915155f8a..8a748b45a8 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -49,14 +49,9 @@ namespace Grpc.Core.Utils public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) where T : class { - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - await asyncAction(elem); + await asyncAction(streamReader.Current); } } @@ -67,32 +62,27 @@ namespace Grpc.Core.Utils where T : class { var result = new List<T>(); - while (true) + while (await streamReader.MoveNext()) { - var elem = await streamReader.ReadNext(); - if (elem == null) - { - break; - } - result.Add(elem); + result.Add(streamReader.Current); } return result; } /// <summary> /// Writes all elements from given enumerable to the stream. - /// Closes the stream afterwards unless close = false. + /// Completes the stream afterwards unless close = false. /// </summary> - public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true) + public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) where T : class { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } - if (close) + if (complete) { - await streamWriter.Close(); + await streamWriter.CompleteAsync(); } } @@ -104,7 +94,7 @@ namespace Grpc.Core.Utils { foreach (var element in elements) { - await streamWriter.Write(element); + await streamWriter.WriteAsync(element); } } } diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 71967de56e..fb7eaaeeda 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -2,5 +2,6 @@ <packages> <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 87ccf07dd8..6e84add42b 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,6 +37,10 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL"> + <SpecificVersion>False</SpecificVersion> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 4997d3aa42..5aa6f4162d 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -96,7 +96,19 @@ namespace math.Tests Assert.AreEqual(0, response.Remainder); } - // TODO(jtattermusch): test division by zero + [Test] + public void DivByZero() + { + try + { + DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + } + } [Test] public void DivAsync() @@ -114,11 +126,12 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()); - - var responses = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 }, - responses.ConvertAll((n) => n.Num_)); + using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build())) + { + var responses = await call.ResponseStream.ToList(); + CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 }, + responses.ConvertAll((n) => n.Num_)); + } }).Wait(); } @@ -128,13 +141,15 @@ namespace math.Tests { Task.Run(async () => { - var call = client.Sum(); - var numbers = new List<long> { 10, 20, 30 }.ConvertAll( - n => Num.CreateBuilder().SetNum_(n).Build()); + using (var call = client.Sum()) + { + var numbers = new List<long> { 10, 20, 30 }.ConvertAll( + n => Num.CreateBuilder().SetNum_(n).Build()); - await call.RequestStream.WriteAll(numbers); - var result = await call.Result; - Assert.AreEqual(60, result.Num_); + await call.RequestStream.WriteAll(numbers); + var result = await call.Result; + Assert.AreEqual(60, result.Num_); + } }).Wait(); } @@ -150,12 +165,14 @@ namespace math.Tests new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = client.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - var result = await call.ResponseStream.ToList(); + using (var call = client.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + var result = await call.ResponseStream.ToList(); - CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); - CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient)); + CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder)); + } }).Wait(); } } diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index 4d6ec63b3c..cc6e9af40f 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index 2c5019c214..5ce490f403 100644 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -35,6 +35,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="Properties\AssemblyInfo.cs" /> diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index ab06a44c0d..d2cfbee18f 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -51,18 +51,13 @@ namespace math Console.WriteLine("DivAsync Result: " + result); } - public static async Task DivAsyncWithCancellationExample(Math.IMathClient stub) - { - Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = await resultTask; - Console.WriteLine(result); - } - public static async Task FibExample(Math.IMathClient stub) { - var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()); - List<Num> result = await call.ResponseStream.ToList(); - Console.WriteLine("Fib Result: " + string.Join("|", result)); + using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build())) + { + List<Num> result = await call.ResponseStream.ToList(); + Console.WriteLine("Fib Result: " + string.Join("|", result)); + } } public static async Task SumExample(Math.IMathClient stub) @@ -74,9 +69,11 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var call = stub.Sum(); - await call.RequestStream.WriteAll(numbers); - Console.WriteLine("Sum Result: " + await call.Result); + using (var call = stub.Sum()) + { + await call.RequestStream.WriteAll(numbers); + Console.WriteLine("Sum Result: " + await call.Result); + } } public static async Task DivManyExample(Math.IMathClient stub) @@ -87,9 +84,11 @@ namespace math new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(), new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build() }; - var call = stub.DivMany(); - await call.RequestStream.WriteAll(divArgsList); - Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + using (var call = stub.DivMany()) + { + await call.RequestStream.WriteAll(divArgsList); + Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList())); + } } public static async Task DependendRequestsExample(Math.IMathClient stub) @@ -101,9 +100,12 @@ namespace math new Num.Builder { Num_ = 3 }.Build() }; - var sumCall = stub.Sum(); - await sumCall.RequestStream.WriteAll(numbers); - Num sum = await sumCall.Result; + Num sum; + using (var sumCall = stub.Sum()) + { + await sumCall.RequestStream.WriteAll(numbers); + sum = await sumCall.Result; + } DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build()); Console.WriteLine("Avg Result: " + result); diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index 2546fd220d..b9efc44e8c 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -12,30 +12,30 @@ namespace math { { static readonly string __ServiceName = "math.Math"; - static readonly Marshaller<DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); - static readonly Marshaller<DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); - static readonly Marshaller<FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom); - static readonly Marshaller<Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); + static readonly Marshaller<global::math.DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom); + static readonly Marshaller<global::math.DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom); + static readonly Marshaller<global::math.FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom); + static readonly Marshaller<global::math.Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom); - static readonly Method<DivArgs, DivReply> __Method_Div = new Method<DivArgs, DivReply>( + static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_Div = new Method<global::math.DivArgs, global::math.DivReply>( MethodType.Unary, "Div", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method<DivArgs, DivReply> __Method_DivMany = new Method<DivArgs, DivReply>( + static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_DivMany = new Method<global::math.DivArgs, global::math.DivReply>( MethodType.DuplexStreaming, "DivMany", __Marshaller_DivArgs, __Marshaller_DivReply); - static readonly Method<FibArgs, Num> __Method_Fib = new Method<FibArgs, Num>( + static readonly Method<global::math.FibArgs, global::math.Num> __Method_Fib = new Method<global::math.FibArgs, global::math.Num>( MethodType.ServerStreaming, "Fib", __Marshaller_FibArgs, __Marshaller_Num); - static readonly Method<Num, Num> __Method_Sum = new Method<Num, Num>( + static readonly Method<global::math.Num, global::math.Num> __Method_Sum = new Method<global::math.Num, global::math.Num>( MethodType.ClientStreaming, "Sum", __Marshaller_Num, @@ -44,20 +44,20 @@ namespace math { // client-side stub interface public interface IMathClient { - DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)); - Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken)); + global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken)); } // server-side interface public interface IMath { - Task<DivReply> Div(ServerCallContext context, DivArgs request); - Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream); - Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream); - Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream); + Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request); + Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream); + Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream); + Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream); } // client stub @@ -69,27 +69,27 @@ namespace math { public MathClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) + public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.BlockingUnaryCall(call, request, token); } - public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) + public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_DivMany); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Fib); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Sum); return Calls.AsyncClientStreamingCall(call, token); diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 3b33b09bbd..e247ac9d73 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -62,7 +62,7 @@ namespace math { foreach (var num in FibInternal(request.Limit)) { - await responseStream.Write(num); + await responseStream.WriteAsync(num); } } } @@ -81,7 +81,7 @@ namespace math { await requestStream.ForEach(async divArgs => { - await responseStream.Write(DivInternal(divArgs)); + await responseStream.WriteAsync(DivInternal(divArgs)); }); } diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config index 51c17bcd5e..4c8d60fa62 100644 --- a/src/csharp/Grpc.Examples/packages.config +++ b/src/csharp/Grpc.Examples/packages.config @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="NUnit" version="2.6.4" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 1ca3dd24e1..b3a0a2917b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -54,6 +54,9 @@ <Reference Include="Google.ProtocolBuffers"> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> </Reference> + <Reference Include="System.Interactive.Async"> + <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> + </Reference> <Reference Include="System.Net" /> <Reference Include="System.Net.Http" /> <Reference Include="System.Net.Http.Extensions"> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 02f8a369de..dfaf18cae1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -213,11 +213,13 @@ namespace Grpc.IntegrationTesting var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); - var call = client.StreamingInputCall(); - await call.RequestStream.WriteAll(bodySizes); + using (var call = client.StreamingInputCall()) + { + await call.RequestStream.WriteAll(bodySizes); - var response = await call.Result; - Assert.AreEqual(74922, response.AggregatedPayloadSize); + var response = await call.Result; + Assert.AreEqual(74922, response.AggregatedPayloadSize); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -236,14 +238,15 @@ namespace Grpc.IntegrationTesting (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) .Build(); - var call = client.StreamingOutputCall(request); - - var responseList = await call.ResponseStream.ToList(); - foreach (var res in responseList) + using (var call = client.StreamingOutputCall(request)) { - Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + var responseList = await call.ResponseStream.ToList(); + foreach (var res in responseList) + { + Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + } + CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); } - CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); Console.WriteLine("Passed!"); }).Wait(); } @@ -254,51 +257,48 @@ namespace Grpc.IntegrationTesting { Console.WriteLine("running ping_pong"); - var call = client.FullDuplexCall(); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); - - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) - .SetPayload(CreateZerosPayload(8)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(9, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) + .SetPayload(CreateZerosPayload(8)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) - .SetPayload(CreateZerosPayload(1828)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(2653, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) + .SetPayload(CreateZerosPayload(1828)).Build()); - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) - .SetPayload(CreateZerosPayload(45904)).Build()); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(58979, response.Payload.Body.Length); + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) + .SetPayload(CreateZerosPayload(45904)).Build()); - await call.RequestStream.Close(); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(null, response); + await call.RequestStream.CompleteAsync(); + Assert.IsFalse(await call.ResponseStream.MoveNext()); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -308,12 +308,13 @@ namespace Grpc.IntegrationTesting Task.Run(async () => { Console.WriteLine("running empty_stream"); - var call = client.FullDuplexCall(); - await call.Close(); - - var responseList = await call.ResponseStream.ToList(); - Assert.AreEqual(0, responseList.Count); + using (var call = client.FullDuplexCall()) + { + await call.RequestStream.CompleteAsync(); + var responseList = await call.ResponseStream.ToList(); + Assert.AreEqual(0, responseList.Count); + } Console.WriteLine("Passed!"); }).Wait(); } @@ -365,19 +366,21 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_begin"); var cts = new CancellationTokenSource(); - var call = client.StreamingInputCall(cts.Token); - // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. - await Task.Delay(1000); - cts.Cancel(); - - try + using (var call = client.StreamingInputCall(cts.Token)) { - var response = await call.Result; - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. + await Task.Delay(1000); + cts.Cancel(); + + try + { + var response = await call.Result; + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); @@ -390,29 +393,28 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running cancel_after_first_response"); var cts = new CancellationTokenSource(); - var call = client.FullDuplexCall(cts.Token); - - StreamingOutputCallResponse response; - - await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() - .SetResponseType(PayloadType.COMPRESSABLE) - .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) - .SetPayload(CreateZerosPayload(27182)).Build()); + using (var call = client.FullDuplexCall(cts.Token)) + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetResponseType(PayloadType.COMPRESSABLE) + .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) + .SetPayload(CreateZerosPayload(27182)).Build()); - response = await call.ResponseStream.ReadNext(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + Assert.IsTrue(await call.ResponseStream.MoveNext()); + Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); - cts.Cancel(); + cts.Cancel(); - try - { - response = await call.ResponseStream.ReadNext(); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + try + { + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); + } } Console.WriteLine("Passed!"); }).Wait(); diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 679aafb57a..ee077f9f56 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -12,45 +12,45 @@ namespace grpc.testing { { static readonly string __ServiceName = "grpc.testing.TestService"; - static readonly Marshaller<Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom); - static readonly Marshaller<SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom); - static readonly Marshaller<SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom); - static readonly Marshaller<StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom); - static readonly Marshaller<StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom); - static readonly Marshaller<StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom); - static readonly Marshaller<StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom); + static readonly Marshaller<global::grpc.testing.SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom); + static readonly Marshaller<global::grpc.testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom); - static readonly Method<Empty, Empty> __Method_EmptyCall = new Method<Empty, Empty>( + static readonly Method<global::grpc.testing.Empty, global::grpc.testing.Empty> __Method_EmptyCall = new Method<global::grpc.testing.Empty, global::grpc.testing.Empty>( MethodType.Unary, "EmptyCall", __Marshaller_Empty, __Marshaller_Empty); - static readonly Method<SimpleRequest, SimpleResponse> __Method_UnaryCall = new Method<SimpleRequest, SimpleResponse>( + static readonly Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse> __Method_UnaryCall = new Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse>( MethodType.Unary, "UnaryCall", __Marshaller_SimpleRequest, __Marshaller_SimpleResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.ServerStreaming, "StreamingOutputCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> __Method_StreamingInputCall = new Method<StreamingInputCallRequest, StreamingInputCallResponse>( + static readonly Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> __Method_StreamingInputCall = new Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse>( MethodType.ClientStreaming, "StreamingInputCall", __Marshaller_StreamingInputCallRequest, __Marshaller_StreamingInputCallResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.DuplexStreaming, "FullDuplexCall", __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); - static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( + static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>( MethodType.DuplexStreaming, "HalfDuplexCall", __Marshaller_StreamingOutputCallRequest, @@ -59,25 +59,25 @@ namespace grpc.testing { // client-side stub interface public interface ITestServiceClient { - Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)); - Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)); - SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)); - Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); - AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)); - AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)); + global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)); + global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)); } // server-side interface public interface ITestService { - Task<Empty> EmptyCall(ServerCallContext context, Empty request); - Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request); - Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream); - Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream); - Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream); - Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream); + Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request); + Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request); + Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); + Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream); + Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); + Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); } // client stub @@ -89,42 +89,42 @@ namespace grpc.testing { public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config) { } - public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)) + public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall); return Calls.AsyncUnaryCall(call, request, token); } - public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.BlockingUnaryCall(call, request, token); } - public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)) + public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall); return Calls.AsyncUnaryCall(call, request, token); } - public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingOutputCall); return Calls.AsyncServerStreamingCall(call, request, token); } - public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_StreamingInputCall); return Calls.AsyncClientStreamingCall(call, token); } - public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_FullDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); } - public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_HalfDuplexCall); return Calls.AsyncDuplexStreamingCall(call, token); diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index d6ba61ef82..6bd997d1f4 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -64,7 +64,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } } @@ -86,7 +86,7 @@ namespace grpc.testing { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - await responseStream.Write(response); + await responseStream.WriteAsync(response); } }); } diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config index e33b6e3e46..291b7b8599 100644 --- a/src/csharp/Grpc.IntegrationTesting/packages.config +++ b/src/csharp/Grpc.IntegrationTesting/packages.config @@ -3,6 +3,7 @@ <package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" /> <package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" /> <package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" /> + <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> <package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" /> <package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" /> <package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" /> diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index fe7e0a495f..7cb78bddf4 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -11,8 +11,8 @@ endlocal @call buildall.bat || goto :error %NUGET% pack ..\..\vsprojects\nuget_package\grpc.native.csharp_ext.nuspec || goto :error -%NUGET% pack Grpc.Core\Grpc.Core.nuspec || goto :error -%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec || goto :error +%NUGET% pack Grpc.Core\Grpc.Core.nuspec -Symbols || goto :error +%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec -Symbols || goto :error %NUGET% pack Grpc.nuspec || goto :error goto :EOF diff --git a/src/node/bin/README.md b/src/node/bin/README.md new file mode 100644 index 0000000000..2f856e428e --- /dev/null +++ b/src/node/bin/README.md @@ -0,0 +1,16 @@ +# Command Line Tools + +# Service Packager + +The command line tool `bin/service_packager`, when called with the following command line: + +```bash +service_packager proto_file -o output_path -n name -v version [-i input_path...] +``` + +Populates `output_path` with a node package consisting of a `package.json` populated with `name` and `version`, an `index.js`, a `LICENSE` file copied from gRPC, and a `service.json`, which is compiled from `proto_file` and the given `input_path`s. `require('output_path')` returns an object that is equivalent to + +```js +{ client: require('grpc').load('service.json'), + auth: require('google-auth-library') } +``` diff --git a/src/node/bin/service_packager b/src/node/bin/service_packager new file mode 100755 index 0000000000..c7f2460997 --- /dev/null +++ b/src/node/bin/service_packager @@ -0,0 +1,2 @@ +#!/usr/bin/env node +require(__dirname+'/../cli/service_packager.js').main(process.argv.slice(2));
\ No newline at end of file diff --git a/src/node/cli/service_packager.js b/src/node/cli/service_packager.js new file mode 100644 index 0000000000..f29180b252 --- /dev/null +++ b/src/node/cli/service_packager.js @@ -0,0 +1,142 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var fs = require('fs'); +var path = require('path'); + +var _ = require('underscore'); +var async = require('async'); +var pbjs = require('protobufjs/cli/pbjs'); +var parseArgs = require('minimist'); +var Mustache = require('mustache'); + +var package_json = require('../package.json'); + +var template_path = path.resolve(__dirname, 'service_packager'); + +var package_tpl_path = path.join(template_path, 'package.json.template'); + +var arg_format = { + string: ['include', 'out', 'name', 'version'], + alias: { + include: 'i', + out: 'o', + name: 'n', + version: 'v' + } +}; + +// TODO(mlumish): autogenerate README.md from proto file + +/** + * Render package.json file from template using provided parameters. + * @param {Object} params Map of parameter names to values + * @param {function(Error, string)} callback Callback to pass rendered template + * text to + */ +function generatePackage(params, callback) { + fs.readFile(package_tpl_path, {encoding: 'utf-8'}, function(err, template) { + if (err) { + callback(err); + } else { + var rendered = Mustache.render(template, params); + callback(null, rendered); + } + }); +} + +/** + * Copy a file + * @param {string} src_path The filepath to copy from + * @param {string} dest_path The filepath to copy to + */ +function copyFile(src_path, dest_path) { + fs.createReadStream(src_path).pipe(fs.createWriteStream(dest_path)); +} + +/** + * Run the script. Copies the index.js and LICENSE files to the output path, + * renders the package.json template to the output path, and generates a + * service.json file from the input proto files using pbjs. The arguments are + * taken directly from the command line, and handled as follows: + * -i (--include) : An include path for pbjs (can be dpulicated) + * -o (--output): The output path + * -n (--name): The name of the package + * -v (--version): The package version + * @param {Array} argv The argument vector + */ +function main(argv) { + var args = parseArgs(argv, arg_format); + var out_path = path.resolve(args.out); + var include_dirs = []; + if (args.include) { + include_dirs = _.map(_.flatten([args.include]), function(p) { + return path.resolve(p); + }); + } + args.grpc_version = package_json.version; + generatePackage(args, function(err, rendered) { + if (err) throw err; + fs.writeFile(path.join(out_path, 'package.json'), rendered, function(err) { + if (err) throw err; + }); + }); + copyFile(path.join(template_path, 'index.js'), + path.join(out_path, 'index.js')); + copyFile(path.join(__dirname, '..', 'LICENSE'), + path.join(out_path, 'LICENSE')); + + var service_stream = fs.createWriteStream(path.join(out_path, + 'service.json')); + var pbjs_args = _.flatten(['node', 'pbjs', + args._[0], + '-legacy', + _.map(include_dirs, function(dir) { + return "-path=" + dir; + })]); + var old_stdout = process.stdout; + process.__defineGetter__('stdout', function() { + return service_stream; + }); + var pbjs_status = pbjs.main(pbjs_args); + process.__defineGetter__('stdout', function() { + return old_stdout; + }); + if (pbjs_status !== pbjs.STATUS_OK) { + throw new Error('pbjs failed with status code ' + pbjs_status); + } +} + +exports.main = main; diff --git a/src/node/cli/service_packager/index.js b/src/node/cli/service_packager/index.js new file mode 100644 index 0000000000..811e08b89a --- /dev/null +++ b/src/node/cli/service_packager/index.js @@ -0,0 +1,36 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var grpc = require('grpc'); +exports.client = grpc.load(__dirname + '/service.json', 'json'); +exports.auth = require('google-auth-library'); diff --git a/src/node/cli/service_packager/package.json.template b/src/node/cli/service_packager/package.json.template new file mode 100644 index 0000000000..9f9019172e --- /dev/null +++ b/src/node/cli/service_packager/package.json.template @@ -0,0 +1,17 @@ +{ + "name": "{{{name}}}", + "version": "{{{version}}}", + "author": "Google Inc.", + "description": "Client library for {{{name}}} built on gRPC", + "license": "Apache-2.0", + "dependencies": { + "grpc": "{{{grpc_version}}}", + "google-auth-library": "^0.9.2" + }, + "main": "index.js", + "files": [ + "LICENSE", + "index.js", + "service.json" + ] +} diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 80f811901c..455055d9f3 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -154,13 +154,15 @@ function serverStreaming(client, done) { arg.response_parameters[resp_index].size); resp_index += 1; }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); + call.on('end', function() { assert.strictEqual(resp_index, 4); if (done) { done(); } }); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + }); } /** diff --git a/src/node/package.json b/src/node/package.json index 8d413c3ffa..3f31ba49ff 100644 --- a/src/node/package.json +++ b/src/node/package.json @@ -36,6 +36,7 @@ "jshint": "^2.5.0", "minimist": "^1.1.0", "mocha": "~1.21.0", + "mustache": "^2.0.0", "strftime": "^0.8.2" }, "engines": { @@ -46,6 +47,8 @@ "README.md", "index.js", "binding.gyp", + "bin", + "cli", "examples", "ext", "interop", diff --git a/src/node/src/client.js b/src/node/src/client.js index 46d476b9f4..65339406b2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); this.call.startBatch(batch, function(err, event) { if (err) { - throw err; + // Something has gone wrong. Stop writing by failing to call callback + return; } callback(); }); @@ -120,10 +121,8 @@ function _read(size) { */ function readCallback(err, event) { if (err) { - throw err; - } - if (self.finished) { - self.push(null); + // Something has gone wrong. Stop reading and wait for status + self.finished = true; return; } var data = event.read; @@ -237,10 +236,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -248,6 +243,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); @@ -300,7 +301,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(metadata_batch, function(err, response) { if (err) { - callback(err); + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. return; } stream.emit('metadata', response.metadata); @@ -309,10 +311,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -320,6 +318,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } callback(null, deserialize(response.read)); }); @@ -373,16 +377,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -390,6 +393,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); @@ -438,16 +447,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -455,6 +463,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 60e9861bc8..667852f382 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -286,20 +286,24 @@ describe('end-to-end', function() { assert.ifError(err); assert(response['send metadata']); assert.strictEqual(response.read.toString(), requests[0]); - var end_batch = {}; - end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { - 'metadata': {}, - 'code': grpc.status.OK, - 'details': status_text - }; - end_batch[grpc.opType.RECV_MESSAGE] = true; - server_call.startBatch(end_batch, function(err, response) { + var snd_batch = {}; + snd_batch[grpc.opType.RECV_MESSAGE] = true; + server_call.startBatch(snd_batch, function(err, response) { assert.ifError(err); - assert(response['send status']); - assert(!response.cancelled); assert.strictEqual(response.read.toString(), requests[1]); - done(); + var end_batch = {}; + end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { + 'metadata': {}, + 'code': grpc.status.OK, + 'details': status_text + }; + server_call.startBatch(end_batch, function(err, response) { + assert.ifError(err); + assert(response['send status']); + assert(!response.cancelled); + done(); + }); }); }); }); diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 0f458b40cd..a4a0ddb324 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -231,7 +231,7 @@ handler:resumingHandler]] errorHandler:errorHandler]; } -- (void)didReceiveValue:(id)value { +- (void)writeValue:(id)value { // TODO(jcanizales): Throw/assert if value isn't NSData. // Pause the input and only resume it when the C layer notifies us that writes @@ -255,7 +255,7 @@ errorHandler:errorHandler]; } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { if (errorOrNil) { [self cancel]; } else { @@ -306,7 +306,7 @@ - (void)startWithWriteable:(id<GRXWriteable>)writeable { // The following produces a retain cycle self:_responseWriteable:self, which is only - // broken when didFinishWithError: is sent to the wrapped writeable. + // broken when writesFinishedWithError: is sent to the wrapped writeable. // Care is taken not to retain self strongly in any of the blocks used in // the implementation of GRPCCall, so that the life of the instance is // determined by this retain cycle. diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h index 24c2b96729..1ef245fe37 100644 --- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h +++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h @@ -38,11 +38,11 @@ // This is a thread-safe wrapper over a GRXWriteable instance. It lets one // enqueue calls to a GRXWriteable instance for the main thread, guaranteeing -// that didFinishWithError: is the last message sent to it (no matter what +// that writesFinishedWithError: is the last message sent to it (no matter what // messages are sent to the wrapper, in what order, nor from which thread). It // also guarantees that, if cancelWithError: is called from the main thread // (e.g. by the app cancelling the writes), no further messages are sent to the -// writeable except didFinishWithError:. +// writeable except writesFinishedWithError:. // // TODO(jcanizales): Let the user specify another queue for the writeable // callbacks. @@ -51,23 +51,22 @@ // The GRXWriteable passed is the wrapped writeable. // Both the GRXWriter instance and the GRXWriteable instance are retained until -// didFinishWithError: is sent to the writeable, and released after that. +// writesFinishedWithError: is sent to the writeable, and released after that. // This is used to create a retain cycle that keeps both objects alive until the // writing is explicitly finished. - (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer NS_DESIGNATED_INITIALIZER; -// Enqueues didReceiveValue: to be sent to the writeable in the main thread. -// The passed handler is invoked from the main thread after didReceiveValue: -// returns. +// Enqueues writeValue: to be sent to the writeable in the main thread. +// The passed handler is invoked from the main thread after writeValue: returns. - (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler; -// Enqueues didFinishWithError:nil to be sent to the writeable in the main +// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main // thread. After that message is sent to the writeable, all other methods of // this object are effectively noops. - (void)enqueueSuccessfulCompletion; -// If the writeable has not yet received a didFinishWithError: message, this +// If the writeable has not yet received a writesFinishedWithError: message, this // will enqueue one to be sent to it in the main thread, and cancel all other // pending messages to the writeable enqueued by this object (both past and // future). @@ -75,7 +74,7 @@ - (void)cancelWithError:(NSError *)error; // Cancels all pending messages to the writeable enqueued by this object (both -// past and future). Because the writeable won't receive didFinishWithError:, +// past and future). Because the writeable won't receive writesFinishedWithError:, // this also releases the writeable and the writer. - (void)cancelSilently; @end diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m index ac444ef406..7d5ecb56d9 100644 --- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m +++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m @@ -43,7 +43,7 @@ @implementation GRPCDelegateWrapper { dispatch_queue_t _writeableQueue; - // This ensures that didFinishWithError: is only sent once to the writeable. + // This ensures that writesFinishedWithError: is only sent once to the writeable. dispatch_once_t _alreadyFinished; } @@ -69,7 +69,7 @@ // the race. id<GRXWriteable> writeable = self.writeable; if (writeable) { - [writeable didReceiveValue:message]; + [writeable writeValue:message]; handler(); } }); @@ -80,7 +80,7 @@ dispatch_once(&_alreadyFinished, ^{ // Cancellation is now impossible. None of the other three blocks can run // concurrently with this one. - [self.writeable didFinishWithError:nil]; + [self.writeable writesFinishedWithError:nil]; // Break the retain cycle with writer, and skip any possible message to the // wrapped writeable enqueued after this one. self.writeable = nil; @@ -100,7 +100,7 @@ self.writeable = nil; dispatch_async(_writeableQueue, ^{ - [writeable didFinishWithError:error]; + [writeable writesFinishedWithError:error]; // Break the retain cycle with writer. self.writer = nil; }); diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m index c5051c0123..96608f2898 100644 --- a/src/objective-c/ProtoRPC/ProtoRPC.m +++ b/src/objective-c/ProtoRPC/ProtoRPC.m @@ -71,9 +71,9 @@ if ((self = [super initWithHost:host method:method requestsWriter:bytesWriter])) { // A writeable that parses the proto messages received. _responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { - [responsesWriteable didReceiveValue:[responseClass parseFromData:value]]; + [responsesWriteable writeValue:[responseClass parseFromData:value]]; } completionHandler:^(NSError *errorOrNil) { - [responsesWriteable didFinishWithError:errorOrNil]; + [responsesWriteable writesFinishedWithError:errorOrNil]; }]; } return self; diff --git a/src/objective-c/README.md b/src/objective-c/README.md index 05e9f2b4dc..49d7f43882 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -1,3 +1,47 @@ -gRPC implementation for Objective-C on iOS +# gRPC for Objective-C -This is a work in progress. +## How to generate a client library from a Protocol Buffers definition + +First install v3 of the Protocol Buffers compiler (_protoc_), by cloning [its Git repository](https://github.com/google/protobuf) and following these [installation instructions](https://github.com/google/protobuf#c-installation---unix) (the ones titled C++; don't miss the note for Mac users). + +Then clone this repository and execute the following commands from the root directory where it was cloned. + +Compile the gRPC plugins for _protoc_: +```sh +make plugins +``` + +Create a symbolic link to the compiled plugin binary somewhere in your `$PATH`: +```sh +ln -s `pwd`/bins/opt/grpc_objective_c_plugin /usr/local/bin/protoc-gen-objcgrpc +``` +(Notice that the name of the created link must begin with "protoc-gen-" for _protoc_ to recognize it as a plugin). + +If you don't want to create the symbolic link, you can alternatively copy the binary (with the appropriate name). Or you might prefer instead to specify the plugin's path as a flag when invoking _protoc_, in which case no system modification nor renaming is necessary. + +Finally, run _protoc_ with the following flags to generate the client library for your `.proto` files: + +```sh +protoc --objc_out=. --objcgrpc_out=. *.proto +``` + +This will generate a pair of `.pbobjc.h`/`.pbobjc.m` files for each `.proto` file, with the messages and enums defined in them. And a pair of `.pbrpc.h`/`.pbrpc.m` files for each `.proto` file with services defined. The latter contains the code to make remote calls to the specified API. + +## How to integrate a generated gRPC library in your project + +### If you use Cocoapods + +This is the recommended approach. + +You need to create a Podspec file for the generated library. This is simply a matter of copying an example like [this one](https://github.com/grpc/grpc/blob/master/src/objective-c/examples/Sample/RemoteTestClient/RemoteTest.podspec) to the directory where the source files were generated. Update the name and other metadata of the Podspec as suitable. + +Once your library has a Podspec, refer to it from your Podfile using `:path` as described [here](https://guides.cocoapods.org/using/the-podfile.html#using-the-files-from-a-folder-local-to-the-machine). + +### If you don't use Cocoapods + +You need to compile the generated `.pbpbjc.*` files (the enums and messages) without ARC support, and the generated `.pbrpc.*` files (the services) with ARC support. The generated code depends on v0.3+ of the Objective-C gRPC runtime library and v3.0+ of the Objective-C Protobuf runtime library. + +These libraries need to be integrated into your project as described in their respective Podspec files: + +* [Podspec](https://github.com/grpc/grpc/blob/master/gRPC.podspec) for the Objective-C gRPC runtime library. This can be tedious to configure manually. +* [Podspec](https://github.com/jcanizales/protobuf/blob/add-podspec/Protobuf.podspec) for the Objective-C Protobuf runtime library. diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index 4147362ba4..5e876a73bf 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -38,12 +38,12 @@ // A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed // to -startWithWriteable:). -// Once it is started, whatever values are written into it (via -didReceiveValue:) will be -// propagated immediately, unless flow control prevents it. +// Once it is started, whatever values are written into it (via -writeValue:) will be propagated +// immediately, unless flow control prevents it. // If it is throttled and keeps receiving values, as well as if it receives values before being // started, it will buffer them and propagate them in order as soon as its state becomes // GRXWriterStateStarted. -// If it receives an error (via -didFinishWithError:), it will drop any buffered values and +// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and // propagate the error immediately. // // Beware that a pipe of this type can't prevent receiving more values when it is paused (for diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m index 3ef470f89f..4820c84af0 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.m +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -62,7 +62,7 @@ - (void)writeBufferUntilPausedOrStopped { while (_state == GRXWriterStateStarted && _queue.count > 0) { - [_writeable didReceiveValue:[self popValue]]; + [_writeable writeValue:[self popValue]]; } if (_inputIsFinished && _queue.count == 0) { // Our writer finished normally while we were paused or not-started-yet. @@ -77,10 +77,10 @@ return _state == GRXWriterStateStarted && _queue.count == 0; } -- (void)didReceiveValue:(id)value { +- (void)writeValue:(id)value { if (self.shouldFastForward) { // Skip the queue. - [_writeable didReceiveValue:value]; + [_writeable writeValue:value]; } else { // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. // So just buffer the new value. @@ -92,7 +92,7 @@ } } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { _inputIsFinished = YES; _errorOrNil = errorOrNil; if (errorOrNil || self.shouldFastForward) { @@ -140,7 +140,7 @@ - (void)finishWithError:(NSError *)errorOrNil { id<GRXWriteable> writeable = _writeable; self.state = GRXWriterStateFinished; - [writeable didFinishWithError:errorOrNil]; + [writeable writesFinishedWithError:errorOrNil]; } @end diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.m b/src/objective-c/RxLibrary/GRXImmediateWriter.m index 7468af557f..0b4979872e 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.m @@ -109,7 +109,7 @@ - (void)writeUntilPausedOrStopped { id value; while (value = [_enumerator nextObject]) { - [_writeable didReceiveValue:value]; + [_writeable writeValue:value]; // If the writeable has a reference to us, it might change our state to paused or finished. if (_state == GRXWriterStatePaused || _state == GRXWriterStateFinished) { return; @@ -130,7 +130,7 @@ _errorOrNil = nil; id<GRXWriteable> writeable = _writeable; _writeable = nil; - [writeable didFinishWithError:errorOrNil]; + [writeable writesFinishedWithError:errorOrNil]; } - (void)setState:(GRXWriterState)newState { diff --git a/src/objective-c/RxLibrary/GRXWriteable.h b/src/objective-c/RxLibrary/GRXWriteable.h index 6f6ea142e0..216de30735 100644 --- a/src/objective-c/RxLibrary/GRXWriteable.h +++ b/src/objective-c/RxLibrary/GRXWriteable.h @@ -38,14 +38,12 @@ @protocol GRXWriteable <NSObject> // Push the next value of the sequence to the receiving object. -// TODO(jcanizales): Name it enumerator:(id<GRXEnumerator>) didProduceValue:(id)? -- (void)didReceiveValue:(id)value; +- (void)writeValue:(id)value; // Signal that the sequence is completed, or that an error ocurred. After this -// message is sent to the instance, neither it nor didReceiveValue: may be +// message is sent to the instance, neither it nor writeValue: may be // called again. -// TODO(jcanizales): enumerator:(id<GRXEnumerator>) didFinishWithError:(NSError*)? -- (void)didFinishWithError:(NSError *)errorOrNil; +- (void)writesFinishedWithError:(NSError *)errorOrNil; @end typedef void (^GRXValueHandler)(id value); diff --git a/src/objective-c/RxLibrary/GRXWriteable.m b/src/objective-c/RxLibrary/GRXWriteable.m index 7000a078d1..63f7c3e7f3 100644 --- a/src/objective-c/RxLibrary/GRXWriteable.m +++ b/src/objective-c/RxLibrary/GRXWriteable.m @@ -76,13 +76,13 @@ return self; } -- (void)didReceiveValue:(id)value { +- (void)writeValue:(id)value { if (_valueHandler) { _valueHandler(value); } } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { if (_completionHandler) { _completionHandler(errorOrNil); } diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index 68c294f007..dcf44e3143 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -50,7 +50,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // The writer is temporarily paused, and won't send any more values to the // writeable unless its state is set back to Started. The writer might still // transition to the Finished state at any moment, and is allowed to send - // didFinishWithError: to its writeable. + // writesFinishedWithError: to its writeable. // // Not all implementations of writer have to support pausing, and thus // trying to set an writer's state to this value might have no effect. @@ -59,7 +59,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // The writer has released its writeable and won't interact with it anymore. // // One seldomly wants to set an writer's state to this value, as its - // writeable isn't notified with a didFinishWithError: message. Instead, sending + // writeable isn't notified with a writesFinishedWithError: message. Instead, sending // finishWithError: to the writer will make it notify the writeable and then // transition to this state. GRXWriterStateFinished @@ -105,7 +105,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // This method might only be called on writers in the NotStarted state. - (void)startWithWriteable:(id<GRXWriteable>)writeable; -// Send didFinishWithError:errorOrNil immediately to the writeable, and don't send +// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send // any more messages to it. // // This method might only be called on writers in the Started or Paused diff --git a/src/objective-c/RxLibrary/GRXWriter.m b/src/objective-c/RxLibrary/GRXWriter.m index b48a44f3a7..cc14383560 100644 --- a/src/objective-c/RxLibrary/GRXWriter.m +++ b/src/objective-c/RxLibrary/GRXWriter.m @@ -62,7 +62,7 @@ - (void)finishOutputWithError:(NSError *)errorOrNil { id<GRXWriteable> writeable = _writeable; _writeable = nil; - [writeable didFinishWithError:errorOrNil]; + [writeable writesFinishedWithError:errorOrNil]; } // This is used to stop the input writer. It nillifies our reference to it @@ -75,11 +75,11 @@ #pragma mark GRXWriteable implementation -- (void)didReceiveValue:(id)value { - [_writeable didReceiveValue:value]; +- (void)writeValue:(id)value { + [_writeable writeValue:value]; } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { _writer = nil; [self finishOutputWithError:errorOrNil]; } diff --git a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m index 8a41c819a6..2cdfea1b67 100644 --- a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m +++ b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m @@ -57,7 +57,7 @@ static id (^kIdentity)(id value) = ^id(id value) { } // Override -- (void)didReceiveValue:(id)value { - [super didReceiveValue:_map(value)]; +- (void)writeValue:(id)value { + [super writeValue:_map(value)]; } @end diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile index d30d9c5210..e8b78647ac 100644 --- a/src/objective-c/examples/Sample/Podfile +++ b/src/objective-c/examples/Sample/Podfile @@ -2,7 +2,7 @@ source 'https://github.com/CocoaPods/Specs.git' platform :ios, '8.0' pod 'gRPC', :path => "../../../.." -pod 'Protobuf', :git => 'https://github.com/jcanizales/protobuf.git', :branch => 'add-podspec' +pod 'Protobuf', :git => 'https://github.com/google/protobuf.git' pod 'Route_guide', :path => "RouteGuideClient" pod 'RemoteTest', :path => "RemoteTestClient" diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m index 7cebc0c2a7..8e0e11d23d 100644 --- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m +++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m @@ -208,7 +208,7 @@ id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] requestedResponseSize:responses[index]]; - [requestsBuffer didReceiveValue:request]; + [requestsBuffer writeValue:request]; [_service fullDuplexCallWithRequestsWriter:requestsBuffer handler:^(BOOL done, @@ -225,9 +225,9 @@ if (index < 4) { id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index] requestedResponseSize:responses[index]]; - [requestsBuffer didReceiveValue:request]; + [requestsBuffer writeValue:request]; } else { - [requestsBuffer didFinishWithError:nil]; + [requestsBuffer writesFinishedWithError:nil]; } } @@ -269,4 +269,37 @@ [self waitForExpectationsWithTimeout:1 handler:nil]; } +- (void)testCancelAfterFirstResponseRPC { + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterFirstResponse"]; + + // A buffered pipe to which we write a single value but never close + GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init]; + + __block BOOL receivedResponse = NO; + + id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782 + requestedResponseSize:@31415]; + + [requestsBuffer writeValue:request]; + + __block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer + handler:^(BOOL done, + RMTStreamingOutputCallResponse *response, + NSError *error) { + if (receivedResponse) { + XCTAssert(done, @"Unexpected extra response %@", response); + XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED); + [expectation fulfill]; + } else { + XCTAssertNil(error, @"Finished with unexpected error: %@", error); + XCTAssertFalse(done, @"Finished without response"); + XCTAssertNotNil(response); + receivedResponse = YES; + [call cancel]; + } + }]; + [call start]; + [self waitForExpectationsWithTimeout:4 handler:nil]; +} + @end diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index 923e889844..5fa974ed06 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -43,7 +43,7 @@ from grpc.framework.foundation import logging_pool _TIMEOUT = 3 _MAXIMUM_TIMEOUT = 90 -_MAXIMUM_POOL_SIZE = 400 +_MAXIMUM_POOL_SIZE = 4 class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index 4987be389a..4fd76f60f8 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -54,8 +54,8 @@ def _transform_metadata(unused_metadata): class RoundTripTest(unittest.TestCase): def setUp(self): - self.fore_link_pool = logging_pool.pool(80) - self.rear_link_pool = logging_pool.pool(80) + self.fore_link_pool = logging_pool.pool(8) + self.rear_link_pool = logging_pool.pool(8) def tearDown(self): self.rear_link_pool.shutdown(wait=True) diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py index 25799d679c..bdb1ee2379 100644 --- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py @@ -43,7 +43,7 @@ _TIMEOUT = 2 class LonelyRearLinkTest(unittest.TestCase): def setUp(self): - self.pool = logging_pool.pool(80) + self.pool = logging_pool.pool(8) def tearDown(self): self.pool.shutdown(wait=True) diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 05016cdaf3..69e145e3f6 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -41,7 +41,7 @@ from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 100 +_THREAD_POOL_SIZE = 10 @enum.unique diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index dd0a486117..b3b0b4ed32 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -41,7 +41,7 @@ from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 100 +_THREAD_POOL_SIZE = 10 _INVOCATION_EVENT_KINDS = ( _low.Event.Kind.METADATA_ACCEPTED, diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index f3f2a043eb..10919fae69 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -41,7 +41,7 @@ from grpc.framework.base import util as _base_utilities from grpc.framework.face import implementations as _face_implementations from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 80 +_THREAD_POOL_SIZE = 8 _ONE_DAY_IN_SECONDS = 24 * 60 * 60 diff --git a/src/python/src/grpc/framework/base/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py index 11e49caf75..d40bb4d92e 100644 --- a/src/python/src/grpc/framework/base/implementations_test.py +++ b/src/python/src/grpc/framework/base/implementations_test.py @@ -36,7 +36,7 @@ from grpc.framework.base import interfaces_test_case from grpc.framework.base import util from grpc.framework.foundation import logging_pool -POOL_MAX_WORKERS = 100 +POOL_MAX_WORKERS = 10 DEFAULT_TIMEOUT = 30 MAXIMUM_TIMEOUT = 60 diff --git a/src/python/src/grpc/framework/face/_calls.py b/src/python/src/grpc/framework/face/_calls.py index 75a550e3c7..87edeb0f0e 100644 --- a/src/python/src/grpc/framework/face/_calls.py +++ b/src/python/src/grpc/framework/face/_calls.py @@ -248,7 +248,7 @@ class _OperationFuture(future.Future): """See future.Future.add_done_callback for specification.""" with self._condition: if self._callbacks is not None: - self._callbacks.add(fn) + self._callbacks.append(fn) return callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) diff --git a/src/python/src/grpc/framework/face/_test_case.py b/src/python/src/grpc/framework/face/_test_case.py index b3a012db00..642d500628 100644 --- a/src/python/src/grpc/framework/face/_test_case.py +++ b/src/python/src/grpc/framework/face/_test_case.py @@ -35,7 +35,7 @@ from grpc.framework.face.testing import test_case from grpc.framework.foundation import logging_pool _TIMEOUT = 3 -_MAXIMUM_POOL_SIZE = 100 +_MAXIMUM_POOL_SIZE = 10 class FaceTestCase(test_case.FaceTestCase): diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py index eabeac4569..f6b4b609ff 100644 --- a/src/python/src/grpc/framework/face/demonstration.py +++ b/src/python/src/grpc/framework/face/demonstration.py @@ -34,7 +34,7 @@ from grpc.framework.base import implementations as _base_implementations from grpc.framework.face import implementations from grpc.framework.foundation import logging_pool -_POOL_SIZE_LIMIT = 20 +_POOL_SIZE_LIMIT = 5 _MAXIMUM_TIMEOUT = 90 diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py index 151d0ef793..1df1529b27 100644 --- a/src/python/src/grpc/framework/face/testing/base_util.py +++ b/src/python/src/grpc/framework/face/testing/base_util.py @@ -38,7 +38,7 @@ from grpc.framework.base import in_memory from grpc.framework.base import interfaces # pylint: disable=unused-import from grpc.framework.foundation import logging_pool -_POOL_SIZE_LIMIT = 20 +_POOL_SIZE_LIMIT = 5 _MAXIMUM_TIMEOUT = 90 diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 0d51b64f1b..21bf9a4248 100644 --- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -44,7 +44,7 @@ from grpc.framework.foundation import future from grpc.framework.foundation import logging_pool _TIMEOUT = 3 -_MAXIMUM_POOL_SIZE = 100 +_MAXIMUM_POOL_SIZE = 10 class _PauseableIterator(object): diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb index 6d69b0f21e..a039d036ac 100755 --- a/src/ruby/bin/apis/pubsub_demo.rb +++ b/src/ruby/bin/apis/pubsub_demo.rb @@ -79,7 +79,7 @@ end def publisher_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::PublisherService::Stub # shorter - logger.info("... access PublisherService at #{address}") + GRPC.logger.info("... access PublisherService at #{address}") stub_clz.new(address, creds: ssl_creds, update_metadata: auth_proc(opts), GRPC::Core::Channel::SSL_TARGET => opts.host) @@ -89,7 +89,7 @@ end def subscriber_stub(opts) address = "#{opts.host}:#{opts.port}" stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter - logger.info("... access SubscriberService at #{address}") + GRPC.logger.info("... access SubscriberService at #{address}") stub_clz.new(address, creds: ssl_creds, update_metadata: auth_proc(opts), GRPC::Core::Channel::SSL_TARGET => opts.host) diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb index a388924722..8df03ffb3c 100755 --- a/src/ruby/bin/interop/interop_client.rb +++ b/src/ruby/bin/interop/interop_client.rb @@ -70,7 +70,7 @@ end # loads the certificates used to access the test server securely. def load_prod_cert fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil? - logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") + GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}") File.open(ENV['SSL_CERT_FILE']).read end @@ -115,10 +115,10 @@ def create_stub(opts) stub_opts[:update_metadata] = auth_creds.updater_proc end - logger.info("... connecting securely to #{address}") + GRPC.logger.info("... connecting securely to #{address}") Grpc::Testing::TestService::Stub.new(address, **stub_opts) else - logger.info("... connecting insecurely to #{address}") + GRPC.logger.info("... connecting insecurely to #{address}") Grpc::Testing::TestService::Stub.new(address) end end diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb index 72570d92f3..78cb8dd836 100755 --- a/src/ruby/bin/interop/interop_server.rb +++ b/src/ruby/bin/interop/interop_server.rb @@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service Thread.new do begin reqs.each do |req| - logger.info("read #{req.inspect}") + GRPC.logger.info("read #{req.inspect}") resp_size = req.response_parameters[0].size resp = cls.new(payload: Payload.new(type: req.response_type, body: nulls(resp_size))) q.push(resp) end - logger.info('finished reads') + GRPC.logger.info('finished reads') q.push(self) rescue StandardError => e q.push(e) # share the exception with the enumerator @@ -179,10 +179,10 @@ def main s = GRPC::RpcServer.new if opts['secure'] s.add_http2_port(host, test_server_creds) - logger.info("... running securely on #{host}") + GRPC.logger.info("... running securely on #{host}") else s.add_http2_port(host) - logger.info("... running insecurely on #{host}") + GRPC.logger.info("... running insecurely on #{host}") end s.handle(TestTarget) s.run_till_terminated diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb index db254efb00..6319cda309 100755 --- a/src/ruby/bin/math_client.rb +++ b/src/ruby/bin/math_client.rb @@ -46,51 +46,51 @@ require 'optparse' include GRPC::Core::TimeConsts def do_div(stub) - logger.info('request_response') - logger.info('----------------') + GRPC.logger.info('request_response') + GRPC.logger.info('----------------') req = Math::DivArgs.new(dividend: 7, divisor: 3) - logger.info("div(7/3): req=#{req.inspect}") + GRPC.logger.info("div(7/3): req=#{req.inspect}") resp = stub.div(req, INFINITE_FUTURE) - logger.info("Answer: #{resp.inspect}") - logger.info('----------------') + GRPC.logger.info("Answer: #{resp.inspect}") + GRPC.logger.info('----------------') end def do_sum(stub) # to make client streaming requests, pass an enumerable of the inputs - logger.info('client_streamer') - logger.info('---------------') + GRPC.logger.info('client_streamer') + GRPC.logger.info('---------------') reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) } - logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") + GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}") resp = stub.sum(reqs) # reqs.is_a?(Enumerable) - logger.info("Answer: #{resp.inspect}") - logger.info('---------------') + GRPC.logger.info("Answer: #{resp.inspect}") + GRPC.logger.info('---------------') end def do_fib(stub) - logger.info('server_streamer') - logger.info('----------------') + GRPC.logger.info('server_streamer') + GRPC.logger.info('----------------') req = Math::FibArgs.new(limit: 11) - logger.info("fib(11): req=#{req.inspect}") + GRPC.logger.info("fib(11): req=#{req.inspect}") resp = stub.fib(req, INFINITE_FUTURE) resp.each do |r| - logger.info("Answer: #{r.inspect}") + GRPC.logger.info("Answer: #{r.inspect}") end - logger.info('----------------') + GRPC.logger.info('----------------') end def do_div_many(stub) - logger.info('bidi_streamer') - logger.info('-------------') + GRPC.logger.info('bidi_streamer') + GRPC.logger.info('-------------') reqs = [] reqs << Math::DivArgs.new(dividend: 7, divisor: 3) reqs << Math::DivArgs.new(dividend: 5, divisor: 2) reqs << Math::DivArgs.new(dividend: 7, divisor: 2) - logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") + GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}") resp = stub.div_many(reqs, 10) resp.each do |r| - logger.info("Answer: #{r.inspect}") + GRPC.logger.info("Answer: #{r.inspect}") end - logger.info('----------------') + GRPC.logger.info('----------------') end def load_test_certs @@ -132,10 +132,10 @@ def main p stub_opts p options['host'] stub = Math::Math::Stub.new(options['host'], **stub_opts) - logger.info("... connecting securely on #{options['host']}") + GRPC.logger.info("... connecting securely on #{options['host']}") else stub = Math::Math::Stub.new(options['host']) - logger.info("... connecting insecurely on #{options['host']}") + GRPC.logger.info("... connecting insecurely on #{options['host']}") end do_div(stub) diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb index e46d9c671f..b41ccf6ce1 100755 --- a/src/ruby/bin/math_server.rb +++ b/src/ruby/bin/math_server.rb @@ -128,13 +128,13 @@ class Calculator < Math::Math::Service t = Thread.new do begin requests.each do |req| - logger.info("read #{req.inspect}") + GRPC.logger.info("read #{req.inspect}") resp = Math::DivReply.new(quotient: req.dividend / req.divisor, remainder: req.dividend % req.divisor) q.push(resp) Thread.pass # let the internal Bidi threads run end - logger.info('finished reads') + GRPC.logger.info('finished reads') q.push(self) rescue StandardError => e q.push(e) # share the exception with the enumerator @@ -176,10 +176,10 @@ def main s = GRPC::RpcServer.new if options['secure'] s.add_http2_port(options['host'], test_server_creds) - logger.info("... running securely on #{options['host']}") + GRPC.logger.info("... running securely on #{options['host']}") else s.add_http2_port(options['host']) - logger.info("... running insecurely on #{options['host']}") + GRPC.logger.info("... running insecurely on #{options['host']}") end s.handle(Calculator) diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb index f3fd110347..390a9c59c3 100755 --- a/src/ruby/bin/noproto_client.rb +++ b/src/ruby/bin/noproto_client.rb @@ -94,15 +94,15 @@ def main p stub_opts p options['host'] stub = NoProtoStub.new(options['host'], **stub_opts) - logger.info("... connecting securely on #{options['host']}") + GRPC.logger.info("... connecting securely on #{options['host']}") else stub = NoProtoStub.new(options['host']) - logger.info("... connecting insecurely on #{options['host']}") + GRPC.logger.info("... connecting insecurely on #{options['host']}") end - logger.info('sending a NoProto rpc') + GRPC.logger.info('sending a NoProto rpc') resp = stub.an_rpc(NoProtoMsg.new) - logger.info("got a response: #{resp}") + GRPC.logger.info("got a response: #{resp}") end main diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb index f71daeadb3..90baaf9a2e 100755 --- a/src/ruby/bin/noproto_server.rb +++ b/src/ruby/bin/noproto_server.rb @@ -63,7 +63,7 @@ class NoProto < NoProtoService end def an_rpc(req, _call) - logger.info('echo service received a request') + GRPC.logger.info('echo service received a request') req end end @@ -98,10 +98,10 @@ def main s = GRPC::RpcServer.new if options['secure'] s.add_http2_port(options['host'], test_server_creds) - logger.info("... running securely on #{options['host']}") + GRPC.logger.info("... running securely on #{options['host']}") else s.add_http2_port(options['host']) - logger.info("... running insecurely on #{options['host']}") + GRPC.logger.info("... running insecurely on #{options['host']}") end s.handle(NoProto) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 947c39cd22..5f7beb5ab1 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -188,7 +188,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - logger.debug("sending #{req}, marshalled? #{marshalled}") + GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else @@ -230,14 +230,14 @@ module GRPC @call.metadata = batch_result.metadata @metadata_tag = nil end - logger.debug("received req: #{batch_result}") + GRPC.logger.debug("received req: #{batch_result}") unless batch_result.nil? || batch_result.message.nil? - logger.debug("received req.to_s: #{batch_result.message}") + GRPC.logger.debug("received req.to_s: #{batch_result.message}") res = @unmarshal.call(batch_result.message) - logger.debug("received_req (unmarshalled): #{res.inspect}") + GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}") return res end - logger.debug('found nil; the final response has been sent') + GRPC.logger.debug('found nil; the final response has been sent') nil end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 4ca3004d6f..67143d40cf 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -115,10 +115,10 @@ module GRPC return enum_for(:each_queued_msg) unless block_given? count = 0 loop do - logger.debug("each_queued_msg: msg##{count}") + GRPC.logger.debug("each_queued_msg: msg##{count}") count += 1 req = @readq.pop - logger.debug("each_queued_msg: req = #{req}") + GRPC.logger.debug("each_queued_msg: req = #{req}") throw req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req @@ -134,22 +134,22 @@ module GRPC begin count = 0 requests.each do |req| - logger.debug("bidi-write_loop: #{count}") + GRPC.logger.debug("bidi-write_loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end if is_client - logger.debug("bidi-write-loop: sent #{count}, waiting to finish") + GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting") batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_CLOSE_FROM_CLIENT => nil, RECV_STATUS_ON_CLIENT => nil) batch_result.check_status end rescue StandardError => e - logger.warn('bidi-write_loop: failed') - logger.warn(e) + GRPC.logger.warn('bidi-write_loop: failed') + GRPC.logger.warn(e) raise e end end @@ -164,7 +164,7 @@ module GRPC # queue the initial read before beginning the loop loop do - logger.debug("bidi-read_loop: #{count}") + GRPC.logger.debug("bidi-read_loop: #{count}") count += 1 # TODO: ensure metadata is read if available, currently it's not batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, @@ -172,19 +172,19 @@ module GRPC # handle the next message if batch_result.message.nil? @readq.push(END_OF_READS) - logger.debug('bidi-read-loop: done reading!') + GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading - logger.debug("received req: #{batch_result.message}") + GRPC.logger.debug("received req: #{batch_result.message}") res = @unmarshal.call(batch_result.message) @readq.push(res) end rescue StandardError => e - logger.warn('bidi: read_loop failed') - logger.warn(e) + GRPC.logger.warn('bidi: read_loop failed') + GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end end diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index 10211ae239..2fd61c5f7e 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -84,22 +84,22 @@ module GRPC rescue BadStatus => e # this is raised by handlers that want GRPC to send an application error # code and detail message and some additional app-specific metadata. - logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}") + GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}") send_status(active_call, e.code, e.details, **e.metadata) rescue Core::CallError => e # This is raised by GRPC internals but should rarely, if ever happen. # Log it, but don't notify the other endpoint.. - logger.warn("failed call: #{active_call}\n#{e}") + GRPC.logger.warn("failed call: #{active_call}\n#{e}") rescue Core::OutOfTime # This is raised when active_call#method.call exceeeds the deadline # event. Send a status of deadline exceeded - logger.warn("late call: #{active_call}") + GRPC.logger.warn("late call: #{active_call}") send_status(active_call, DEADLINE_EXCEEDED, 'late') rescue StandardError => e # This will usuaally be an unhandled error in the handling code. # Send back a UNKNOWN status to the client - logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") - logger.warn(e) + GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN") + GRPC.logger.warn(e) send_status(active_call, UNKNOWN, 'no reason given') end @@ -139,8 +139,8 @@ module GRPC details = 'Not sure why' if details.nil? active_client.send_status(code, details, code == OK, **kw) rescue StandardError => e - logger.warn("Could not send status #{code}:#{details}") - logger.warn(e) + GRPC.logger.warn("Could not send status #{code}:#{details}") + GRPC.logger.warn(e) end end end diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index de22466089..665c144432 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -94,7 +94,7 @@ module GRPC def schedule(*args, &blk) fail 'already stopped' if @stopped return if blk.nil? - logger.info('schedule another job') + GRPC.logger.info('schedule another job') @jobs << [blk, args] end @@ -114,14 +114,14 @@ module GRPC # Stops the jobs in the pool def stop - logger.info('stopping, will wait for all the workers to exit') + GRPC.logger.info('stopping, will wait for all the workers to exit') @workers.size.times { schedule { throw :exit } } @stopped = true @stop_mutex.synchronize do # wait @keep_alive for works to stop @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers - logger.info('stopped, all workers are shutdown') + GRPC.logger.info('stopped, all workers are shutdown') end protected @@ -129,14 +129,14 @@ module GRPC # Forcibly shutdown any threads that are still alive. def forcibly_stop_workers return unless @workers.size > 0 - logger.info("forcibly terminating #{@workers.size} worker(s)") + GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin t.exit rescue StandardError => e - logger.warn('error while terminating a worker') - logger.warn(e) + GRPC.logger.warn('error while terminating a worker') + GRPC.logger.warn(e) end end end @@ -156,8 +156,8 @@ module GRPC blk, args = @jobs.pop blk.call(*args) rescue StandardError => e - logger.warn('Error in worker thread') - logger.warn(e) + GRPC.logger.warn('Error in worker thread') + GRPC.logger.warn(e) end end end @@ -365,7 +365,7 @@ module GRPC # the server to stop. def run if rpc_descs.size.zero? - logger.warn('did not run as no services were present') + GRPC.logger.warn('did not run as no services were present') return end @run_mutex.synchronize do @@ -381,9 +381,9 @@ module GRPC # Sends UNAVAILABLE if there are too many unprocessed jobs def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests - logger.info("waiting: #{jobs_count}, max: #{max}") + GRPC.logger.info("waiting: #{jobs_count}, max: #{max}") return an_rpc if @pool.jobs_waiting <= @max_waiting_requests - logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") + GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) c.send_status(StatusCodes::UNAVAILABLE, '') @@ -394,7 +394,7 @@ module GRPC def found?(an_rpc) mth = an_rpc.method.to_sym return an_rpc if rpc_descs.key?(mth) - logger.warn("NOT_FOUND: #{an_rpc}") + GRPC.logger.warn("NOT_FOUND: #{an_rpc}") noop = proc { |x| x } c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline) c.send_status(StatusCodes::NOT_FOUND, '') @@ -434,7 +434,7 @@ module GRPC return nil unless found?(an_rpc) # Create the ActiveCall - logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") + GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] ActiveCall.new(an_rpc.call, @cq, rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), @@ -474,7 +474,7 @@ module GRPC else handlers[route] = service.method(rpc_name) end - logger.info("handling #{route} with #{handlers[route]}") + GRPC.logger.info("handling #{route} with #{handlers[route]}") end end end diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 8ea2c82f17..3b9743ea66 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -175,23 +175,23 @@ module GRPC route = "/#{route_prefix}/#{name}" if desc.request_response? define_method(mth_name) do |req, deadline = nil, **kw| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") request_response(route, req, marshal, unmarshal, deadline, **kw) end elsif desc.client_streamer? define_method(mth_name) do |reqs, deadline = nil, **kw| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") client_streamer(route, reqs, marshal, unmarshal, deadline, **kw) end elsif desc.server_streamer? define_method(mth_name) do |req, deadline = nil, **kw, &blk| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") server_streamer(route, req, marshal, unmarshal, deadline, **kw, &blk) end else # is a bidi_stream define_method(mth_name) do |reqs, deadline = nil, **kw, &blk| - logger.debug("calling #{@host}:#{route}") + GRPC.logger.debug("calling #{@host}:#{route}") bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw, &blk) end diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb index f36906fc45..96812170ba 100644 --- a/src/ruby/lib/grpc/logconfig.rb +++ b/src/ruby/lib/grpc/logconfig.rb @@ -29,7 +29,10 @@ require 'logging' -include Logging.globally # logger is accessible everywhere +# GRPC contains the General RPC module. +module GRPC + extend Logging.globally +end Logging.logger.root.appenders = Logging.appenders.stdout Logging.logger.root.level = :info diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 2cd21a15e3..640b0f656c 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -69,7 +69,7 @@ class EchoService end def an_rpc(req, call) - logger.info('echo service received a request') + GRPC.logger.info('echo service received a request') call.output_metadata.update(@trailing_metadata) @received_md << call.metadata unless call.metadata.nil? req @@ -109,7 +109,7 @@ class SlowService end def an_rpc(req, call) - logger.info("starting a slow #{@delay} rpc") + GRPC.logger.info("starting a slow #{@delay} rpc") sleep @delay @received_md << call.metadata unless call.metadata.nil? req # send back the req as the response |