aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-24 15:58:14 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-05-24 15:58:14 -0700
commit8b5276c02ec7aebdf749bfd95d140406e6ea2226 (patch)
tree8389493193029a3cb5c80b11f595bc9d4d5035d7 /src
parent24d115654056aa2be3d548d80ea97937eb15b2a1 (diff)
parent031dea1df4b6213b9f9779a824fccc6d348b8648 (diff)
Merge github.com:grpc/grpc into we-dont-need-no-backup
Conflicts: src/core/surface/call.c test/core/end2end/dualstack_socket_test.c test/core/end2end/tests/early_server_shutdown_finishes_inflight_calls.c test/core/end2end/tests/early_server_shutdown_finishes_tags.c test/core/end2end/tests/graceful_server_shutdown.c test/core/end2end/tests/invoke_large_request.c test/core/end2end/tests/max_concurrent_streams.c test/core/end2end/tests/max_message_length.c test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c test/core/end2end/tests/request_response_with_metadata_and_payload.c test/core/end2end/tests/request_response_with_payload.c test/core/end2end/tests/request_response_with_payload_and_call_creds.c test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c test/core/end2end/tests/request_with_large_metadata.c test/core/end2end/tests/request_with_payload.c test/core/httpcli/httpcli_test.c tools/run_tests/run_tests.py
Diffstat (limited to 'src')
-rw-r--r--src/compiler/csharp_generator.cc77
-rw-r--r--src/compiler/generator_helpers.h19
-rw-r--r--src/compiler/objective_c_generator.cc307
-rw-r--r--src/compiler/objective_c_generator.h9
-rw-r--r--src/compiler/objective_c_generator_helpers.h12
-rw-r--r--src/compiler/objective_c_plugin.cc79
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/sockaddr_utils.c6
-rw-r--r--src/core/support/subprocess_posix.c6
-rw-r--r--src/core/surface/call.c54
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/transport/chttp2/frame.h2
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.c40
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h11
-rw-r--r--src/core/transport/chttp2/hpack_parser.c4
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/core/transport/chttp2_transport.c117
-rw-r--r--src/cpp/server/server.cc2
-rw-r--r--src/csharp/Grpc.Auth/Grpc.Auth.csproj1
-rw-r--r--src/csharp/Grpc.Auth/Grpc.Auth.nuspec3
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj7
-rw-r--r--src/csharp/Grpc.Core.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs35
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs44
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs27
-rw-r--r--src/csharp/Grpc.Core/Call.cs2
-rw-r--r--src/csharp/Grpc.Core/Calls.cs6
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj4
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.nuspec6
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs9
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs5
-rw-r--r--src/csharp/Grpc.Core/IClientStreamWriter.cs5
-rw-r--r--src/csharp/Grpc.Core/IServerStreamWriter.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs25
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs1
-rw-r--r--src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs30
-rw-r--r--src/csharp/Grpc.Core/packages.config1
-rw-r--r--src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs51
-rw-r--r--src/csharp/Grpc.Examples.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Examples/Grpc.Examples.csproj3
-rw-r--r--src/csharp/Grpc.Examples/MathExamples.cs40
-rw-r--r--src/csharp/Grpc.Examples/MathGrpc.cs44
-rw-r--r--src/csharp/Grpc.Examples/MathServiceImpl.cs4
-rw-r--r--src/csharp/Grpc.Examples/packages.config1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs168
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestGrpc.cs70
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/packages.config1
-rw-r--r--src/csharp/build_packages.bat4
-rw-r--r--src/node/bin/README.md16
-rwxr-xr-xsrc/node/bin/service_packager2
-rw-r--r--src/node/cli/service_packager.js142
-rw-r--r--src/node/cli/service_packager/index.js36
-rw-r--r--src/node/cli/service_packager/package.json.template17
-rw-r--r--src/node/interop/interop_client.js6
-rw-r--r--src/node/package.json3
-rw-r--r--src/node/src/client.js58
-rw-r--r--src/node/test/end_to_end_test.js28
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m6
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h17
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m8
-rw-r--r--src/objective-c/ProtoRPC/ProtoRPC.m4
-rw-r--r--src/objective-c/README.md48
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h6
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m10
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.m4
-rw-r--r--src/objective-c/RxLibrary/GRXWriteable.h8
-rw-r--r--src/objective-c/RxLibrary/GRXWriteable.m4
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h6
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.m8
-rw-r--r--src/objective-c/RxLibrary/transformations/GRXMappingWriter.m4
-rw-r--r--src/objective-c/examples/Sample/Podfile2
-rw-r--r--src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m39
-rw-r--r--src/python/src/grpc/_adapter/_face_test_case.py2
-rw-r--r--src/python/src/grpc/_adapter/_links_test.py4
-rw-r--r--src/python/src/grpc/_adapter/_lonely_rear_link_test.py2
-rw-r--r--src/python/src/grpc/_adapter/fore.py2
-rw-r--r--src/python/src/grpc/_adapter/rear.py2
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py2
-rw-r--r--src/python/src/grpc/framework/base/implementations_test.py2
-rw-r--r--src/python/src/grpc/framework/face/_calls.py2
-rw-r--r--src/python/src/grpc/framework/face/_test_case.py2
-rw-r--r--src/python/src/grpc/framework/face/demonstration.py2
-rw-r--r--src/python/src/grpc/framework/face/testing/base_util.py2
-rw-r--r--src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py2
-rwxr-xr-xsrc/ruby/bin/apis/pubsub_demo.rb4
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb6
-rwxr-xr-xsrc/ruby/bin/interop/interop_server.rb8
-rwxr-xr-xsrc/ruby/bin/math_client.rb44
-rwxr-xr-xsrc/ruby/bin/math_server.rb8
-rwxr-xr-xsrc/ruby/bin/noproto_client.rb8
-rwxr-xr-xsrc/ruby/bin/noproto_server.rb6
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb10
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb22
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb14
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb28
-rw-r--r--src/ruby/lib/grpc/generic/service.rb8
-rw-r--r--src/ruby/lib/grpc/logconfig.rb5
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb4
106 files changed, 1329 insertions, 798 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index 82dd06bcec..5dd078b303 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -51,20 +51,49 @@ using grpc_generator::METHODTYPE_NO_STREAMING;
using grpc_generator::METHODTYPE_CLIENT_STREAMING;
using grpc_generator::METHODTYPE_SERVER_STREAMING;
using grpc_generator::METHODTYPE_BIDI_STREAMING;
+using grpc_generator::StringReplace;
using std::map;
using std::vector;
namespace grpc_csharp_generator {
namespace {
-std::string GetCSharpNamespace(const FileDescriptor* file) {
- // TODO(jtattermusch): this should be based on csharp_namespace option
+// TODO(jtattermusch): make GetFileNamespace part of libprotoc public API.
+// NOTE: Implementation needs to match exactly to GetFileNamespace
+// defined in csharp_helpers.h in protoc csharp plugin.
+// We cannot reference it directly because google3 protobufs
+// don't have a csharp protoc plugin.
+std::string GetFileNamespace(const FileDescriptor* file) {
+ if (file->options().has_csharp_namespace()) {
+ return file->options().csharp_namespace();
+ }
return file->package();
}
-std::string GetMessageType(const Descriptor* message) {
- // TODO(jtattermusch): this has to match with C# protobuf generator
- return message->name();
+std::string ToCSharpName(const std::string& name, const FileDescriptor* file) {
+ std::string result = GetFileNamespace(file);
+ if (result != "") {
+ result += '.';
+ }
+ std::string classname;
+ if (file->package().empty()) {
+ classname = name;
+ } else {
+ // Strip the proto package from full_name since we've replaced it with
+ // the C# namespace.
+ classname = name.substr(file->package().size() + 1);
+ }
+ result += StringReplace(classname, ".", ".Types.", false);
+ return "global::" + result;
+}
+
+// TODO(jtattermusch): make GetClassName part of libprotoc public API.
+// NOTE: Implementation needs to match exactly to GetClassName
+// defined in csharp_helpers.h in protoc csharp plugin.
+// We cannot reference it directly because google3 protobufs
+// don't have a csharp protoc plugin.
+std::string GetClassName(const Descriptor* message) {
+ return ToCSharpName(message->full_name(), message->file());
}
std::string GetServiceClassName(const ServiceDescriptor* service) {
@@ -114,22 +143,22 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) {
if (method->client_streaming()) {
return "";
}
- return GetMessageType(method->input_type()) + " request, ";
+ return GetClassName(method->input_type()) + " request, ";
}
std::string GetMethodReturnTypeClient(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
- return "Task<" + GetMessageType(method->output_type()) + ">";
+ return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_CLIENT_STREAMING:
- return "AsyncClientStreamingCall<" + GetMessageType(method->input_type())
- + ", " + GetMessageType(method->output_type()) + ">";
+ return "AsyncClientStreamingCall<" + GetClassName(method->input_type())
+ + ", " + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
- return "AsyncServerStreamingCall<" + GetMessageType(method->output_type())
+ return "AsyncServerStreamingCall<" + GetClassName(method->output_type())
+ ">";
case METHODTYPE_BIDI_STREAMING:
- return "AsyncDuplexStreamingCall<" + GetMessageType(method->input_type())
- + ", " + GetMessageType(method->output_type()) + ">";
+ return "AsyncDuplexStreamingCall<" + GetClassName(method->input_type())
+ + ", " + GetClassName(method->output_type()) + ">";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
return "";
@@ -139,10 +168,10 @@ std::string GetMethodRequestParamServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_SERVER_STREAMING:
- return GetMessageType(method->input_type()) + " request";
+ return GetClassName(method->input_type()) + " request";
case METHODTYPE_CLIENT_STREAMING:
case METHODTYPE_BIDI_STREAMING:
- return "IAsyncStreamReader<" + GetMessageType(method->input_type())
+ return "IAsyncStreamReader<" + GetClassName(method->input_type())
+ "> requestStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@@ -153,7 +182,7 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_CLIENT_STREAMING:
- return "Task<" + GetMessageType(method->output_type()) + ">";
+ return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
return "Task";
@@ -169,7 +198,7 @@ std::string GetMethodResponseStreamMaybe(const MethodDescriptor *method) {
return "";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
- return ", IServerStreamWriter<" + GetMessageType(method->output_type())
+ return ", IServerStreamWriter<" + GetClassName(method->output_type())
+ "> responseStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@@ -202,7 +231,7 @@ void GenerateMarshallerFields(Printer* out, const ServiceDescriptor *service) {
out->Print(
"static readonly Marshaller<$type$> $fieldname$ = Marshallers.Create((arg) => arg.ToByteArray(), $type$.ParseFrom);\n",
"fieldname", GetMarshallerFieldName(message), "type",
- GetMessageType(message));
+ GetClassName(message));
}
out->Print("\n");
}
@@ -211,8 +240,8 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
out->Print(
"static readonly Method<$request$, $response$> $fieldname$ = new Method<$request$, $response$>(\n",
"fieldname", GetMethodFieldName(method), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
out->Indent();
out->Indent();
out->Print("$methodtype$,\n", "methodtype",
@@ -242,8 +271,8 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
out->Print(
"$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n",
"methodname", method->name(), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
}
std::string method_name = method->name();
@@ -310,8 +339,8 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print(
"public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n",
"methodname", method->name(), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
out->Print("{\n");
out->Indent();
out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n",
@@ -466,7 +495,7 @@ grpc::string GetServices(const FileDescriptor *file) {
// TODO(jtattermusch): add using for protobuf message classes
out.Print("\n");
- out.Print("namespace $namespace$ {\n", "namespace", GetCSharpNamespace(file));
+ out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file));
out.Indent();
for (int i = 0; i < file->service_count(); i++) {
GenerateService(&out, file->service(i));
diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h
index 7ce4ec526c..7bdaff1c9b 100644
--- a/src/compiler/generator_helpers.h
+++ b/src/compiler/generator_helpers.h
@@ -60,21 +60,26 @@ inline grpc::string StripProto(grpc::string filename) {
}
inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
- const grpc::string &to) {
+ const grpc::string &to, bool replace_all) {
size_t pos = 0;
- for (;;) {
+ do {
pos = str.find(from, pos);
if (pos == grpc::string::npos) {
break;
}
str.replace(pos, from.length(), to);
pos += to.length();
- }
+ } while(replace_all);
return str;
}
+inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
+ const grpc::string &to) {
+ return StringReplace(str, from, to, true);
+}
+
inline std::vector<grpc::string> tokenize(const grpc::string &input,
const grpc::string &delimiters) {
std::vector<grpc::string> tokens;
@@ -103,6 +108,14 @@ inline grpc::string CapitalizeFirstLetter(grpc::string s) {
return s;
}
+inline grpc::string LowercaseFirstLetter(grpc::string s) {
+ if (s.empty()) {
+ return s;
+ }
+ s[0] = ::tolower(s[0]);
+ return s;
+}
+
inline grpc::string LowerUnderscoreToUpperCamel(grpc::string str) {
std::vector<grpc::string> tokens = tokenize(str, "_");
grpc::string result = "";
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc
index c68c9c37c2..8f35302bee 100644
--- a/src/compiler/objective_c_generator.cc
+++ b/src/compiler/objective_c_generator.cc
@@ -40,195 +40,200 @@
#include <sstream>
+using ::grpc::protobuf::io::Printer;
+using ::grpc::protobuf::MethodDescriptor;
+using ::grpc::protobuf::ServiceDescriptor;
+using ::std::map;
+using ::grpc::string;
+
namespace grpc_objective_c_generator {
namespace {
-void PrintSimpleBlockSignature(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["method_name"] = method->name();
- (*vars)["request_type"] = PrefixedName(method->input_type()->name());
- (*vars)["response_type"] = PrefixedName(method->output_type()->name());
+void PrintProtoRpcDeclarationAsPragma(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ vars["client_stream"] = method->client_streaming() ? "stream " : "";
+ vars["server_stream"] = method->server_streaming() ? "stream " : "";
- if (method->server_streaming()) {
- printer->Print("// When the response stream finishes, the handler is "
- "called with nil for both arguments.\n\n");
- } else {
- printer->Print("// The handler is only called once.\n\n");
- }
- printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:"
- "($request_type$)request completionHandler:(void(^)"
- "($response_type$ *, NSError *))handler");
+ printer->Print(vars,
+ "#pragma mark $method_name$($client_stream$$request_type$)"
+ " returns ($server_stream$$response_type$)\n\n");
}
-void PrintSimpleDelegateSignature(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["method_name"] = method->name();
- (*vars)["request_type"] = PrefixedName(method->input_type()->name());
+void PrintMethodSignature(Printer *printer,
+ const MethodDescriptor *method,
+ const map<string, string>& vars) {
+ // TODO(jcanizales): Print method comments.
- printer->Print(*vars, "- (id<GRXLiveSource>)$method_name$WithRequest:"
- "($request_type$)request delegate:(id<GRXSink>)delegate");
-}
+ printer->Print(vars, "- ($return_type$)$method_name$With");
+ if (method->client_streaming()) {
+ printer->Print("RequestsWriter:(id<GRXWriter>)request");
+ } else {
+ printer->Print(vars, "Request:($prefix$$request_type$ *)request");
+ }
-void PrintAdvancedSignature(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["method_name"] = method->name();
- printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:"
- "(id<GRXSource>)request");
+ // TODO(jcanizales): Put this on a new line and align colons.
+ // TODO(jcanizales): eventHandler for server streaming?
+ printer->Print(" handler:(void(^)(");
+ if (method->server_streaming()) {
+ printer->Print("BOOL done, ");
+ }
+ printer->Print(vars,
+ "$prefix$$response_type$ *response, NSError *error))handler");
}
-void PrintSourceMethodSimpleBlock(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- PrintSimpleBlockSignature(printer, method, vars);
-
- (*vars)["method_name"] = method->name();
- printer->Print(" {\n");
- printer->Indent();
- printer->Print(*vars, "return [[self $method_name$WithRequest:request] "
- "connectHandler:^(id value, NSError *error) {\n");
- printer->Indent();
- printer->Print("handler(value, error);\n");
- printer->Outdent();
- printer->Print("}];\n");
- printer->Outdent();
- printer->Print("}\n");
+void PrintSimpleSignature(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ vars["method_name"] =
+ grpc_generator::LowercaseFirstLetter(vars["method_name"]);
+ vars["return_type"] = "void";
+ PrintMethodSignature(printer, method, vars);
}
-void PrintSourceMethodSimpleDelegate(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- PrintSimpleDelegateSignature(printer, method, vars);
-
- (*vars)["method_name"] = method->name();
- printer->Print(" {\n");
- printer->Indent();
- printer->Print(*vars, "return [[self $method_name$WithRequest:request]"
- "connectToSink:delegate];\n");
- printer->Outdent();
- printer->Print("}\n");
+void PrintAdvancedSignature(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ vars["method_name"] = "RPCTo" + vars["method_name"];
+ vars["return_type"] = "ProtoRPC *";
+ PrintMethodSignature(printer, method, vars);
}
-void PrintSourceMethodAdvanced(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
+void PrintMethodDeclarations(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ vars["method_name"] = method->name();
+ vars["request_type"] = method->input_type()->name();
+ vars["response_type"] = method->output_type()->name();
+
+ PrintProtoRpcDeclarationAsPragma(printer, method, vars);
+
+ PrintSimpleSignature(printer, method, vars);
+ printer->Print(";\n\n");
PrintAdvancedSignature(printer, method, vars);
+ printer->Print(";\n\n\n");
+}
- (*vars)["method_name"] = method->name();
- printer->Print(" {\n");
- printer->Indent();
- printer->Print(*vars, "return [self $method_name$WithRequest:request "
- "client:[self newClient]];\n");
- printer->Outdent();
+void PrintSimpleImplementation(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ printer->Print("{\n");
+ printer->Print(vars, " [[self RPCTo$method_name$With");
+ if (method->client_streaming()) {
+ printer->Print("RequestsWriter:request");
+ } else {
+ printer->Print("Request:request");
+ }
+ printer->Print(" handler:handler] start];\n");
printer->Print("}\n");
}
-void PrintSourceMethodHandler(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars) {
- (*vars)["method_name"] = method->name();
- (*vars)["response_type"] = PrefixedName(method->output_type()->name());
- (*vars)["caps_name"] = grpc_generator::CapitalizeFirstLetter(method->name());
-
- printer->Print(*vars, "- (GRXSource *)$method_name$WithRequest:"
- "(id<GRXSource>)request client:(PBgRPCClient *)client {\n");
- printer->Indent();
- printer->Print(*vars,
- "return [self responseWithMethod:$@\"$caps_name\"\n");
- printer->Print(*vars,
- " class:[$response_type$ class]\n");
- printer->Print(" request:request\n");
- printer->Print(" client:client];\n");
- printer->Outdent();
+void PrintAdvancedImplementation(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ printer->Print("{\n");
+ printer->Print(vars, " return [self RPCToMethod:@\"$method_name$\"\n");
+
+ printer->Print(" requestsWriter:");
+ if (method->client_streaming()) {
+ printer->Print("request\n");
+ } else {
+ printer->Print("[GRXWriter writerWithValue:request]\n");
+ }
+
+ printer->Print(vars,
+ " responseClass:[$prefix$$response_type$ class]\n");
+
+ printer->Print(" responsesWriteable:[GRXWriteable ");
+ if (method->server_streaming()) {
+ printer->Print("writeableWithStreamHandler:handler]];\n");
+ } else {
+ printer->Print("writeableWithSingleValueHandler:handler]];\n");
+ }
+
printer->Print("}\n");
}
+void PrintMethodImplementations(Printer *printer,
+ const MethodDescriptor *method,
+ map<string, string> vars) {
+ vars["method_name"] = method->name();
+ vars["request_type"] = method->input_type()->name();
+ vars["response_type"] = method->output_type()->name();
+
+ PrintProtoRpcDeclarationAsPragma(printer, method, vars);
+
+ // TODO(jcanizales): Print documentation from the method.
+ PrintSimpleSignature(printer, method, vars);
+ PrintSimpleImplementation(printer, method, vars);
+
+ printer->Print("// Returns a not-yet-started RPC object.\n");
+ PrintAdvancedSignature(printer, method, vars);
+ PrintAdvancedImplementation(printer, method, vars);
}
-grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service,
- const grpc::string message_header) {
- grpc::string output;
+} // namespace
+
+string GetHeader(const ServiceDescriptor *service, const string prefix) {
+ string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
- printer.Print("#import \"PBgRPCClient.h\"\n");
- printer.Print("#import \"PBStub.h\"\n");
- vars["message_header"] = message_header;
- printer.Print(vars, "#import \"$message_header$\"\n\n");
- printer.Print("@protocol GRXSource\n");
- printer.Print("@class GRXSource\n\n");
- vars["service_name"] = service->name();
- printer.Print("@protocol $service_name$Stub <NSObject>\n\n");
- printer.Print("#pragma mark Simple block handlers\n\n");
- for (int i = 0; i < service->method_count(); i++) {
- PrintSimpleBlockSignature(&printer, service->method(i), &vars);
- printer.Print(";\n");
- }
- printer.Print("\n");
- printer.Print("#pragma mark Simple delegate handlers.\n\n");
- printer.Print("# TODO(jcanizales): Use high-level snippets to remove this duplication.");
- for (int i = 0; i < service->method_count(); i++) {
- PrintSimpleDelegateSignature(&printer, service->method(i), &vars);
- printer.Print(";\n");
- }
- printer.Print("\n");
- printer.Print("#pragma mark Advanced handlers.\n\n");
+ Printer printer(&output_stream, '$');
+
+ printer.Print("@protocol GRXWriteable;\n");
+ printer.Print("@protocol GRXWriter;\n\n");
+
+ map<string, string> vars = {{"service_name", service->name()},
+ {"prefix", prefix}};
+ printer.Print(vars, "@protocol $prefix$$service_name$ <NSObject>\n\n");
+
for (int i = 0; i < service->method_count(); i++) {
- PrintAdvancedSignature(&printer, service->method(i), &vars);
- printer.Print(";\n");
+ PrintMethodDeclarations(&printer, service->method(i), vars);
}
- printer.Print("\n");
printer.Print("@end\n\n");
- printer.Print("// Basic stub that only does marshalling and parsing\n");
- printer.Print(vars, "@interface $service_name$Stub :"
- " PBStub<$service_name$Stub>\n");
- printer.Print("- (instancetype)initWithHost:(NSString *)host;\n");
+
+ printer.Print("// Basic service implementation, over gRPC, that only does"
+ " marshalling and parsing.\n");
+ printer.Print(vars, "@interface $prefix$$service_name$ :"
+ " ProtoService<$prefix$$service_name$>\n");
+ printer.Print("- (instancetype)initWithHost:(NSString *)host"
+ " NS_DESIGNATED_INITIALIZER;\n");
printer.Print("@end\n");
return output;
}
-grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service) {
- grpc::string output;
+string GetSource(const ServiceDescriptor *service, const string prefix) {
+ string output;
grpc::protobuf::io::StringOutputStream output_stream(&output);
- grpc::protobuf::io::Printer printer(&output_stream, '$');
- std::map<grpc::string, grpc::string> vars;
- vars["service_name"] = service->name();
- printer.Print(vars, "#import \"$service_name$Stub.pb.h\"\n");
- printer.Print("#import \"PBGeneratedMessage+GRXSource.h\"\n\n");
- vars["full_name"] = service->full_name();
+ Printer printer(&output_stream, '$');
+
+ map<string, string> vars = {{"service_name", service->name()},
+ {"package", service->file()->package()},
+ {"prefix", prefix}};
+
+ printer.Print(vars,
+ "static NSString *const kPackageName = @\"$package$\";\n");
printer.Print(vars,
- "static NSString *const kInterface = @\"$full_name$\";\n");
- printer.Print("@implementation $service_name$Stub\n\n");
+ "static NSString *const kServiceName = @\"$service_name$\";\n\n");
+
+ printer.Print(vars, "@implementation $prefix$$service_name$\n\n");
+
+ printer.Print("// Designated initializer\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host {\n");
- printer.Indent();
- printer.Print("if ((self = [super initWithHost:host "
- "interface:kInterface])) {\n");
- printer.Print("}\n");
- printer.Print("return self;\n");
- printer.Outdent();
+ printer.Print(" return (self = [super initWithHost:host"
+ " packageName:kPackageName serviceName:kServiceName]);\n");
printer.Print("}\n\n");
- printer.Print("#pragma mark Simple block handlers.\n");
- for (int i = 0; i < service->method_count(); i++) {
- PrintSourceMethodSimpleBlock(&printer, service->method(i), &vars);
- }
- printer.Print("\n");
- printer.Print("#pragma mark Simple delegate handlers.\n");
- for (int i = 0; i < service->method_count(); i++) {
- PrintSourceMethodSimpleDelegate(&printer, service->method(i), &vars);
- }
- printer.Print("\n");
- printer.Print("#pragma mark Advanced handlers.\n");
- for (int i = 0; i < service->method_count(); i++) {
- PrintSourceMethodAdvanced(&printer, service->method(i), &vars);
- }
- printer.Print("\n");
- printer.Print("#pragma mark Handlers for subclasses "
- "(stub wrappers) to override.\n");
+ printer.Print("// Override superclass initializer to disallow different"
+ " package and service names.\n");
+ printer.Print("- (instancetype)initWithHost:(NSString *)host\n");
+ printer.Print(" packageName:(NSString *)packageName\n");
+ printer.Print(" serviceName:(NSString *)serviceName {\n");
+ printer.Print(" return [self initWithHost:host];\n");
+ printer.Print("}\n\n\n");
+
for (int i = 0; i < service->method_count(); i++) {
- PrintSourceMethodHandler(&printer, service->method(i), &vars);
+ PrintMethodImplementations(&printer, service->method(i), vars);
}
+
printer.Print("@end\n");
return output;
}
diff --git a/src/compiler/objective_c_generator.h b/src/compiler/objective_c_generator.h
index 93c730b34e..548e96fcf1 100644
--- a/src/compiler/objective_c_generator.h
+++ b/src/compiler/objective_c_generator.h
@@ -38,10 +38,15 @@
namespace grpc_objective_c_generator {
+// Returns the content to be included in the "global_scope" insertion point of
+// the generated header file.
grpc::string GetHeader(const grpc::protobuf::ServiceDescriptor *service,
- const grpc::string message_header);
+ const grpc::string prefix);
-grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service);
+// Returns the content to be included in the "global_scope" insertion point of
+// the generated implementation file.
+grpc::string GetSource(const grpc::protobuf::ServiceDescriptor *service,
+ const grpc::string prefix);
} // namespace grpc_objective_c_generator
diff --git a/src/compiler/objective_c_generator_helpers.h b/src/compiler/objective_c_generator_helpers.h
index 6a7c13991f..d92a2b5e9a 100644
--- a/src/compiler/objective_c_generator_helpers.h
+++ b/src/compiler/objective_c_generator_helpers.h
@@ -40,18 +40,8 @@
namespace grpc_objective_c_generator {
-const grpc::string prefix = "PBG";
-
inline grpc::string MessageHeaderName(const grpc::protobuf::FileDescriptor *file) {
- return grpc_generator::FileNameInUpperCamel(file) + ".pb.h";
-}
-
-inline grpc::string StubFileName(grpc::string service_name) {
- return prefix + service_name + "Stub";
-}
-
-inline grpc::string PrefixedName(grpc::string name) {
- return prefix + name;
+ return grpc_generator::FileNameInUpperCamel(file) + ".pbobjc.h";
}
}
diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc
index eebce0cd20..3cb170e95c 100644
--- a/src/compiler/objective_c_plugin.cc
+++ b/src/compiler/objective_c_plugin.cc
@@ -39,54 +39,77 @@
#include "src/compiler/objective_c_generator.h"
#include "src/compiler/objective_c_generator_helpers.h"
+using ::grpc::string;
+
class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
public:
ObjectiveCGrpcGenerator() {}
virtual ~ObjectiveCGrpcGenerator() {}
virtual bool Generate(const grpc::protobuf::FileDescriptor *file,
- const grpc::string &parameter,
+ const string &parameter,
grpc::protobuf::compiler::GeneratorContext *context,
- grpc::string *error) const {
+ string *error) const {
if (file->service_count() == 0) {
// No services. Do nothing.
return true;
}
- for (int i = 0; i < file->service_count(); i++) {
- const grpc::protobuf::ServiceDescriptor *service = file->service(i);
- grpc::string file_name = grpc_objective_c_generator::StubFileName(
- service->name());
-
- // Generate .pb.h
- grpc::string header_code = grpc_objective_c_generator::GetHeader(
- service, grpc_objective_c_generator::MessageHeaderName(file));
- std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> header_output(
- context->Open(file_name + ".pb.h"));
- grpc::protobuf::io::CodedOutputStream header_coded_out(
- header_output.get());
- header_coded_out.WriteRaw(header_code.data(), header_code.size());
-
- // Generate .pb.m
- grpc::string source_code = grpc_objective_c_generator::GetSource(service);
- std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> source_output(
- context->Open(file_name + ".pb.m"));
- grpc::protobuf::io::CodedOutputStream source_coded_out(
- source_output.get());
- source_coded_out.WriteRaw(source_code.data(), source_code.size());
+ string file_name = grpc_generator::FileNameInUpperCamel(file);
+ string prefix = file->options().objc_class_prefix();
+
+ {
+ // Generate .pbrpc.h
+
+ string imports = string("#import \"") + file_name + ".pbobjc.h\"\n"
+ "#import <gRPC/ProtoService.h>\n";
+
+ // TODO(jcanizales): Instead forward-declare the input and output types
+ // and import the files in the .pbrpc.m
+ string proto_imports;
+ for (int i = 0; i < file->dependency_count(); i++) {
+ string header = grpc_objective_c_generator::MessageHeaderName(
+ file->dependency(i));
+ proto_imports += string("#import \"") + header + "\"\n";
+ }
+
+ string declarations;
+ for (int i = 0; i < file->service_count(); i++) {
+ const grpc::protobuf::ServiceDescriptor *service = file->service(i);
+ declarations += grpc_objective_c_generator::GetHeader(service, prefix);
+ }
+
+ Write(context, file_name + ".pbrpc.h",
+ imports + '\n' + proto_imports + '\n' + declarations);
+ }
+
+ {
+ // Generate .pbrpc.m
+
+ string imports = string("#import \"") + file_name + ".pbrpc.h\"\n"
+ "#import <gRPC/GRXWriteable.h>\n"
+ "#import <gRPC/GRXWriter+Immediate.h>\n"
+ "#import <gRPC/ProtoRPC.h>\n";
+
+ string definitions;
+ for (int i = 0; i < file->service_count(); i++) {
+ const grpc::protobuf::ServiceDescriptor *service = file->service(i);
+ definitions += grpc_objective_c_generator::GetSource(service, prefix);
+ }
+
+ Write(context, file_name + ".pbrpc.m", imports + '\n' + definitions);
}
return true;
}
private:
- // Insert the given code into the given file at the given insertion point.
- void Insert(grpc::protobuf::compiler::GeneratorContext *context,
- const grpc::string &filename, const grpc::string &insertion_point,
- const grpc::string &code) const {
+ // Write the given code into the given file.
+ void Write(grpc::protobuf::compiler::GeneratorContext *context,
+ const string &filename, const string &code) const {
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
- context->OpenForInsert(filename, insertion_point));
+ context->Open(filename));
grpc::protobuf::io::CodedOutputStream coded_out(output.get());
coded_out.WriteRaw(code.data(), code.size());
}
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 1fa6c97cfb..7c8c80d162 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -68,7 +68,6 @@ static grpc_httpcli_post_override g_post_override = NULL;
static void next_address(internal_request *req);
static void finish(internal_request *req, int success) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
req->on_response(req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
if (req->addresses != NULL) {
@@ -87,8 +86,6 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
internal_request *req = user_data;
size_t i;
- gpr_log(GPR_DEBUG, "%s nslices=%d status=%d", __FUNCTION__, nslices, status);
-
for (i = 0; i < nslices; i++) {
if (GPR_SLICE_LENGTH(slices[i])) {
req->have_read_byte = 1;
@@ -121,13 +118,11 @@ done:
}
static void on_written(internal_request *req) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
grpc_endpoint_notify_on_read(req->ep, on_read, req);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (status) {
case GRPC_ENDPOINT_CB_OK:
on_written(req);
@@ -142,7 +137,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (
grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
case GRPC_ENDPOINT_WRITE_DONE:
@@ -160,7 +154,6 @@ static void on_secure_transport_setup_done(void *rp,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
internal_request *req = rp;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
finish(req, 0);
@@ -173,7 +166,6 @@ static void on_secure_transport_setup_done(void *rp,
static void on_connected(void *arg, grpc_endpoint *tcp) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!tcp) {
next_address(req);
return;
@@ -201,7 +193,6 @@ static void on_connected(void *arg, grpc_endpoint *tcp) {
static void next_address(internal_request *req) {
grpc_resolved_address *addr;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (req->next_address == req->addresses->naddrs) {
finish(req, 0);
return;
@@ -214,7 +205,6 @@ static void next_address(internal_request *req) {
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!addresses) {
finish(req, 0);
return;
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 740bbe716e..3d202a5cc8 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -169,8 +169,7 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) {
case AF_UNIX:
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family);
return 0;
}
}
@@ -184,8 +183,7 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) {
((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family);
return 0;
}
}
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c
index 642520bb47..b4631fa0ed 100644
--- a/src/core/support/subprocess_posix.c
+++ b/src/core/support/subprocess_posix.c
@@ -57,7 +57,7 @@ struct gpr_subprocess {
const char *gpr_subprocess_binary_extension() { return ""; }
-gpr_subprocess *gpr_subprocess_create(int argc, char **argv) {
+gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
gpr_subprocess *r;
int pid;
char **exec_args;
@@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) {
int gpr_subprocess_join(gpr_subprocess *p) {
int status;
+retry:
if (waitpid(p->pid, &status, 0) == -1) {
+ if (errno == EINTR) {
+ goto retry;
+ }
gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
return -1;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index d403eeede6..4d2ba7cd7d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
@@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
- (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
- call->read_state < READ_STATE_GOT_INITIAL_METADATA);
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
}
static void unlock(grpc_call *call) {
@@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -583,6 +582,23 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
@@ -601,7 +617,9 @@ static void call_on_done_send(void *pc, int success) {
}
if (!success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
+ call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
unlock(call);
@@ -921,23 +939,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1178,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1188,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1271,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1280,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
tag);
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 351ed5b758..24a23ae5c4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(kill_zombie, elem);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -663,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
- gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
+ gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index ac76c4cc9c..c9e3e13042 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -53,12 +53,14 @@ typedef struct {
gpr_uint8 send_ping_ack;
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
+ gpr_uint8 rst_stream;
gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
gpr_slice goaway_text;
+ gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
#define GRPC_CHTTP2_FRAME_DATA 0
diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c
index 368ca86481..3016aac7a2 100644
--- a/src/core/transport/chttp2/frame_rst_stream.c
+++ b/src/core/transport/chttp2/frame_rst_stream.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/frame_rst_stream.h"
+
+#include <grpc/support/log.h>
+
#include "src/core/transport/chttp2/frame.h"
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
@@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
return slice;
}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
+ if (length != 4) {
+ gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+ parser->byte = 0;
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ int is_last) {
+ gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+ grpc_chttp2_rst_stream_parser *p = parser;
+
+ while (p->byte != 4 && cur != end) {
+ p->reason_bytes[p->byte] = *cur;
+ cur++;
+ p->byte++;
+ }
+
+ if (p->byte == 4) {
+ GPR_ASSERT(is_last);
+ state->rst_stream = 1;
+ state->rst_stream_reason =
+ (((gpr_uint32)p->reason_bytes[0]) << 24) |
+ (((gpr_uint32)p->reason_bytes[1]) << 16) |
+ (((gpr_uint32)p->reason_bytes[2]) << 8) |
+ (((gpr_uint32)p->reason_bytes[3]));
+ }
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index 2d3ee18637..07a3c98d03 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -35,7 +35,18 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H
#include <grpc/support/slice.h>
+#include "src/core/transport/chttp2/frame.h"
+
+typedef struct {
+ gpr_uint8 byte;
+ gpr_uint8 reason_bytes[4];
+} grpc_chttp2_rst_stream_parser;
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 3fd8f67226..a489543868 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p,
return 1;
}
- return parse_begin(p, cur + 1, end);
+ return p->after_prioritization(p, cur + 1, end);
}
static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur,
@@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p,
}
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
- GPR_ASSERT(p->state == parse_begin);
+ p->after_prioritization = p->state;
p->state = parse_stream_dep0;
}
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index bb4c1a1f49..bfc06b3980 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser {
grpc_chttp2_hpack_parser_state state;
/* future states dependent on the opening op code */
const grpc_chttp2_hpack_parser_state *next_state;
+ /* what to do after skipping prioritization data */
+ grpc_chttp2_hpack_parser_state after_prioritization;
/* the value we're currently parsing */
union {
gpr_uint32 *value;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a6f9f782a1..9dc5f23389 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -154,7 +154,13 @@ typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
-} WRITE_STATE;
+} write_state;
+
+typedef enum {
+ DONT_SEND_CLOSED = 0,
+ SEND_CLOSED,
+ SEND_CLOSED_WITH_RST_STREAM
+} send_closed;
typedef struct {
stream *head;
@@ -267,6 +273,7 @@ struct transport {
grpc_chttp2_window_update_parser window_update;
grpc_chttp2_settings_parser settings;
grpc_chttp2_ping_parser ping;
+ grpc_chttp2_rst_stream_parser rst_stream;
} simple_parsers;
/* goaway */
@@ -312,8 +319,8 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- WRITE_STATE write_state;
- gpr_uint8 send_closed;
+ write_state write_state;
+ send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
@@ -937,7 +944,11 @@ static int prepare_write(transport *t) {
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
- s->send_closed = 1;
+ if (!t->is_client && !s->read_closed) {
+ s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ } else {
+ s->send_closed = SEND_CLOSED;
+ }
}
if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
@@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
+ s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->send_closed) {
+ if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ if (s->send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->write_state = WRITE_STATE_SENT_CLOSE;
- if (1||!s->cancelled) {
- maybe_finish_read(t, s);
+ if (!t->is_client) {
+ s->read_closed = 1;
}
+ maybe_finish_read(t, s);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -1127,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
+ GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
s->incoming_sopb = op->recv_ops;
@@ -1214,12 +1230,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
- schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- if (s->outgoing_sopb) {
- schedule_nuke_sopb(t, s->outgoing_sopb);
- s->outgoing_sopb = NULL;
- stream_list_remove(t, s, WRITABLE);
- schedule_cb(t, s->send_done_closure, 0);
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ schedule_nuke_sopb(t, &s->parser.incoming_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
}
if (s->cancelled) {
send_rst = 0;
@@ -1228,31 +1246,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
- gpr_ltoa(local_status, buffer);
- add_incoming_metadata(
- t, s,
- grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- if (!optional_message) {
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
- }
- } else {
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ /* synthesize a status if we don't believe we'll get one */
+ gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
- grpc_mdelem_from_metadata_strings(
- t->metadata_context,
- grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
- grpc_mdstr_ref(optional_message)));
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
+ }
+ add_metadata_batch(t, s);
}
- add_metadata_batch(t, s);
- maybe_finish_read(t, s);
}
+ maybe_finish_read(t, s);
}
if (!id) send_rst = 0;
if (send_rst) {
@@ -1527,6 +1548,19 @@ static int init_ping_parser(transport *t) {
return ok;
}
+static int init_rst_stream_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream,
+ t->incoming_frame_size,
+ t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_rst_stream_parser_parse;
+ t->parser_data = &t->simple_parsers.rst_stream;
+ return ok;
+}
+
static int init_goaway_parser(transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
@@ -1581,12 +1615,7 @@ static int init_frame_parser(transport *t) {
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
- /* TODO(ctiller): actually parse the reason */
- cancel_stream_id(
- t, t->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
- GRPC_CHTTP2_CANCEL, 0);
- return init_skip_frame(t, 0);
+ return init_rst_stream_parser(t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
@@ -1650,6 +1679,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
}
+ if (st.rst_stream) {
+ cancel_stream_id(
+ t, t->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
+ st.rst_stream_reason, 0);
+ }
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 62f4020d7e..e66b4ed2d8 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ cq_.Pluck(&buf); /* status ignored */
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
index f7724ea643..e6abbbfdf0 100644
--- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
@@ -10,6 +10,7 @@
<RootNamespace>Grpc.Auth</RootNamespace>
<AssemblyName>Grpc.Auth</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+ <DocumentationFile>bin\$(Configuration)\Grpc.Auth.Xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
index 28ec93d3c5..85aee35566 100644
--- a/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.nuspec
@@ -22,5 +22,8 @@
</metadata>
<files>
<file src="bin/Release/Grpc.Auth.dll" target="lib/net45" />
+ <file src="bin/Release/Grpc.Auth.pdb" target="lib/net45" />
+ <file src="bin/Release/Grpc.Auth.xml" target="lib/net45" />
+ <file src="**\*.cs" target="src" />
</files>
</package>
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index eac8d16fb1..62cb443272 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -34,6 +34,9 @@
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
@@ -57,7 +60,5 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
- <ItemGroup>
- <Folder Include="Internal\" />
- </ItemGroup>
+ <ItemGroup />
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config
index c714ef3a23..28af8d78c6 100644
--- a/src/csharp/Grpc.Core.Tests/packages.config
+++ b/src/csharp/Grpc.Core.Tests/packages.config
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index b95776f66d..d66b0d4974 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -40,33 +40,17 @@ namespace Grpc.Core
/// <summary>
/// Return type for client streaming calls.
/// </summary>
- public sealed class AsyncClientStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
+ public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
+ readonly Action disposeAction;
- public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result)
+ public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
{
this.requestStream = requestStream;
this.result = result;
- }
-
- /// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
+ this.disposeAction = disposeAction;
}
/// <summary>
@@ -99,5 +83,16 @@ namespace Grpc.Core
{
return result.GetAwaiter();
}
+
+ /// <summary>
+ /// Provides means to provide after the call.
+ /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
+ /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
+ /// As a result, all resources being used by the call should be released eventually.
+ /// </summary>
+ public void Dispose()
+ {
+ disposeAction.Invoke();
+ }
}
}
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index ee05437416..4c0d5936ac 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -40,42 +40,17 @@ namespace Grpc.Core
/// <summary>
/// Return type for bidirectional streaming calls.
/// </summary>
- public sealed class AsyncDuplexStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
+ public sealed class AsyncDuplexStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Action disposeAction;
- public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream)
+ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
- }
-
- /// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
- }
-
- /// <summary>
- /// Reads a response from ResponseStream.
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
+ this.disposeAction = disposeAction;
}
/// <summary>
@@ -99,5 +74,16 @@ namespace Grpc.Core
return requestStream;
}
}
+
+ /// <summary>
+ /// Provides means to cleanup after the call.
+ /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything.
+ /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
+ /// As a result, all resources being used by the call should be released eventually.
+ /// </summary>
+ public void Dispose()
+ {
+ disposeAction.Invoke();
+ }
}
}
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index 73b9614985..7a479b9a23 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -40,23 +40,15 @@ namespace Grpc.Core
/// <summary>
/// Return type for server streaming calls.
/// </summary>
- public sealed class AsyncServerStreamingCall<TResponse>
- where TResponse : class
+ public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Action disposeAction;
- public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream)
+ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
{
this.responseStream = responseStream;
- }
-
- /// <summary>
- /// Reads the next response from ResponseStream
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
+ this.disposeAction = disposeAction;
}
/// <summary>
@@ -69,5 +61,16 @@ namespace Grpc.Core
return responseStream;
}
}
+
+ /// <summary>
+ /// Provides means to cleanup after the call.
+ /// If the call has already finished normally (response stream has been fully read), doesn't do anything.
+ /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
+ /// As a result, all resources being used by the call should be released eventually.
+ /// </summary>
+ public void Dispose()
+ {
+ disposeAction.Invoke();
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs
index d1ee59ff0a..37b452f020 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/Call.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Abstraction of a call to be invoked on a client.
/// </summary>
public class Call<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index ba42a2d4f8..9f8baac684 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -73,7 +73,7 @@ namespace Grpc.Core
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncServerStreamingCall<TResponse>(responseStream);
+ return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
}
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@@ -85,7 +85,7 @@ namespace Grpc.Core
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
- return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask);
+ return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
}
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@@ -98,7 +98,7 @@ namespace Grpc.Core
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream);
+ return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
}
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index f5f2cf5f22..fe2d446a35 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -13,6 +13,7 @@
<AssemblyName>Grpc.Core</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<NuGetPackageImportStamp>8bb563fb</NuGetPackageImportStamp>
+ <DocumentationFile>bin\$(Configuration)\Grpc.Core.Xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
@@ -37,6 +38,9 @@
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec
index e54908cb8b..69e8497bb7 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.nuspec
+++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec
@@ -16,10 +16,14 @@
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
<dependency id="Microsoft.Bcl.Immutable" version="1.0.34" />
- <dependency id="grpc.native.csharp_ext" version="0.8.0.0" />
+ <dependency id="Ix-Async" version="1.2.3" />
+ <dependency id="grpc.native.csharp_ext" version="0.9.0.0" />
</dependencies>
</metadata>
<files>
<file src="bin/Release/Grpc.Core.dll" target="lib/net45" />
+ <file src="bin/Release/Grpc.Core.pdb" target="lib/net45" />
+ <file src="bin/Release/Grpc.Core.xml" target="lib/net45" />
+ <file src="**\*.cs" target="src" />
</files>
</package>
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 699741cd05..371fbf27ce 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,13 +43,8 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<T>
- where T : class
+ public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
{
- /// <summary>
- /// Reads a single message. Returns null if the last message was already read.
- /// A following read can only be started when the previous one finishes.
- /// </summary>
- Task<T> ReadNext();
+ // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 4bd8bfb8df..2000210252 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -44,12 +44,11 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Writes a single message. Only one write can be pending at a time.
+ /// Writes a single asynchronously. Only one write can be pending at a time.
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
- Task Write(T message);
+ Task WriteAsync(T message);
}
}
diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs
index 0847a928e6..a3028bc374 100644
--- a/src/csharp/Grpc.Core/IClientStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs
@@ -44,11 +44,10 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IClientStreamWriter<T> : IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
+ /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// </summary>
- Task Close();
+ Task CompleteAsync();
}
}
diff --git a/src/csharp/Grpc.Core/IServerStreamWriter.cs b/src/csharp/Grpc.Core/IServerStreamWriter.cs
index 199a585a3f..9f3af59109 100644
--- a/src/csharp/Grpc.Core/IServerStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IServerStreamWriter.cs
@@ -43,7 +43,7 @@ namespace Grpc.Core
/// A writable stream of messages that is used in server-side handlers.
/// </summary>
public interface IServerStreamWriter<T> : IAsyncStreamWriter<T>
- where T : class
{
+ // TODO(jtattermusch): consider just using IAsyncStreamWriter instead of this interface.
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 1697058732..58f493463b 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -38,8 +38,6 @@ namespace Grpc.Core.Internal
/// Writes requests asynchronously to an underlying AsyncCall object.
/// </summary>
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
- where TRequest : class
- where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
@@ -48,14 +46,14 @@ namespace Grpc.Core.Internal
this.call = call;
}
- public Task Write(TRequest message)
+ public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
return taskSource.Task;
}
- public Task Close()
+ public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index b2378cade6..6c44521038 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
+ TResponse current;
public ClientResponseStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TResponse> ReadNext()
+ public TResponse Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 95d8e97869..f494d9e0ff 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -71,12 +72,13 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
- await responseStream.Write(result);
+ await responseStream.WriteAsync(result);
}
catch (Exception e)
{
@@ -85,7 +87,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatus(status);
+ await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@@ -122,9 +124,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);
@@ -137,7 +140,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatus(status);
+ await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@@ -178,7 +181,7 @@ namespace Grpc.Core.Internal
var result = await handler(context, requestStream);
try
{
- await responseStream.Write(result);
+ await responseStream.WriteAsync(result);
}
catch (OperationCanceledException)
{
@@ -193,7 +196,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatus(status);
+ await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@@ -240,7 +243,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatus(status);
+ await responseStream.WriteStatusAsync(status);
}
catch (OperationCanceledException)
{
@@ -263,7 +266,7 @@ namespace Grpc.Core.Internal
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
- await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method."));
+ await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."));
// TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed.
await requestStream.ToList();
await finishedTask;
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index d9ee0c815b..3fccb88abb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TRequest> ReadNext()
+ public TRequest Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index da688d504f..a2d77dd5b7 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -49,14 +49,14 @@ namespace Grpc.Core.Internal
this.call = call;
}
- public Task Write(TResponse message)
+ public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
return taskSource.Task;
}
- public Task WriteStatus(Status status)
+ public Task WriteStatusAsync(Status status)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, taskSource.CompletionDelegate);
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 731ea2be81..7a1c016ae2 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -39,9 +39,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- // TODO: we need to make sure that the delegates are not collected before invoked.
- //internal delegate void ServerShutdownCallbackDelegate(bool success);
-
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index e873b3e88a..bc9a499c51 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -42,7 +42,6 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
-
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): add deadline info
diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
index f915155f8a..8a748b45a8 100644
--- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
+++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
@@ -49,14 +49,9 @@ namespace Grpc.Core.Utils
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- await asyncAction(elem);
+ await asyncAction(streamReader.Current);
}
}
@@ -67,32 +62,27 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- result.Add(elem);
+ result.Add(streamReader.Current);
}
return result;
}
/// <summary>
/// Writes all elements from given enumerable to the stream.
- /// Closes the stream afterwards unless close = false.
+ /// Completes the stream afterwards unless close = false.
/// </summary>
- public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true)
+ public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
{
- await streamWriter.Write(element);
+ await streamWriter.WriteAsync(element);
}
- if (close)
+ if (complete)
{
- await streamWriter.Close();
+ await streamWriter.CompleteAsync();
}
}
@@ -104,7 +94,7 @@ namespace Grpc.Core.Utils
{
foreach (var element in elements)
{
- await streamWriter.Write(element);
+ await streamWriter.WriteAsync(element);
}
}
}
diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config
index 71967de56e..fb7eaaeeda 100644
--- a/src/csharp/Grpc.Core/packages.config
+++ b/src/csharp/Grpc.Core/packages.config
@@ -2,5 +2,6 @@
<packages>
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
index 87ccf07dd8..6e84add42b 100644
--- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
+++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
@@ -37,6 +37,10 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 4997d3aa42..5aa6f4162d 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -96,7 +96,19 @@ namespace math.Tests
Assert.AreEqual(0, response.Remainder);
}
- // TODO(jtattermusch): test division by zero
+ [Test]
+ public void DivByZero()
+ {
+ try
+ {
+ DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
+ }
+ }
[Test]
public void DivAsync()
@@ -114,11 +126,12 @@ namespace math.Tests
{
Task.Run(async () =>
{
- var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build());
-
- var responses = await call.ResponseStream.ToList();
- CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
- responses.ConvertAll((n) => n.Num_));
+ using (var call = client.Fib(new FibArgs.Builder { Limit = 6 }.Build()))
+ {
+ var responses = await call.ResponseStream.ToList();
+ CollectionAssert.AreEqual(new List<long> { 1, 1, 2, 3, 5, 8 },
+ responses.ConvertAll((n) => n.Num_));
+ }
}).Wait();
}
@@ -128,13 +141,15 @@ namespace math.Tests
{
Task.Run(async () =>
{
- var call = client.Sum();
- var numbers = new List<long> { 10, 20, 30 }.ConvertAll(
- n => Num.CreateBuilder().SetNum_(n).Build());
+ using (var call = client.Sum())
+ {
+ var numbers = new List<long> { 10, 20, 30 }.ConvertAll(
+ n => Num.CreateBuilder().SetNum_(n).Build());
- await call.RequestStream.WriteAll(numbers);
- var result = await call.Result;
- Assert.AreEqual(60, result.Num_);
+ await call.RequestStream.WriteAll(numbers);
+ var result = await call.Result;
+ Assert.AreEqual(60, result.Num_);
+ }
}).Wait();
}
@@ -150,12 +165,14 @@ namespace math.Tests
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
- var call = client.DivMany();
- await call.RequestStream.WriteAll(divArgsList);
- var result = await call.ResponseStream.ToList();
+ using (var call = client.DivMany())
+ {
+ await call.RequestStream.WriteAll(divArgsList);
+ var result = await call.ResponseStream.ToList();
- CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient));
- CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder));
+ CollectionAssert.AreEqual(new long[] { 3, 4, 3 }, result.ConvertAll((divReply) => divReply.Quotient));
+ CollectionAssert.AreEqual(new long[] { 1, 16, 1 }, result.ConvertAll((divReply) => divReply.Remainder));
+ }
}).Wait();
}
}
diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config
index 4d6ec63b3c..cc6e9af40f 100644
--- a/src/csharp/Grpc.Examples.Tests/packages.config
+++ b/src/csharp/Grpc.Examples.Tests/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
index 2c5019c214..5ce490f403 100644
--- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj
+++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
@@ -35,6 +35,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs
index ab06a44c0d..d2cfbee18f 100644
--- a/src/csharp/Grpc.Examples/MathExamples.cs
+++ b/src/csharp/Grpc.Examples/MathExamples.cs
@@ -51,18 +51,13 @@ namespace math
Console.WriteLine("DivAsync Result: " + result);
}
- public static async Task DivAsyncWithCancellationExample(Math.IMathClient stub)
- {
- Task<DivReply> resultTask = stub.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
- DivReply result = await resultTask;
- Console.WriteLine(result);
- }
-
public static async Task FibExample(Math.IMathClient stub)
{
- var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build());
- List<Num> result = await call.ResponseStream.ToList();
- Console.WriteLine("Fib Result: " + string.Join("|", result));
+ using (var call = stub.Fib(new FibArgs.Builder { Limit = 5 }.Build()))
+ {
+ List<Num> result = await call.ResponseStream.ToList();
+ Console.WriteLine("Fib Result: " + string.Join("|", result));
+ }
}
public static async Task SumExample(Math.IMathClient stub)
@@ -74,9 +69,11 @@ namespace math
new Num.Builder { Num_ = 3 }.Build()
};
- var call = stub.Sum();
- await call.RequestStream.WriteAll(numbers);
- Console.WriteLine("Sum Result: " + await call.Result);
+ using (var call = stub.Sum())
+ {
+ await call.RequestStream.WriteAll(numbers);
+ Console.WriteLine("Sum Result: " + await call.Result);
+ }
}
public static async Task DivManyExample(Math.IMathClient stub)
@@ -87,9 +84,11 @@ namespace math
new DivArgs.Builder { Dividend = 100, Divisor = 21 }.Build(),
new DivArgs.Builder { Dividend = 7, Divisor = 2 }.Build()
};
- var call = stub.DivMany();
- await call.RequestStream.WriteAll(divArgsList);
- Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
+ using (var call = stub.DivMany())
+ {
+ await call.RequestStream.WriteAll(divArgsList);
+ Console.WriteLine("DivMany Result: " + string.Join("|", await call.ResponseStream.ToList()));
+ }
}
public static async Task DependendRequestsExample(Math.IMathClient stub)
@@ -101,9 +100,12 @@ namespace math
new Num.Builder { Num_ = 3 }.Build()
};
- var sumCall = stub.Sum();
- await sumCall.RequestStream.WriteAll(numbers);
- Num sum = await sumCall.Result;
+ Num sum;
+ using (var sumCall = stub.Sum())
+ {
+ await sumCall.RequestStream.WriteAll(numbers);
+ sum = await sumCall.Result;
+ }
DivReply result = await stub.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());
Console.WriteLine("Avg Result: " + result);
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index 2546fd220d..b9efc44e8c 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -12,30 +12,30 @@ namespace math {
{
static readonly string __ServiceName = "math.Math";
- static readonly Marshaller<DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
- static readonly Marshaller<DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
- static readonly Marshaller<FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom);
- static readonly Marshaller<Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
+ static readonly Marshaller<global::math.DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom);
+ static readonly Marshaller<global::math.DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom);
+ static readonly Marshaller<global::math.FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom);
+ static readonly Marshaller<global::math.Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom);
- static readonly Method<DivArgs, DivReply> __Method_Div = new Method<DivArgs, DivReply>(
+ static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_Div = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.Unary,
"Div",
__Marshaller_DivArgs,
__Marshaller_DivReply);
- static readonly Method<DivArgs, DivReply> __Method_DivMany = new Method<DivArgs, DivReply>(
+ static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_DivMany = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.DuplexStreaming,
"DivMany",
__Marshaller_DivArgs,
__Marshaller_DivReply);
- static readonly Method<FibArgs, Num> __Method_Fib = new Method<FibArgs, Num>(
+ static readonly Method<global::math.FibArgs, global::math.Num> __Method_Fib = new Method<global::math.FibArgs, global::math.Num>(
MethodType.ServerStreaming,
"Fib",
__Marshaller_FibArgs,
__Marshaller_Num);
- static readonly Method<Num, Num> __Method_Sum = new Method<Num, Num>(
+ static readonly Method<global::math.Num, global::math.Num> __Method_Sum = new Method<global::math.Num, global::math.Num>(
MethodType.ClientStreaming,
"Sum",
__Marshaller_Num,
@@ -44,20 +44,20 @@ namespace math {
// client-side stub interface
public interface IMathClient
{
- DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken));
- Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken));
+ global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
+ Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface IMath
{
- Task<DivReply> Div(ServerCallContext context, DivArgs request);
- Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream);
- Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream);
- Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream);
+ Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request);
+ Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream);
+ Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream);
+ Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream);
}
// client stub
@@ -69,27 +69,27 @@ namespace math {
public MathClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
- public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
+ public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
+ public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.AsyncUnaryCall(call, request, token);
}
- public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_DivMany);
return Calls.AsyncDuplexStreamingCall(call, token);
}
- public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Fib);
return Calls.AsyncServerStreamingCall(call, request, token);
}
- public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Sum);
return Calls.AsyncClientStreamingCall(call, token);
diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs
index 3b33b09bbd..e247ac9d73 100644
--- a/src/csharp/Grpc.Examples/MathServiceImpl.cs
+++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs
@@ -62,7 +62,7 @@ namespace math
{
foreach (var num in FibInternal(request.Limit))
{
- await responseStream.Write(num);
+ await responseStream.WriteAsync(num);
}
}
}
@@ -81,7 +81,7 @@ namespace math
{
await requestStream.ForEach(async divArgs =>
{
- await responseStream.Write(DivInternal(divArgs));
+ await responseStream.WriteAsync(DivInternal(divArgs));
});
}
diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config
index 51c17bcd5e..4c8d60fa62 100644
--- a/src/csharp/Grpc.Examples/packages.config
+++ b/src/csharp/Grpc.Examples/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 1ca3dd24e1..b3a0a2917b 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -54,6 +54,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions">
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 02f8a369de..dfaf18cae1 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -213,11 +213,13 @@ namespace Grpc.IntegrationTesting
var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
- var call = client.StreamingInputCall();
- await call.RequestStream.WriteAll(bodySizes);
+ using (var call = client.StreamingInputCall())
+ {
+ await call.RequestStream.WriteAll(bodySizes);
- var response = await call.Result;
- Assert.AreEqual(74922, response.AggregatedPayloadSize);
+ var response = await call.Result;
+ Assert.AreEqual(74922, response.AggregatedPayloadSize);
+ }
Console.WriteLine("Passed!");
}).Wait();
}
@@ -236,14 +238,15 @@ namespace Grpc.IntegrationTesting
(size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
.Build();
- var call = client.StreamingOutputCall(request);
-
- var responseList = await call.ResponseStream.ToList();
- foreach (var res in responseList)
+ using (var call = client.StreamingOutputCall(request))
{
- Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
+ var responseList = await call.ResponseStream.ToList();
+ foreach (var res in responseList)
+ {
+ Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
+ }
+ CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
}
- CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
Console.WriteLine("Passed!");
}).Wait();
}
@@ -254,51 +257,48 @@ namespace Grpc.IntegrationTesting
{
Console.WriteLine("running ping_pong");
- var call = client.FullDuplexCall();
-
- StreamingOutputCallResponse response;
-
- await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
- .SetResponseType(PayloadType.COMPRESSABLE)
- .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
- .SetPayload(CreateZerosPayload(27182)).Build());
-
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ using (var call = client.FullDuplexCall())
+ {
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
+ .SetPayload(CreateZerosPayload(27182)).Build());
- await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
- .SetResponseType(PayloadType.COMPRESSABLE)
- .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
- .SetPayload(CreateZerosPayload(8)).Build());
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(9, response.Payload.Body.Length);
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
+ .SetPayload(CreateZerosPayload(8)).Build());
- await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
- .SetResponseType(PayloadType.COMPRESSABLE)
- .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
- .SetPayload(CreateZerosPayload(1828)).Build());
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(2653, response.Payload.Body.Length);
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
+ .SetPayload(CreateZerosPayload(1828)).Build());
- await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
- .SetResponseType(PayloadType.COMPRESSABLE)
- .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
- .SetPayload(CreateZerosPayload(45904)).Build());
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(58979, response.Payload.Body.Length);
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
+ .SetPayload(CreateZerosPayload(45904)).Build());
- await call.RequestStream.Close();
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(null, response);
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
Console.WriteLine("Passed!");
}).Wait();
}
@@ -308,12 +308,13 @@ namespace Grpc.IntegrationTesting
Task.Run(async () =>
{
Console.WriteLine("running empty_stream");
- var call = client.FullDuplexCall();
- await call.Close();
-
- var responseList = await call.ResponseStream.ToList();
- Assert.AreEqual(0, responseList.Count);
+ using (var call = client.FullDuplexCall())
+ {
+ await call.RequestStream.CompleteAsync();
+ var responseList = await call.ResponseStream.ToList();
+ Assert.AreEqual(0, responseList.Count);
+ }
Console.WriteLine("Passed!");
}).Wait();
}
@@ -365,19 +366,21 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_begin");
var cts = new CancellationTokenSource();
- var call = client.StreamingInputCall(cts.Token);
- // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
- await Task.Delay(1000);
- cts.Cancel();
-
- try
+ using (var call = client.StreamingInputCall(cts.Token))
{
- var response = await call.Result;
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
+ await Task.Delay(1000);
+ cts.Cancel();
+
+ try
+ {
+ var response = await call.Result;
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ }
}
Console.WriteLine("Passed!");
}).Wait();
@@ -390,29 +393,28 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running cancel_after_first_response");
var cts = new CancellationTokenSource();
- var call = client.FullDuplexCall(cts.Token);
-
- StreamingOutputCallResponse response;
-
- await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
- .SetResponseType(PayloadType.COMPRESSABLE)
- .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
- .SetPayload(CreateZerosPayload(27182)).Build());
+ using (var call = client.FullDuplexCall(cts.Token))
+ {
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
+ .SetPayload(CreateZerosPayload(27182)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
- cts.Cancel();
+ cts.Cancel();
- try
- {
- response = await call.ResponseStream.ReadNext();
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ try
+ {
+ await call.ResponseStream.MoveNext();
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ }
}
Console.WriteLine("Passed!");
}).Wait();
diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
index 679aafb57a..ee077f9f56 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
@@ -12,45 +12,45 @@ namespace grpc.testing {
{
static readonly string __ServiceName = "grpc.testing.TestService";
- static readonly Marshaller<Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom);
- static readonly Marshaller<SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom);
- static readonly Marshaller<SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom);
- static readonly Marshaller<StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom);
- static readonly Marshaller<StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom);
- static readonly Marshaller<StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom);
- static readonly Marshaller<StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom);
- static readonly Method<Empty, Empty> __Method_EmptyCall = new Method<Empty, Empty>(
+ static readonly Method<global::grpc.testing.Empty, global::grpc.testing.Empty> __Method_EmptyCall = new Method<global::grpc.testing.Empty, global::grpc.testing.Empty>(
MethodType.Unary,
"EmptyCall",
__Marshaller_Empty,
__Marshaller_Empty);
- static readonly Method<SimpleRequest, SimpleResponse> __Method_UnaryCall = new Method<SimpleRequest, SimpleResponse>(
+ static readonly Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse> __Method_UnaryCall = new Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse>(
MethodType.Unary,
"UnaryCall",
__Marshaller_SimpleRequest,
__Marshaller_SimpleResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.ServerStreaming,
"StreamingOutputCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
- static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> __Method_StreamingInputCall = new Method<StreamingInputCallRequest, StreamingInputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> __Method_StreamingInputCall = new Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse>(
MethodType.ClientStreaming,
"StreamingInputCall",
__Marshaller_StreamingInputCallRequest,
__Marshaller_StreamingInputCallResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"FullDuplexCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"HalfDuplexCall",
__Marshaller_StreamingOutputCallRequest,
@@ -59,25 +59,25 @@ namespace grpc.testing {
// client-side stub interface
public interface ITestServiceClient
{
- Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken));
- Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken));
- SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken));
- Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
+ global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
+ Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
+ global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
+ Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface ITestService
{
- Task<Empty> EmptyCall(ServerCallContext context, Empty request);
- Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request);
- Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream);
- Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
+ Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request);
+ Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request);
+ Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
+ Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream);
+ Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
+ Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
}
// client stub
@@ -89,42 +89,42 @@ namespace grpc.testing {
public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
- public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
+ public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.AsyncUnaryCall(call, request, token);
}
- public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.AsyncUnaryCall(call, request, token);
}
- public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingOutputCall);
return Calls.AsyncServerStreamingCall(call, request, token);
}
- public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingInputCall);
return Calls.AsyncClientStreamingCall(call, token);
}
- public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_FullDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);
}
- public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_HalfDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);
diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
index d6ba61ef82..6bd997d1f4 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
@@ -64,7 +64,7 @@ namespace grpc.testing
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- await responseStream.Write(response);
+ await responseStream.WriteAsync(response);
}
}
@@ -86,7 +86,7 @@ namespace grpc.testing
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- await responseStream.Write(response);
+ await responseStream.WriteAsync(response);
}
});
}
diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config
index e33b6e3e46..291b7b8599 100644
--- a/src/csharp/Grpc.IntegrationTesting/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting/packages.config
@@ -3,6 +3,7 @@
<package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index fe7e0a495f..7cb78bddf4 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -11,8 +11,8 @@ endlocal
@call buildall.bat || goto :error
%NUGET% pack ..\..\vsprojects\nuget_package\grpc.native.csharp_ext.nuspec || goto :error
-%NUGET% pack Grpc.Core\Grpc.Core.nuspec || goto :error
-%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec || goto :error
+%NUGET% pack Grpc.Core\Grpc.Core.nuspec -Symbols || goto :error
+%NUGET% pack Grpc.Auth\Grpc.Auth.nuspec -Symbols || goto :error
%NUGET% pack Grpc.nuspec || goto :error
goto :EOF
diff --git a/src/node/bin/README.md b/src/node/bin/README.md
new file mode 100644
index 0000000000..2f856e428e
--- /dev/null
+++ b/src/node/bin/README.md
@@ -0,0 +1,16 @@
+# Command Line Tools
+
+# Service Packager
+
+The command line tool `bin/service_packager`, when called with the following command line:
+
+```bash
+service_packager proto_file -o output_path -n name -v version [-i input_path...]
+```
+
+Populates `output_path` with a node package consisting of a `package.json` populated with `name` and `version`, an `index.js`, a `LICENSE` file copied from gRPC, and a `service.json`, which is compiled from `proto_file` and the given `input_path`s. `require('output_path')` returns an object that is equivalent to
+
+```js
+{ client: require('grpc').load('service.json'),
+ auth: require('google-auth-library') }
+```
diff --git a/src/node/bin/service_packager b/src/node/bin/service_packager
new file mode 100755
index 0000000000..c7f2460997
--- /dev/null
+++ b/src/node/bin/service_packager
@@ -0,0 +1,2 @@
+#!/usr/bin/env node
+require(__dirname+'/../cli/service_packager.js').main(process.argv.slice(2)); \ No newline at end of file
diff --git a/src/node/cli/service_packager.js b/src/node/cli/service_packager.js
new file mode 100644
index 0000000000..f29180b252
--- /dev/null
+++ b/src/node/cli/service_packager.js
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+'use strict';
+
+var fs = require('fs');
+var path = require('path');
+
+var _ = require('underscore');
+var async = require('async');
+var pbjs = require('protobufjs/cli/pbjs');
+var parseArgs = require('minimist');
+var Mustache = require('mustache');
+
+var package_json = require('../package.json');
+
+var template_path = path.resolve(__dirname, 'service_packager');
+
+var package_tpl_path = path.join(template_path, 'package.json.template');
+
+var arg_format = {
+ string: ['include', 'out', 'name', 'version'],
+ alias: {
+ include: 'i',
+ out: 'o',
+ name: 'n',
+ version: 'v'
+ }
+};
+
+// TODO(mlumish): autogenerate README.md from proto file
+
+/**
+ * Render package.json file from template using provided parameters.
+ * @param {Object} params Map of parameter names to values
+ * @param {function(Error, string)} callback Callback to pass rendered template
+ * text to
+ */
+function generatePackage(params, callback) {
+ fs.readFile(package_tpl_path, {encoding: 'utf-8'}, function(err, template) {
+ if (err) {
+ callback(err);
+ } else {
+ var rendered = Mustache.render(template, params);
+ callback(null, rendered);
+ }
+ });
+}
+
+/**
+ * Copy a file
+ * @param {string} src_path The filepath to copy from
+ * @param {string} dest_path The filepath to copy to
+ */
+function copyFile(src_path, dest_path) {
+ fs.createReadStream(src_path).pipe(fs.createWriteStream(dest_path));
+}
+
+/**
+ * Run the script. Copies the index.js and LICENSE files to the output path,
+ * renders the package.json template to the output path, and generates a
+ * service.json file from the input proto files using pbjs. The arguments are
+ * taken directly from the command line, and handled as follows:
+ * -i (--include) : An include path for pbjs (can be dpulicated)
+ * -o (--output): The output path
+ * -n (--name): The name of the package
+ * -v (--version): The package version
+ * @param {Array} argv The argument vector
+ */
+function main(argv) {
+ var args = parseArgs(argv, arg_format);
+ var out_path = path.resolve(args.out);
+ var include_dirs = [];
+ if (args.include) {
+ include_dirs = _.map(_.flatten([args.include]), function(p) {
+ return path.resolve(p);
+ });
+ }
+ args.grpc_version = package_json.version;
+ generatePackage(args, function(err, rendered) {
+ if (err) throw err;
+ fs.writeFile(path.join(out_path, 'package.json'), rendered, function(err) {
+ if (err) throw err;
+ });
+ });
+ copyFile(path.join(template_path, 'index.js'),
+ path.join(out_path, 'index.js'));
+ copyFile(path.join(__dirname, '..', 'LICENSE'),
+ path.join(out_path, 'LICENSE'));
+
+ var service_stream = fs.createWriteStream(path.join(out_path,
+ 'service.json'));
+ var pbjs_args = _.flatten(['node', 'pbjs',
+ args._[0],
+ '-legacy',
+ _.map(include_dirs, function(dir) {
+ return "-path=" + dir;
+ })]);
+ var old_stdout = process.stdout;
+ process.__defineGetter__('stdout', function() {
+ return service_stream;
+ });
+ var pbjs_status = pbjs.main(pbjs_args);
+ process.__defineGetter__('stdout', function() {
+ return old_stdout;
+ });
+ if (pbjs_status !== pbjs.STATUS_OK) {
+ throw new Error('pbjs failed with status code ' + pbjs_status);
+ }
+}
+
+exports.main = main;
diff --git a/src/node/cli/service_packager/index.js b/src/node/cli/service_packager/index.js
new file mode 100644
index 0000000000..811e08b89a
--- /dev/null
+++ b/src/node/cli/service_packager/index.js
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+var grpc = require('grpc');
+exports.client = grpc.load(__dirname + '/service.json', 'json');
+exports.auth = require('google-auth-library');
diff --git a/src/node/cli/service_packager/package.json.template b/src/node/cli/service_packager/package.json.template
new file mode 100644
index 0000000000..9f9019172e
--- /dev/null
+++ b/src/node/cli/service_packager/package.json.template
@@ -0,0 +1,17 @@
+{
+ "name": "{{{name}}}",
+ "version": "{{{version}}}",
+ "author": "Google Inc.",
+ "description": "Client library for {{{name}}} built on gRPC",
+ "license": "Apache-2.0",
+ "dependencies": {
+ "grpc": "{{{grpc_version}}}",
+ "google-auth-library": "^0.9.2"
+ },
+ "main": "index.js",
+ "files": [
+ "LICENSE",
+ "index.js",
+ "service.json"
+ ]
+}
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 80f811901c..455055d9f3 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -154,13 +154,15 @@ function serverStreaming(client, done) {
arg.response_parameters[resp_index].size);
resp_index += 1;
});
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
+ call.on('end', function() {
assert.strictEqual(resp_index, 4);
if (done) {
done();
}
});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, grpc.status.OK);
+ });
}
/**
diff --git a/src/node/package.json b/src/node/package.json
index 8d413c3ffa..3f31ba49ff 100644
--- a/src/node/package.json
+++ b/src/node/package.json
@@ -36,6 +36,7 @@
"jshint": "^2.5.0",
"minimist": "^1.1.0",
"mocha": "~1.21.0",
+ "mustache": "^2.0.0",
"strftime": "^0.8.2"
},
"engines": {
@@ -46,6 +47,8 @@
"README.md",
"index.js",
"binding.gyp",
+ "bin",
+ "cli",
"examples",
"ext",
"interop",
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 46d476b9f4..65339406b2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, event) {
if (err) {
- throw err;
+ // Something has gone wrong. Stop writing by failing to call callback
+ return;
}
callback();
});
@@ -120,10 +121,8 @@ function _read(size) {
*/
function readCallback(err, event) {
if (err) {
- throw err;
- }
- if (self.finished) {
- self.push(null);
+ // Something has gone wrong. Stop reading and wait for status
+ self.finished = true;
return;
}
var data = event.read;
@@ -237,10 +236,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -248,6 +243,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
@@ -300,7 +301,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
- callback(err);
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
return;
}
stream.emit('metadata', response.metadata);
@@ -309,10 +311,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -320,6 +318,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
callback(null, deserialize(response.read));
});
@@ -373,16 +377,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -390,6 +393,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});
@@ -438,16 +447,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -455,6 +463,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 60e9861bc8..667852f382 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -286,20 +286,24 @@ describe('end-to-end', function() {
assert.ifError(err);
assert(response['send metadata']);
assert.strictEqual(response.read.toString(), requests[0]);
- var end_batch = {};
- end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
- end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
- 'metadata': {},
- 'code': grpc.status.OK,
- 'details': status_text
- };
- end_batch[grpc.opType.RECV_MESSAGE] = true;
- server_call.startBatch(end_batch, function(err, response) {
+ var snd_batch = {};
+ snd_batch[grpc.opType.RECV_MESSAGE] = true;
+ server_call.startBatch(snd_batch, function(err, response) {
assert.ifError(err);
- assert(response['send status']);
- assert(!response.cancelled);
assert.strictEqual(response.read.toString(), requests[1]);
- done();
+ var end_batch = {};
+ end_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
+ 'metadata': {},
+ 'code': grpc.status.OK,
+ 'details': status_text
+ };
+ server_call.startBatch(end_batch, function(err, response) {
+ assert.ifError(err);
+ assert(response['send status']);
+ assert(!response.cancelled);
+ done();
+ });
});
});
});
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 0f458b40cd..a4a0ddb324 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -231,7 +231,7 @@
handler:resumingHandler]] errorHandler:errorHandler];
}
-- (void)didReceiveValue:(id)value {
+- (void)writeValue:(id)value {
// TODO(jcanizales): Throw/assert if value isn't NSData.
// Pause the input and only resume it when the C layer notifies us that writes
@@ -255,7 +255,7 @@
errorHandler:errorHandler];
}
-- (void)didFinishWithError:(NSError *)errorOrNil {
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
if (errorOrNil) {
[self cancel];
} else {
@@ -306,7 +306,7 @@
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
// The following produces a retain cycle self:_responseWriteable:self, which is only
- // broken when didFinishWithError: is sent to the wrapped writeable.
+ // broken when writesFinishedWithError: is sent to the wrapped writeable.
// Care is taken not to retain self strongly in any of the blocks used in
// the implementation of GRPCCall, so that the life of the instance is
// determined by this retain cycle.
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
index 24c2b96729..1ef245fe37 100644
--- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
@@ -38,11 +38,11 @@
// This is a thread-safe wrapper over a GRXWriteable instance. It lets one
// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing
-// that didFinishWithError: is the last message sent to it (no matter what
+// that writesFinishedWithError: is the last message sent to it (no matter what
// messages are sent to the wrapper, in what order, nor from which thread). It
// also guarantees that, if cancelWithError: is called from the main thread
// (e.g. by the app cancelling the writes), no further messages are sent to the
-// writeable except didFinishWithError:.
+// writeable except writesFinishedWithError:.
//
// TODO(jcanizales): Let the user specify another queue for the writeable
// callbacks.
@@ -51,23 +51,22 @@
// The GRXWriteable passed is the wrapped writeable.
// Both the GRXWriter instance and the GRXWriteable instance are retained until
-// didFinishWithError: is sent to the writeable, and released after that.
+// writesFinishedWithError: is sent to the writeable, and released after that.
// This is used to create a retain cycle that keeps both objects alive until the
// writing is explicitly finished.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer
NS_DESIGNATED_INITIALIZER;
-// Enqueues didReceiveValue: to be sent to the writeable in the main thread.
-// The passed handler is invoked from the main thread after didReceiveValue:
-// returns.
+// Enqueues writeValue: to be sent to the writeable in the main thread.
+// The passed handler is invoked from the main thread after writeValue: returns.
- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler;
-// Enqueues didFinishWithError:nil to be sent to the writeable in the main
+// Enqueues writesFinishedWithError:nil to be sent to the writeable in the main
// thread. After that message is sent to the writeable, all other methods of
// this object are effectively noops.
- (void)enqueueSuccessfulCompletion;
-// If the writeable has not yet received a didFinishWithError: message, this
+// If the writeable has not yet received a writesFinishedWithError: message, this
// will enqueue one to be sent to it in the main thread, and cancel all other
// pending messages to the writeable enqueued by this object (both past and
// future).
@@ -75,7 +74,7 @@
- (void)cancelWithError:(NSError *)error;
// Cancels all pending messages to the writeable enqueued by this object (both
-// past and future). Because the writeable won't receive didFinishWithError:,
+// past and future). Because the writeable won't receive writesFinishedWithError:,
// this also releases the writeable and the writer.
- (void)cancelSilently;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
index ac444ef406..7d5ecb56d9 100644
--- a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
@@ -43,7 +43,7 @@
@implementation GRPCDelegateWrapper {
dispatch_queue_t _writeableQueue;
- // This ensures that didFinishWithError: is only sent once to the writeable.
+ // This ensures that writesFinishedWithError: is only sent once to the writeable.
dispatch_once_t _alreadyFinished;
}
@@ -69,7 +69,7 @@
// the race.
id<GRXWriteable> writeable = self.writeable;
if (writeable) {
- [writeable didReceiveValue:message];
+ [writeable writeValue:message];
handler();
}
});
@@ -80,7 +80,7 @@
dispatch_once(&_alreadyFinished, ^{
// Cancellation is now impossible. None of the other three blocks can run
// concurrently with this one.
- [self.writeable didFinishWithError:nil];
+ [self.writeable writesFinishedWithError:nil];
// Break the retain cycle with writer, and skip any possible message to the
// wrapped writeable enqueued after this one.
self.writeable = nil;
@@ -100,7 +100,7 @@
self.writeable = nil;
dispatch_async(_writeableQueue, ^{
- [writeable didFinishWithError:error];
+ [writeable writesFinishedWithError:error];
// Break the retain cycle with writer.
self.writer = nil;
});
diff --git a/src/objective-c/ProtoRPC/ProtoRPC.m b/src/objective-c/ProtoRPC/ProtoRPC.m
index c5051c0123..96608f2898 100644
--- a/src/objective-c/ProtoRPC/ProtoRPC.m
+++ b/src/objective-c/ProtoRPC/ProtoRPC.m
@@ -71,9 +71,9 @@
if ((self = [super initWithHost:host method:method requestsWriter:bytesWriter])) {
// A writeable that parses the proto messages received.
_responseWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
- [responsesWriteable didReceiveValue:[responseClass parseFromData:value]];
+ [responsesWriteable writeValue:[responseClass parseFromData:value]];
} completionHandler:^(NSError *errorOrNil) {
- [responsesWriteable didFinishWithError:errorOrNil];
+ [responsesWriteable writesFinishedWithError:errorOrNil];
}];
}
return self;
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index 05e9f2b4dc..49d7f43882 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -1,3 +1,47 @@
-gRPC implementation for Objective-C on iOS
+# gRPC for Objective-C
-This is a work in progress.
+## How to generate a client library from a Protocol Buffers definition
+
+First install v3 of the Protocol Buffers compiler (_protoc_), by cloning [its Git repository](https://github.com/google/protobuf) and following these [installation instructions](https://github.com/google/protobuf#c-installation---unix) (the ones titled C++; don't miss the note for Mac users).
+
+Then clone this repository and execute the following commands from the root directory where it was cloned.
+
+Compile the gRPC plugins for _protoc_:
+```sh
+make plugins
+```
+
+Create a symbolic link to the compiled plugin binary somewhere in your `$PATH`:
+```sh
+ln -s `pwd`/bins/opt/grpc_objective_c_plugin /usr/local/bin/protoc-gen-objcgrpc
+```
+(Notice that the name of the created link must begin with "protoc-gen-" for _protoc_ to recognize it as a plugin).
+
+If you don't want to create the symbolic link, you can alternatively copy the binary (with the appropriate name). Or you might prefer instead to specify the plugin's path as a flag when invoking _protoc_, in which case no system modification nor renaming is necessary.
+
+Finally, run _protoc_ with the following flags to generate the client library for your `.proto` files:
+
+```sh
+protoc --objc_out=. --objcgrpc_out=. *.proto
+```
+
+This will generate a pair of `.pbobjc.h`/`.pbobjc.m` files for each `.proto` file, with the messages and enums defined in them. And a pair of `.pbrpc.h`/`.pbrpc.m` files for each `.proto` file with services defined. The latter contains the code to make remote calls to the specified API.
+
+## How to integrate a generated gRPC library in your project
+
+### If you use Cocoapods
+
+This is the recommended approach.
+
+You need to create a Podspec file for the generated library. This is simply a matter of copying an example like [this one](https://github.com/grpc/grpc/blob/master/src/objective-c/examples/Sample/RemoteTestClient/RemoteTest.podspec) to the directory where the source files were generated. Update the name and other metadata of the Podspec as suitable.
+
+Once your library has a Podspec, refer to it from your Podfile using `:path` as described [here](https://guides.cocoapods.org/using/the-podfile.html#using-the-files-from-a-folder-local-to-the-machine).
+
+### If you don't use Cocoapods
+
+You need to compile the generated `.pbpbjc.*` files (the enums and messages) without ARC support, and the generated `.pbrpc.*` files (the services) with ARC support. The generated code depends on v0.3+ of the Objective-C gRPC runtime library and v3.0+ of the Objective-C Protobuf runtime library.
+
+These libraries need to be integrated into your project as described in their respective Podspec files:
+
+* [Podspec](https://github.com/grpc/grpc/blob/master/gRPC.podspec) for the Objective-C gRPC runtime library. This can be tedious to configure manually.
+* [Podspec](https://github.com/jcanizales/protobuf/blob/add-podspec/Protobuf.podspec) for the Objective-C Protobuf runtime library.
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index 4147362ba4..5e876a73bf 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -38,12 +38,12 @@
// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
// to -startWithWriteable:).
-// Once it is started, whatever values are written into it (via -didReceiveValue:) will be
-// propagated immediately, unless flow control prevents it.
+// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
+// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
// started, it will buffer them and propagate them in order as soon as its state becomes
// GRXWriterStateStarted.
-// If it receives an error (via -didFinishWithError:), it will drop any buffered values and
+// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
// Beware that a pipe of this type can't prevent receiving more values when it is paused (for
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 3ef470f89f..4820c84af0 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -62,7 +62,7 @@
- (void)writeBufferUntilPausedOrStopped {
while (_state == GRXWriterStateStarted && _queue.count > 0) {
- [_writeable didReceiveValue:[self popValue]];
+ [_writeable writeValue:[self popValue]];
}
if (_inputIsFinished && _queue.count == 0) {
// Our writer finished normally while we were paused or not-started-yet.
@@ -77,10 +77,10 @@
return _state == GRXWriterStateStarted && _queue.count == 0;
}
-- (void)didReceiveValue:(id)value {
+- (void)writeValue:(id)value {
if (self.shouldFastForward) {
// Skip the queue.
- [_writeable didReceiveValue:value];
+ [_writeable writeValue:value];
} else {
// Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
// So just buffer the new value.
@@ -92,7 +92,7 @@
}
}
-- (void)didFinishWithError:(NSError *)errorOrNil {
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
_inputIsFinished = YES;
_errorOrNil = errorOrNil;
if (errorOrNil || self.shouldFastForward) {
@@ -140,7 +140,7 @@
- (void)finishWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
self.state = GRXWriterStateFinished;
- [writeable didFinishWithError:errorOrNil];
+ [writeable writesFinishedWithError:errorOrNil];
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.m b/src/objective-c/RxLibrary/GRXImmediateWriter.m
index 7468af557f..0b4979872e 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.m
@@ -109,7 +109,7 @@
- (void)writeUntilPausedOrStopped {
id value;
while (value = [_enumerator nextObject]) {
- [_writeable didReceiveValue:value];
+ [_writeable writeValue:value];
// If the writeable has a reference to us, it might change our state to paused or finished.
if (_state == GRXWriterStatePaused || _state == GRXWriterStateFinished) {
return;
@@ -130,7 +130,7 @@
_errorOrNil = nil;
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
- [writeable didFinishWithError:errorOrNil];
+ [writeable writesFinishedWithError:errorOrNil];
}
- (void)setState:(GRXWriterState)newState {
diff --git a/src/objective-c/RxLibrary/GRXWriteable.h b/src/objective-c/RxLibrary/GRXWriteable.h
index 6f6ea142e0..216de30735 100644
--- a/src/objective-c/RxLibrary/GRXWriteable.h
+++ b/src/objective-c/RxLibrary/GRXWriteable.h
@@ -38,14 +38,12 @@
@protocol GRXWriteable <NSObject>
// Push the next value of the sequence to the receiving object.
-// TODO(jcanizales): Name it enumerator:(id<GRXEnumerator>) didProduceValue:(id)?
-- (void)didReceiveValue:(id)value;
+- (void)writeValue:(id)value;
// Signal that the sequence is completed, or that an error ocurred. After this
-// message is sent to the instance, neither it nor didReceiveValue: may be
+// message is sent to the instance, neither it nor writeValue: may be
// called again.
-// TODO(jcanizales): enumerator:(id<GRXEnumerator>) didFinishWithError:(NSError*)?
-- (void)didFinishWithError:(NSError *)errorOrNil;
+- (void)writesFinishedWithError:(NSError *)errorOrNil;
@end
typedef void (^GRXValueHandler)(id value);
diff --git a/src/objective-c/RxLibrary/GRXWriteable.m b/src/objective-c/RxLibrary/GRXWriteable.m
index 7000a078d1..63f7c3e7f3 100644
--- a/src/objective-c/RxLibrary/GRXWriteable.m
+++ b/src/objective-c/RxLibrary/GRXWriteable.m
@@ -76,13 +76,13 @@
return self;
}
-- (void)didReceiveValue:(id)value {
+- (void)writeValue:(id)value {
if (_valueHandler) {
_valueHandler(value);
}
}
-- (void)didFinishWithError:(NSError *)errorOrNil {
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
if (_completionHandler) {
_completionHandler(errorOrNil);
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 68c294f007..dcf44e3143 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -50,7 +50,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer is temporarily paused, and won't send any more values to the
// writeable unless its state is set back to Started. The writer might still
// transition to the Finished state at any moment, and is allowed to send
- // didFinishWithError: to its writeable.
+ // writesFinishedWithError: to its writeable.
//
// Not all implementations of writer have to support pausing, and thus
// trying to set an writer's state to this value might have no effect.
@@ -59,7 +59,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// The writer has released its writeable and won't interact with it anymore.
//
// One seldomly wants to set an writer's state to this value, as its
- // writeable isn't notified with a didFinishWithError: message. Instead, sending
+ // writeable isn't notified with a writesFinishedWithError: message. Instead, sending
// finishWithError: to the writer will make it notify the writeable and then
// transition to this state.
GRXWriterStateFinished
@@ -105,7 +105,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
-// Send didFinishWithError:errorOrNil immediately to the writeable, and don't send
+// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
// any more messages to it.
//
// This method might only be called on writers in the Started or Paused
diff --git a/src/objective-c/RxLibrary/GRXWriter.m b/src/objective-c/RxLibrary/GRXWriter.m
index b48a44f3a7..cc14383560 100644
--- a/src/objective-c/RxLibrary/GRXWriter.m
+++ b/src/objective-c/RxLibrary/GRXWriter.m
@@ -62,7 +62,7 @@
- (void)finishOutputWithError:(NSError *)errorOrNil {
id<GRXWriteable> writeable = _writeable;
_writeable = nil;
- [writeable didFinishWithError:errorOrNil];
+ [writeable writesFinishedWithError:errorOrNil];
}
// This is used to stop the input writer. It nillifies our reference to it
@@ -75,11 +75,11 @@
#pragma mark GRXWriteable implementation
-- (void)didReceiveValue:(id)value {
- [_writeable didReceiveValue:value];
+- (void)writeValue:(id)value {
+ [_writeable writeValue:value];
}
-- (void)didFinishWithError:(NSError *)errorOrNil {
+- (void)writesFinishedWithError:(NSError *)errorOrNil {
_writer = nil;
[self finishOutputWithError:errorOrNil];
}
diff --git a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
index 8a41c819a6..2cdfea1b67 100644
--- a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
+++ b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m
@@ -57,7 +57,7 @@ static id (^kIdentity)(id value) = ^id(id value) {
}
// Override
-- (void)didReceiveValue:(id)value {
- [super didReceiveValue:_map(value)];
+- (void)writeValue:(id)value {
+ [super writeValue:_map(value)];
}
@end
diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile
index d30d9c5210..e8b78647ac 100644
--- a/src/objective-c/examples/Sample/Podfile
+++ b/src/objective-c/examples/Sample/Podfile
@@ -2,7 +2,7 @@ source 'https://github.com/CocoaPods/Specs.git'
platform :ios, '8.0'
pod 'gRPC', :path => "../../../.."
-pod 'Protobuf', :git => 'https://github.com/jcanizales/protobuf.git', :branch => 'add-podspec'
+pod 'Protobuf', :git => 'https://github.com/google/protobuf.git'
pod 'Route_guide', :path => "RouteGuideClient"
pod 'RemoteTest', :path => "RemoteTestClient"
diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
index 7cebc0c2a7..8e0e11d23d 100644
--- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
+++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
@@ -208,7 +208,7 @@
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
- [requestsBuffer didReceiveValue:request];
+ [requestsBuffer writeValue:request];
[_service fullDuplexCallWithRequestsWriter:requestsBuffer
handler:^(BOOL done,
@@ -225,9 +225,9 @@
if (index < 4) {
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
- [requestsBuffer didReceiveValue:request];
+ [requestsBuffer writeValue:request];
} else {
- [requestsBuffer didFinishWithError:nil];
+ [requestsBuffer writesFinishedWithError:nil];
}
}
@@ -269,4 +269,37 @@
[self waitForExpectationsWithTimeout:1 handler:nil];
}
+- (void)testCancelAfterFirstResponseRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterFirstResponse"];
+
+ // A buffered pipe to which we write a single value but never close
+ GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
+
+ __block BOOL receivedResponse = NO;
+
+ id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
+ requestedResponseSize:@31415];
+
+ [requestsBuffer writeValue:request];
+
+ __block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
+ handler:^(BOOL done,
+ RMTStreamingOutputCallResponse *response,
+ NSError *error) {
+ if (receivedResponse) {
+ XCTAssert(done, @"Unexpected extra response %@", response);
+ XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
+ [expectation fulfill];
+ } else {
+ XCTAssertNil(error, @"Finished with unexpected error: %@", error);
+ XCTAssertFalse(done, @"Finished without response");
+ XCTAssertNotNil(response);
+ receivedResponse = YES;
+ [call cancel];
+ }
+ }];
+ [call start];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
+}
+
@end
diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py
index 923e889844..5fa974ed06 100644
--- a/src/python/src/grpc/_adapter/_face_test_case.py
+++ b/src/python/src/grpc/_adapter/_face_test_case.py
@@ -43,7 +43,7 @@ from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
_MAXIMUM_TIMEOUT = 90
-_MAXIMUM_POOL_SIZE = 400
+_MAXIMUM_POOL_SIZE = 4
class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index 4987be389a..4fd76f60f8 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -54,8 +54,8 @@ def _transform_metadata(unused_metadata):
class RoundTripTest(unittest.TestCase):
def setUp(self):
- self.fore_link_pool = logging_pool.pool(80)
- self.rear_link_pool = logging_pool.pool(80)
+ self.fore_link_pool = logging_pool.pool(8)
+ self.rear_link_pool = logging_pool.pool(8)
def tearDown(self):
self.rear_link_pool.shutdown(wait=True)
diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
index 25799d679c..bdb1ee2379 100644
--- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
+++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
@@ -43,7 +43,7 @@ _TIMEOUT = 2
class LonelyRearLinkTest(unittest.TestCase):
def setUp(self):
- self.pool = logging_pool.pool(80)
+ self.pool = logging_pool.pool(8)
def tearDown(self):
self.pool.shutdown(wait=True)
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 05016cdaf3..69e145e3f6 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -41,7 +41,7 @@ from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 100
+_THREAD_POOL_SIZE = 10
@enum.unique
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index dd0a486117..b3b0b4ed32 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -41,7 +41,7 @@ from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 100
+_THREAD_POOL_SIZE = 10
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index f3f2a043eb..10919fae69 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -41,7 +41,7 @@ from grpc.framework.base import util as _base_utilities
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 80
+_THREAD_POOL_SIZE = 8
_ONE_DAY_IN_SECONDS = 24 * 60 * 60
diff --git a/src/python/src/grpc/framework/base/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py
index 11e49caf75..d40bb4d92e 100644
--- a/src/python/src/grpc/framework/base/implementations_test.py
+++ b/src/python/src/grpc/framework/base/implementations_test.py
@@ -36,7 +36,7 @@ from grpc.framework.base import interfaces_test_case
from grpc.framework.base import util
from grpc.framework.foundation import logging_pool
-POOL_MAX_WORKERS = 100
+POOL_MAX_WORKERS = 10
DEFAULT_TIMEOUT = 30
MAXIMUM_TIMEOUT = 60
diff --git a/src/python/src/grpc/framework/face/_calls.py b/src/python/src/grpc/framework/face/_calls.py
index 75a550e3c7..87edeb0f0e 100644
--- a/src/python/src/grpc/framework/face/_calls.py
+++ b/src/python/src/grpc/framework/face/_calls.py
@@ -248,7 +248,7 @@ class _OperationFuture(future.Future):
"""See future.Future.add_done_callback for specification."""
with self._condition:
if self._callbacks is not None:
- self._callbacks.add(fn)
+ self._callbacks.append(fn)
return
callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self)
diff --git a/src/python/src/grpc/framework/face/_test_case.py b/src/python/src/grpc/framework/face/_test_case.py
index b3a012db00..642d500628 100644
--- a/src/python/src/grpc/framework/face/_test_case.py
+++ b/src/python/src/grpc/framework/face/_test_case.py
@@ -35,7 +35,7 @@ from grpc.framework.face.testing import test_case
from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
-_MAXIMUM_POOL_SIZE = 100
+_MAXIMUM_POOL_SIZE = 10
class FaceTestCase(test_case.FaceTestCase):
diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py
index eabeac4569..f6b4b609ff 100644
--- a/src/python/src/grpc/framework/face/demonstration.py
+++ b/src/python/src/grpc/framework/face/demonstration.py
@@ -34,7 +34,7 @@ from grpc.framework.base import implementations as _base_implementations
from grpc.framework.face import implementations
from grpc.framework.foundation import logging_pool
-_POOL_SIZE_LIMIT = 20
+_POOL_SIZE_LIMIT = 5
_MAXIMUM_TIMEOUT = 90
diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py
index 151d0ef793..1df1529b27 100644
--- a/src/python/src/grpc/framework/face/testing/base_util.py
+++ b/src/python/src/grpc/framework/face/testing/base_util.py
@@ -38,7 +38,7 @@ from grpc.framework.base import in_memory
from grpc.framework.base import interfaces # pylint: disable=unused-import
from grpc.framework.foundation import logging_pool
-_POOL_SIZE_LIMIT = 20
+_POOL_SIZE_LIMIT = 5
_MAXIMUM_TIMEOUT = 90
diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
index 0d51b64f1b..21bf9a4248 100644
--- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -44,7 +44,7 @@ from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
-_MAXIMUM_POOL_SIZE = 100
+_MAXIMUM_POOL_SIZE = 10
class _PauseableIterator(object):
diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb
index 6d69b0f21e..a039d036ac 100755
--- a/src/ruby/bin/apis/pubsub_demo.rb
+++ b/src/ruby/bin/apis/pubsub_demo.rb
@@ -79,7 +79,7 @@ end
def publisher_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
- logger.info("... access PublisherService at #{address}")
+ GRPC.logger.info("... access PublisherService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
@@ -89,7 +89,7 @@ end
def subscriber_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
- logger.info("... access SubscriberService at #{address}")
+ GRPC.logger.info("... access SubscriberService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index a388924722..8df03ffb3c 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -70,7 +70,7 @@ end
# loads the certificates used to access the test server securely.
def load_prod_cert
fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil?
- logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
+ GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
File.open(ENV['SSL_CERT_FILE']).read
end
@@ -115,10 +115,10 @@ def create_stub(opts)
stub_opts[:update_metadata] = auth_creds.updater_proc
end
- logger.info("... connecting securely to #{address}")
+ GRPC.logger.info("... connecting securely to #{address}")
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
else
- logger.info("... connecting insecurely to #{address}")
+ GRPC.logger.info("... connecting insecurely to #{address}")
Grpc::Testing::TestService::Stub.new(address)
end
end
diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 72570d92f3..78cb8dd836 100755
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service
Thread.new do
begin
reqs.each do |req|
- logger.info("read #{req.inspect}")
+ GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
- logger.info('finished reads')
+ GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -179,10 +179,10 @@ def main
s = GRPC::RpcServer.new
if opts['secure']
s.add_http2_port(host, test_server_creds)
- logger.info("... running securely on #{host}")
+ GRPC.logger.info("... running securely on #{host}")
else
s.add_http2_port(host)
- logger.info("... running insecurely on #{host}")
+ GRPC.logger.info("... running insecurely on #{host}")
end
s.handle(TestTarget)
s.run_till_terminated
diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb
index db254efb00..6319cda309 100755
--- a/src/ruby/bin/math_client.rb
+++ b/src/ruby/bin/math_client.rb
@@ -46,51 +46,51 @@ require 'optparse'
include GRPC::Core::TimeConsts
def do_div(stub)
- logger.info('request_response')
- logger.info('----------------')
+ GRPC.logger.info('request_response')
+ GRPC.logger.info('----------------')
req = Math::DivArgs.new(dividend: 7, divisor: 3)
- logger.info("div(7/3): req=#{req.inspect}")
+ GRPC.logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, INFINITE_FUTURE)
- logger.info("Answer: #{resp.inspect}")
- logger.info('----------------')
+ GRPC.logger.info("Answer: #{resp.inspect}")
+ GRPC.logger.info('----------------')
end
def do_sum(stub)
# to make client streaming requests, pass an enumerable of the inputs
- logger.info('client_streamer')
- logger.info('---------------')
+ GRPC.logger.info('client_streamer')
+ GRPC.logger.info('---------------')
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) }
- logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
+ GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
resp = stub.sum(reqs) # reqs.is_a?(Enumerable)
- logger.info("Answer: #{resp.inspect}")
- logger.info('---------------')
+ GRPC.logger.info("Answer: #{resp.inspect}")
+ GRPC.logger.info('---------------')
end
def do_fib(stub)
- logger.info('server_streamer')
- logger.info('----------------')
+ GRPC.logger.info('server_streamer')
+ GRPC.logger.info('----------------')
req = Math::FibArgs.new(limit: 11)
- logger.info("fib(11): req=#{req.inspect}")
+ GRPC.logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, INFINITE_FUTURE)
resp.each do |r|
- logger.info("Answer: #{r.inspect}")
+ GRPC.logger.info("Answer: #{r.inspect}")
end
- logger.info('----------------')
+ GRPC.logger.info('----------------')
end
def do_div_many(stub)
- logger.info('bidi_streamer')
- logger.info('-------------')
+ GRPC.logger.info('bidi_streamer')
+ GRPC.logger.info('-------------')
reqs = []
reqs << Math::DivArgs.new(dividend: 7, divisor: 3)
reqs << Math::DivArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
- logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
+ GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, 10)
resp.each do |r|
- logger.info("Answer: #{r.inspect}")
+ GRPC.logger.info("Answer: #{r.inspect}")
end
- logger.info('----------------')
+ GRPC.logger.info('----------------')
end
def load_test_certs
@@ -132,10 +132,10 @@ def main
p stub_opts
p options['host']
stub = Math::Math::Stub.new(options['host'], **stub_opts)
- logger.info("... connecting securely on #{options['host']}")
+ GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = Math::Math::Stub.new(options['host'])
- logger.info("... connecting insecurely on #{options['host']}")
+ GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
do_div(stub)
diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb
index e46d9c671f..b41ccf6ce1 100755
--- a/src/ruby/bin/math_server.rb
+++ b/src/ruby/bin/math_server.rb
@@ -128,13 +128,13 @@ class Calculator < Math::Math::Service
t = Thread.new do
begin
requests.each do |req|
- logger.info("read #{req.inspect}")
+ GRPC.logger.info("read #{req.inspect}")
resp = Math::DivReply.new(quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor)
q.push(resp)
Thread.pass # let the internal Bidi threads run
end
- logger.info('finished reads')
+ GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -176,10 +176,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
- logger.info("... running securely on #{options['host']}")
+ GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
- logger.info("... running insecurely on #{options['host']}")
+ GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(Calculator)
diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb
index f3fd110347..390a9c59c3 100755
--- a/src/ruby/bin/noproto_client.rb
+++ b/src/ruby/bin/noproto_client.rb
@@ -94,15 +94,15 @@ def main
p stub_opts
p options['host']
stub = NoProtoStub.new(options['host'], **stub_opts)
- logger.info("... connecting securely on #{options['host']}")
+ GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = NoProtoStub.new(options['host'])
- logger.info("... connecting insecurely on #{options['host']}")
+ GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
- logger.info('sending a NoProto rpc')
+ GRPC.logger.info('sending a NoProto rpc')
resp = stub.an_rpc(NoProtoMsg.new)
- logger.info("got a response: #{resp}")
+ GRPC.logger.info("got a response: #{resp}")
end
main
diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb
index f71daeadb3..90baaf9a2e 100755
--- a/src/ruby/bin/noproto_server.rb
+++ b/src/ruby/bin/noproto_server.rb
@@ -63,7 +63,7 @@ class NoProto < NoProtoService
end
def an_rpc(req, _call)
- logger.info('echo service received a request')
+ GRPC.logger.info('echo service received a request')
req
end
end
@@ -98,10 +98,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
- logger.info("... running securely on #{options['host']}")
+ GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
- logger.info("... running insecurely on #{options['host']}")
+ GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(NoProto)
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 947c39cd22..5f7beb5ab1 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
- logger.debug("sending #{req}, marshalled? #{marshalled}")
+ GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
@@ -230,14 +230,14 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
- logger.debug("received req: #{batch_result}")
+ GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
- logger.debug("received req.to_s: #{batch_result.message}")
+ GRPC.logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
+ GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
- logger.debug('found nil; the final response has been sent')
+ GRPC.logger.debug('found nil; the final response has been sent')
nil
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 4ca3004d6f..67143d40cf 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -115,10 +115,10 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
- logger.debug("each_queued_msg: msg##{count}")
+ GRPC.logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
- logger.debug("each_queued_msg: req = #{req}")
+ GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
@@ -134,22 +134,22 @@ module GRPC
begin
count = 0
requests.each do |req|
- logger.debug("bidi-write_loop: #{count}")
+ GRPC.logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
- logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi-write_loop: failed')
- logger.warn(e)
+ GRPC.logger.warn('bidi-write_loop: failed')
+ GRPC.logger.warn(e)
raise e
end
end
@@ -164,7 +164,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("bidi-read_loop: #{count}")
+ GRPC.logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -172,19 +172,19 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
- logger.debug('bidi-read-loop: done reading!')
+ GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
- logger.debug("received req: #{batch_result.message}")
+ GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
- logger.warn('bidi: read_loop failed')
- logger.warn(e)
+ GRPC.logger.warn('bidi: read_loop failed')
+ GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 10211ae239..2fd61c5f7e 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -84,22 +84,22 @@ module GRPC
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
- logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
+ GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
+ GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
- logger.warn("late call: #{active_call}")
+ GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
- logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
- logger.warn(e)
+ GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
+ GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
@@ -139,8 +139,8 @@ module GRPC
details = 'Not sure why' if details.nil?
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
- logger.warn("Could not send status #{code}:#{details}")
- logger.warn(e)
+ GRPC.logger.warn("Could not send status #{code}:#{details}")
+ GRPC.logger.warn(e)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index de22466089..665c144432 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -94,7 +94,7 @@ module GRPC
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
- logger.info('schedule another job')
+ GRPC.logger.info('schedule another job')
@jobs << [blk, args]
end
@@ -114,14 +114,14 @@ module GRPC
# Stops the jobs in the pool
def stop
- logger.info('stopping, will wait for all the workers to exit')
+ GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
- logger.info('stopped, all workers are shutdown')
+ GRPC.logger.info('stopped, all workers are shutdown')
end
protected
@@ -129,14 +129,14 @@ module GRPC
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
- logger.info("forcibly terminating #{@workers.size} worker(s)")
+ GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
- logger.warn('error while terminating a worker')
- logger.warn(e)
+ GRPC.logger.warn('error while terminating a worker')
+ GRPC.logger.warn(e)
end
end
end
@@ -156,8 +156,8 @@ module GRPC
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
- logger.warn('Error in worker thread')
- logger.warn(e)
+ GRPC.logger.warn('Error in worker thread')
+ GRPC.logger.warn(e)
end
end
end
@@ -365,7 +365,7 @@ module GRPC
# the server to stop.
def run
if rpc_descs.size.zero?
- logger.warn('did not run as no services were present')
+ GRPC.logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@@ -381,9 +381,9 @@ module GRPC
# Sends UNAVAILABLE if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
- logger.info("waiting: #{jobs_count}, max: #{max}")
+ GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNAVAILABLE, '')
@@ -394,7 +394,7 @@ module GRPC
def found?(an_rpc)
mth = an_rpc.method.to_sym
return an_rpc if rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{an_rpc}")
+ GRPC.logger.warn("NOT_FOUND: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::NOT_FOUND, '')
@@ -434,7 +434,7 @@ module GRPC
return nil unless found?(an_rpc)
# Create the ActiveCall
- logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
@@ -474,7 +474,7 @@ module GRPC
else
handlers[route] = service.method(rpc_name)
end
- logger.info("handling #{route} with #{handlers[route]}")
+ GRPC.logger.info("handling #{route} with #{handlers[route]}")
end
end
end
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index 8ea2c82f17..3b9743ea66 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -175,23 +175,23 @@ module GRPC
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil, **kw|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil, **kw|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, **kw, &blk|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
&blk)
end
diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb
index f36906fc45..96812170ba 100644
--- a/src/ruby/lib/grpc/logconfig.rb
+++ b/src/ruby/lib/grpc/logconfig.rb
@@ -29,7 +29,10 @@
require 'logging'
-include Logging.globally # logger is accessible everywhere
+# GRPC contains the General RPC module.
+module GRPC
+ extend Logging.globally
+end
Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 2cd21a15e3..640b0f656c 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -69,7 +69,7 @@ class EchoService
end
def an_rpc(req, call)
- logger.info('echo service received a request')
+ GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
@@ -109,7 +109,7 @@ class SlowService
end
def an_rpc(req, call)
- logger.info("starting a slow #{@delay} rpc")
+ GRPC.logger.info("starting a slow #{@delay} rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response