aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--Makefile8
-rw-r--r--build.json4
-rw-r--r--src/compiler/csharp_generator.cc77
-rw-r--r--src/compiler/generator_helpers.h11
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj7
-rw-r--r--src/csharp/Grpc.Core.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs18
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs27
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs10
-rw-r--r--src/csharp/Grpc.Core/Call.cs2
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj3
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.nuspec1
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs8
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs3
-rw-r--r--src/csharp/Grpc.Core/IClientStreamWriter.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs1
-rw-r--r--src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs26
-rw-r--r--src/csharp/Grpc.Core/packages.config1
-rw-r--r--src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Examples.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Examples/Grpc.Examples.csproj3
-rw-r--r--src/csharp/Grpc.Examples/MathGrpc.cs44
-rw-r--r--src/csharp/Grpc.Examples/packages.config1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs43
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestGrpc.cs70
-rw-r--r--src/csharp/Grpc.IntegrationTesting/packages.config1
-rw-r--r--src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m33
-rw-r--r--test/cpp/qps/driver.cc12
-rw-r--r--tools/run_tests/tests.json36
35 files changed, 323 insertions, 217 deletions
diff --git a/Makefile b/Makefile
index d163a1286a..9742e691af 100644
--- a/Makefile
+++ b/Makefile
@@ -2049,6 +2049,10 @@ test_c: buildtests_c
test_cxx: buildtests_cxx
$(E) "[RUN] Testing async_end2end_test"
$(Q) $(BINDIR)/$(CONFIG)/async_end2end_test || ( echo test async_end2end_test failed ; exit 1 )
+ $(E) "[RUN] Testing async_streaming_ping_pong_test"
+ $(Q) $(BINDIR)/$(CONFIG)/async_streaming_ping_pong_test || ( echo test async_streaming_ping_pong_test failed ; exit 1 )
+ $(E) "[RUN] Testing async_unary_ping_pong_test"
+ $(Q) $(BINDIR)/$(CONFIG)/async_unary_ping_pong_test || ( echo test async_unary_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing channel_arguments_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
@@ -2067,6 +2071,10 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/mock_test || ( echo test mock_test failed ; exit 1 )
$(E) "[RUN] Testing status_test"
$(Q) $(BINDIR)/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 )
+ $(E) "[RUN] Testing sync_streaming_ping_pong_test"
+ $(Q) $(BINDIR)/$(CONFIG)/sync_streaming_ping_pong_test || ( echo test sync_streaming_ping_pong_test failed ; exit 1 )
+ $(E) "[RUN] Testing sync_unary_ping_pong_test"
+ $(Q) $(BINDIR)/$(CONFIG)/sync_unary_ping_pong_test || ( echo test sync_unary_ping_pong_test failed ; exit 1 )
$(E) "[RUN] Testing thread_pool_test"
$(Q) $(BINDIR)/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 )
$(E) "[RUN] Testing thread_stress_test"
diff --git a/build.json b/build.json
index 18c0eb1211..8a7e65b99a 100644
--- a/build.json
+++ b/build.json
@@ -1813,7 +1813,6 @@
{
"name": "async_streaming_ping_pong_test",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_streaming_ping_pong_test.cc"
@@ -1831,7 +1830,6 @@
{
"name": "async_unary_ping_pong_test",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/async_unary_ping_pong_test.cc"
@@ -2207,7 +2205,6 @@
{
"name": "sync_streaming_ping_pong_test",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_streaming_ping_pong_test.cc"
@@ -2225,7 +2222,6 @@
{
"name": "sync_unary_ping_pong_test",
"build": "test",
- "run": false,
"language": "c++",
"src": [
"test/cpp/qps/sync_unary_ping_pong_test.cc"
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index 82dd06bcec..5dd078b303 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -51,20 +51,49 @@ using grpc_generator::METHODTYPE_NO_STREAMING;
using grpc_generator::METHODTYPE_CLIENT_STREAMING;
using grpc_generator::METHODTYPE_SERVER_STREAMING;
using grpc_generator::METHODTYPE_BIDI_STREAMING;
+using grpc_generator::StringReplace;
using std::map;
using std::vector;
namespace grpc_csharp_generator {
namespace {
-std::string GetCSharpNamespace(const FileDescriptor* file) {
- // TODO(jtattermusch): this should be based on csharp_namespace option
+// TODO(jtattermusch): make GetFileNamespace part of libprotoc public API.
+// NOTE: Implementation needs to match exactly to GetFileNamespace
+// defined in csharp_helpers.h in protoc csharp plugin.
+// We cannot reference it directly because google3 protobufs
+// don't have a csharp protoc plugin.
+std::string GetFileNamespace(const FileDescriptor* file) {
+ if (file->options().has_csharp_namespace()) {
+ return file->options().csharp_namespace();
+ }
return file->package();
}
-std::string GetMessageType(const Descriptor* message) {
- // TODO(jtattermusch): this has to match with C# protobuf generator
- return message->name();
+std::string ToCSharpName(const std::string& name, const FileDescriptor* file) {
+ std::string result = GetFileNamespace(file);
+ if (result != "") {
+ result += '.';
+ }
+ std::string classname;
+ if (file->package().empty()) {
+ classname = name;
+ } else {
+ // Strip the proto package from full_name since we've replaced it with
+ // the C# namespace.
+ classname = name.substr(file->package().size() + 1);
+ }
+ result += StringReplace(classname, ".", ".Types.", false);
+ return "global::" + result;
+}
+
+// TODO(jtattermusch): make GetClassName part of libprotoc public API.
+// NOTE: Implementation needs to match exactly to GetClassName
+// defined in csharp_helpers.h in protoc csharp plugin.
+// We cannot reference it directly because google3 protobufs
+// don't have a csharp protoc plugin.
+std::string GetClassName(const Descriptor* message) {
+ return ToCSharpName(message->full_name(), message->file());
}
std::string GetServiceClassName(const ServiceDescriptor* service) {
@@ -114,22 +143,22 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) {
if (method->client_streaming()) {
return "";
}
- return GetMessageType(method->input_type()) + " request, ";
+ return GetClassName(method->input_type()) + " request, ";
}
std::string GetMethodReturnTypeClient(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
- return "Task<" + GetMessageType(method->output_type()) + ">";
+ return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_CLIENT_STREAMING:
- return "AsyncClientStreamingCall<" + GetMessageType(method->input_type())
- + ", " + GetMessageType(method->output_type()) + ">";
+ return "AsyncClientStreamingCall<" + GetClassName(method->input_type())
+ + ", " + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
- return "AsyncServerStreamingCall<" + GetMessageType(method->output_type())
+ return "AsyncServerStreamingCall<" + GetClassName(method->output_type())
+ ">";
case METHODTYPE_BIDI_STREAMING:
- return "AsyncDuplexStreamingCall<" + GetMessageType(method->input_type())
- + ", " + GetMessageType(method->output_type()) + ">";
+ return "AsyncDuplexStreamingCall<" + GetClassName(method->input_type())
+ + ", " + GetClassName(method->output_type()) + ">";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
return "";
@@ -139,10 +168,10 @@ std::string GetMethodRequestParamServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_SERVER_STREAMING:
- return GetMessageType(method->input_type()) + " request";
+ return GetClassName(method->input_type()) + " request";
case METHODTYPE_CLIENT_STREAMING:
case METHODTYPE_BIDI_STREAMING:
- return "IAsyncStreamReader<" + GetMessageType(method->input_type())
+ return "IAsyncStreamReader<" + GetClassName(method->input_type())
+ "> requestStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@@ -153,7 +182,7 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_CLIENT_STREAMING:
- return "Task<" + GetMessageType(method->output_type()) + ">";
+ return "Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
return "Task";
@@ -169,7 +198,7 @@ std::string GetMethodResponseStreamMaybe(const MethodDescriptor *method) {
return "";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
- return ", IServerStreamWriter<" + GetMessageType(method->output_type())
+ return ", IServerStreamWriter<" + GetClassName(method->output_type())
+ "> responseStream";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
@@ -202,7 +231,7 @@ void GenerateMarshallerFields(Printer* out, const ServiceDescriptor *service) {
out->Print(
"static readonly Marshaller<$type$> $fieldname$ = Marshallers.Create((arg) => arg.ToByteArray(), $type$.ParseFrom);\n",
"fieldname", GetMarshallerFieldName(message), "type",
- GetMessageType(message));
+ GetClassName(message));
}
out->Print("\n");
}
@@ -211,8 +240,8 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
out->Print(
"static readonly Method<$request$, $response$> $fieldname$ = new Method<$request$, $response$>(\n",
"fieldname", GetMethodFieldName(method), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
out->Indent();
out->Indent();
out->Print("$methodtype$,\n", "methodtype",
@@ -242,8 +271,8 @@ void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
out->Print(
"$response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken));\n",
"methodname", method->name(), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
}
std::string method_name = method->name();
@@ -310,8 +339,8 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print(
"public $response$ $methodname$($request$ request, CancellationToken token = default(CancellationToken))\n",
"methodname", method->name(), "request",
- GetMessageType(method->input_type()), "response",
- GetMessageType(method->output_type()));
+ GetClassName(method->input_type()), "response",
+ GetClassName(method->output_type()));
out->Print("{\n");
out->Indent();
out->Print("var call = CreateCall($servicenamefield$, $methodfield$);\n",
@@ -466,7 +495,7 @@ grpc::string GetServices(const FileDescriptor *file) {
// TODO(jtattermusch): add using for protobuf message classes
out.Print("\n");
- out.Print("namespace $namespace$ {\n", "namespace", GetCSharpNamespace(file));
+ out.Print("namespace $namespace$ {\n", "namespace", GetFileNamespace(file));
out.Indent();
for (int i = 0; i < file->service_count(); i++) {
GenerateService(&out, file->service(i));
diff --git a/src/compiler/generator_helpers.h b/src/compiler/generator_helpers.h
index a232fcc954..7bdaff1c9b 100644
--- a/src/compiler/generator_helpers.h
+++ b/src/compiler/generator_helpers.h
@@ -60,21 +60,26 @@ inline grpc::string StripProto(grpc::string filename) {
}
inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
- const grpc::string &to) {
+ const grpc::string &to, bool replace_all) {
size_t pos = 0;
- for (;;) {
+ do {
pos = str.find(from, pos);
if (pos == grpc::string::npos) {
break;
}
str.replace(pos, from.length(), to);
pos += to.length();
- }
+ } while(replace_all);
return str;
}
+inline grpc::string StringReplace(grpc::string str, const grpc::string &from,
+ const grpc::string &to) {
+ return StringReplace(str, from, to, true);
+}
+
inline std::vector<grpc::string> tokenize(const grpc::string &input,
const grpc::string &delimiters) {
std::vector<grpc::string> tokens;
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index eac8d16fb1..62cb443272 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -34,6 +34,9 @@
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
@@ -57,7 +60,5 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
- <ItemGroup>
- <Folder Include="Internal\" />
- </ItemGroup>
+ <ItemGroup />
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config
index c714ef3a23..28af8d78c6 100644
--- a/src/csharp/Grpc.Core.Tests/packages.config
+++ b/src/csharp/Grpc.Core.Tests/packages.config
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index b95776f66d..8cdc1c895b 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for client streaming calls.
/// </summary>
public sealed class AsyncClientStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
@@ -54,22 +52,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
- }
-
- /// <summary>
/// Asynchronous call result.
/// </summary>
public Task<TResponse> Result
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index ee05437416..0d13a3d052 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for bidirectional streaming calls.
/// </summary>
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
@@ -54,31 +52,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
- }
-
- /// <summary>
- /// Reads a response from ResponseStream.
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
- }
-
- /// <summary>
/// Async stream to read streaming responses.
/// </summary>
public IAsyncStreamReader<TResponse> ResponseStream
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index 73b9614985..6a258d132c 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -41,7 +41,6 @@ namespace Grpc.Core
/// Return type for server streaming calls.
/// </summary>
public sealed class AsyncServerStreamingCall<TResponse>
- where TResponse : class
{
readonly IAsyncStreamReader<TResponse> responseStream;
@@ -51,15 +50,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Reads the next response from ResponseStream
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
- }
-
- /// <summary>
/// Async stream to read streaming responses.
/// </summary>
public IAsyncStreamReader<TResponse> ResponseStream
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs
index d1ee59ff0a..37b452f020 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/Call.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Abstraction of a call to be invoked on a client.
/// </summary>
public class Call<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index f5f2cf5f22..6b4345cbe1 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -37,6 +37,9 @@
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec
index e54908cb8b..5269881afa 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.nuspec
+++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec
@@ -16,6 +16,7 @@
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
<dependency id="Microsoft.Bcl.Immutable" version="1.0.34" />
+ <dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="0.8.0.0" />
</dependencies>
</metadata>
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 699741cd05..95b674c018 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,13 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<T>
- where T : class
+ public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
{
- /// <summary>
- /// Reads a single message. Returns null if the last message was already read.
- /// A following read can only be started when the previous one finishes.
- /// </summary>
- Task<T> ReadNext();
}
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 4bd8bfb8df..644f445401 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -44,10 +44,9 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Writes a single message. Only one write can be pending at a time.
+ /// Writes a single asynchronously. Only one write can be pending at a time.
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task Write(T message);
diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs
index 0847a928e6..cc76d1369d 100644
--- a/src/csharp/Grpc.Core/IClientStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs
@@ -44,11 +44,10 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IClientStreamWriter<T> : IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
+ /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// </summary>
- Task Close();
+ Task Complete();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 1697058732..b9fc10cd16 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -38,8 +38,6 @@ namespace Grpc.Core.Internal
/// Writes requests asynchronously to an underlying AsyncCall object.
/// </summary>
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
- where TRequest : class
- where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
@@ -55,7 +53,7 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task Close()
+ public Task Complete()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index b2378cade6..6c44521038 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
+ TResponse current;
public ClientResponseStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TResponse> ReadNext()
+ public TResponse Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 95d8e97869..20ac46c234 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -71,9 +72,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
await responseStream.Write(result);
@@ -122,9 +124,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index d9ee0c815b..3fccb88abb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TRequest> ReadNext()
+ public TRequest Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 731ea2be81..7a1c016ae2 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -39,9 +39,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- // TODO: we need to make sure that the delegates are not collected before invoked.
- //internal delegate void ServerShutdownCallbackDelegate(bool success);
-
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index e873b3e88a..bc9a499c51 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -42,7 +42,6 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
-
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): add deadline info
diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
index f915155f8a..a4f8989b30 100644
--- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
+++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
@@ -49,14 +49,9 @@ namespace Grpc.Core.Utils
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- await asyncAction(elem);
+ await asyncAction(streamReader.Current);
}
}
@@ -67,32 +62,27 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- result.Add(elem);
+ result.Add(streamReader.Current);
}
return result;
}
/// <summary>
/// Writes all elements from given enumerable to the stream.
- /// Closes the stream afterwards unless close = false.
+ /// Completes the stream afterwards unless close = false.
/// </summary>
- public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true)
+ public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
{
await streamWriter.Write(element);
}
- if (close)
+ if (complete)
{
- await streamWriter.Close();
+ await streamWriter.Complete();
}
}
diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config
index 71967de56e..fb7eaaeeda 100644
--- a/src/csharp/Grpc.Core/packages.config
+++ b/src/csharp/Grpc.Core/packages.config
@@ -2,5 +2,6 @@
<packages>
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
index 87ccf07dd8..6e84add42b 100644
--- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
+++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
@@ -37,6 +37,10 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config
index 4d6ec63b3c..cc6e9af40f 100644
--- a/src/csharp/Grpc.Examples.Tests/packages.config
+++ b/src/csharp/Grpc.Examples.Tests/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
index 2c5019c214..5ce490f403 100644
--- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj
+++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
@@ -35,6 +35,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index 2546fd220d..b9efc44e8c 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -12,30 +12,30 @@ namespace math {
{
static readonly string __ServiceName = "math.Math";
- static readonly Marshaller<DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
- static readonly Marshaller<DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
- static readonly Marshaller<FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), FibArgs.ParseFrom);
- static readonly Marshaller<Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
+ static readonly Marshaller<global::math.DivArgs> __Marshaller_DivArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivArgs.ParseFrom);
+ static readonly Marshaller<global::math.DivReply> __Marshaller_DivReply = Marshallers.Create((arg) => arg.ToByteArray(), global::math.DivReply.ParseFrom);
+ static readonly Marshaller<global::math.FibArgs> __Marshaller_FibArgs = Marshallers.Create((arg) => arg.ToByteArray(), global::math.FibArgs.ParseFrom);
+ static readonly Marshaller<global::math.Num> __Marshaller_Num = Marshallers.Create((arg) => arg.ToByteArray(), global::math.Num.ParseFrom);
- static readonly Method<DivArgs, DivReply> __Method_Div = new Method<DivArgs, DivReply>(
+ static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_Div = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.Unary,
"Div",
__Marshaller_DivArgs,
__Marshaller_DivReply);
- static readonly Method<DivArgs, DivReply> __Method_DivMany = new Method<DivArgs, DivReply>(
+ static readonly Method<global::math.DivArgs, global::math.DivReply> __Method_DivMany = new Method<global::math.DivArgs, global::math.DivReply>(
MethodType.DuplexStreaming,
"DivMany",
__Marshaller_DivArgs,
__Marshaller_DivReply);
- static readonly Method<FibArgs, Num> __Method_Fib = new Method<FibArgs, Num>(
+ static readonly Method<global::math.FibArgs, global::math.Num> __Method_Fib = new Method<global::math.FibArgs, global::math.Num>(
MethodType.ServerStreaming,
"Fib",
__Marshaller_FibArgs,
__Marshaller_Num);
- static readonly Method<Num, Num> __Method_Sum = new Method<Num, Num>(
+ static readonly Method<global::math.Num, global::math.Num> __Method_Sum = new Method<global::math.Num, global::math.Num>(
MethodType.ClientStreaming,
"Sum",
__Marshaller_Num,
@@ -44,20 +44,20 @@ namespace math {
// client-side stub interface
public interface IMathClient
{
- DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken));
- Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken));
+ global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
+ Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface IMath
{
- Task<DivReply> Div(ServerCallContext context, DivArgs request);
- Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream);
- Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream);
- Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream);
+ Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request);
+ Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream);
+ Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream);
+ Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream);
}
// client stub
@@ -69,27 +69,27 @@ namespace math {
public MathClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
- public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
+ public global::math.DivReply Div(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
+ public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div);
return Calls.AsyncUnaryCall(call, request, token);
}
- public AsyncDuplexStreamingCall<DivArgs, DivReply> DivMany(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_DivMany);
return Calls.AsyncDuplexStreamingCall(call, token);
}
- public AsyncServerStreamingCall<Num> Fib(FibArgs request, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Fib);
return Calls.AsyncServerStreamingCall(call, request, token);
}
- public AsyncClientStreamingCall<Num, Num> Sum(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Sum);
return Calls.AsyncClientStreamingCall(call, token);
diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config
index 51c17bcd5e..4c8d60fa62 100644
--- a/src/csharp/Grpc.Examples/packages.config
+++ b/src/csharp/Grpc.Examples/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 1ca3dd24e1..b3a0a2917b 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -54,6 +54,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions">
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 02f8a369de..d907699698 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -256,48 +256,45 @@ namespace Grpc.IntegrationTesting
var call = client.FullDuplexCall();
- StreamingOutputCallResponse response;
-
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(9, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(2653, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(58979, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
- await call.RequestStream.Close();
+ await call.RequestStream.Complete();
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(null, response);
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
Console.WriteLine("Passed!");
}).Wait();
@@ -309,7 +306,7 @@ namespace Grpc.IntegrationTesting
{
Console.WriteLine("running empty_stream");
var call = client.FullDuplexCall();
- await call.Close();
+ await call.RequestStream.Complete();
var responseList = await call.ResponseStream.ToList();
Assert.AreEqual(0, responseList.Count);
@@ -392,22 +389,20 @@ namespace Grpc.IntegrationTesting
var cts = new CancellationTokenSource();
var call = client.FullDuplexCall(cts.Token);
- StreamingOutputCallResponse response;
-
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
cts.Cancel();
try
{
- response = await call.ResponseStream.ReadNext();
+ await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException e)
diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
index 679aafb57a..ee077f9f56 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
@@ -12,45 +12,45 @@ namespace grpc.testing {
{
static readonly string __ServiceName = "grpc.testing.TestService";
- static readonly Marshaller<Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom);
- static readonly Marshaller<SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom);
- static readonly Marshaller<SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom);
- static readonly Marshaller<StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallRequest.ParseFrom);
- static readonly Marshaller<StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingOutputCallResponse.ParseFrom);
- static readonly Marshaller<StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallRequest.ParseFrom);
- static readonly Marshaller<StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), StreamingInputCallResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.Empty> __Marshaller_Empty = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.Empty.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.SimpleRequest> __Marshaller_SimpleRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.SimpleResponse> __Marshaller_SimpleResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.SimpleResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingOutputCallResponse.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallRequest.ParseFrom);
+ static readonly Marshaller<global::grpc.testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = Marshallers.Create((arg) => arg.ToByteArray(), global::grpc.testing.StreamingInputCallResponse.ParseFrom);
- static readonly Method<Empty, Empty> __Method_EmptyCall = new Method<Empty, Empty>(
+ static readonly Method<global::grpc.testing.Empty, global::grpc.testing.Empty> __Method_EmptyCall = new Method<global::grpc.testing.Empty, global::grpc.testing.Empty>(
MethodType.Unary,
"EmptyCall",
__Marshaller_Empty,
__Marshaller_Empty);
- static readonly Method<SimpleRequest, SimpleResponse> __Method_UnaryCall = new Method<SimpleRequest, SimpleResponse>(
+ static readonly Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse> __Method_UnaryCall = new Method<global::grpc.testing.SimpleRequest, global::grpc.testing.SimpleResponse>(
MethodType.Unary,
"UnaryCall",
__Marshaller_SimpleRequest,
__Marshaller_SimpleResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.ServerStreaming,
"StreamingOutputCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
- static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> __Method_StreamingInputCall = new Method<StreamingInputCallRequest, StreamingInputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> __Method_StreamingInputCall = new Method<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse>(
MethodType.ClientStreaming,
"StreamingInputCall",
__Marshaller_StreamingInputCallRequest,
__Marshaller_StreamingInputCallResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"FullDuplexCall",
__Marshaller_StreamingOutputCallRequest,
__Marshaller_StreamingOutputCallResponse);
- static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
+ static readonly Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new Method<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse>(
MethodType.DuplexStreaming,
"HalfDuplexCall",
__Marshaller_StreamingOutputCallRequest,
@@ -59,25 +59,25 @@ namespace grpc.testing {
// client-side stub interface
public interface ITestServiceClient
{
- Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken));
- Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken));
- SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken));
- Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
- AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
- AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
- AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
+ global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
+ Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken));
+ global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
+ Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
}
// server-side interface
public interface ITestService
{
- Task<Empty> EmptyCall(ServerCallContext context, Empty request);
- Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request);
- Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream);
- Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
+ Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request);
+ Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request);
+ Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
+ Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream);
+ Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
+ Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream);
}
// client stub
@@ -89,42 +89,42 @@ namespace grpc.testing {
public TestServiceClient(Channel channel, StubConfiguration config) : base(channel, config)
{
}
- public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
+ public global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_EmptyCall);
return Calls.AsyncUnaryCall(call, request, token);
}
- public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.BlockingUnaryCall(call, request, token);
}
- public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
+ public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_UnaryCall);
return Calls.AsyncUnaryCall(call, request, token);
}
- public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingOutputCall);
return Calls.AsyncServerStreamingCall(call, request, token);
}
- public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_StreamingInputCall);
return Calls.AsyncClientStreamingCall(call, token);
}
- public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_FullDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);
}
- public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_HalfDuplexCall);
return Calls.AsyncDuplexStreamingCall(call, token);
diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config
index e33b6e3e46..291b7b8599 100644
--- a/src/csharp/Grpc.IntegrationTesting/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting/packages.config
@@ -3,6 +3,7 @@
<package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />
diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
index 7e063fddb4..8e0e11d23d 100644
--- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
+++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
@@ -269,4 +269,37 @@
[self waitForExpectationsWithTimeout:1 handler:nil];
}
+- (void)testCancelAfterFirstResponseRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterFirstResponse"];
+
+ // A buffered pipe to which we write a single value but never close
+ GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
+
+ __block BOOL receivedResponse = NO;
+
+ id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
+ requestedResponseSize:@31415];
+
+ [requestsBuffer writeValue:request];
+
+ __block ProtoRPC *call = [_service RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
+ handler:^(BOOL done,
+ RMTStreamingOutputCallResponse *response,
+ NSError *error) {
+ if (receivedResponse) {
+ XCTAssert(done, @"Unexpected extra response %@", response);
+ XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
+ [expectation fulfill];
+ } else {
+ XCTAssertNil(error, @"Finished with unexpected error: %@", error);
+ XCTAssertFalse(done, @"Finished without response");
+ XCTAssertNotNil(response);
+ receivedResponse = YES;
+ [call cancel];
+ }
+ }];
+ [call start];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
+}
+
@end
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 9f7d3b56a4..6959f980ae 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -44,9 +44,11 @@
#include <thread>
#include <deque>
#include <vector>
+#include <unistd.h>
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
using std::list;
using std::thread;
@@ -96,6 +98,16 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config,
// Spawn some local workers if desired
vector<unique_ptr<QpsWorker>> local_workers;
for (int i = 0; i < abs(spawn_local_worker_count); i++) {
+ // act as if we're a new test -- gets a good rng seed
+ static bool called_init = false;
+ if (!called_init) {
+ char args_buf[100];
+ strcpy(args_buf, "some-benchmark");
+ char *args[] = {args_buf};
+ grpc_test_init(1, args);
+ called_init = true;
+ }
+
int driver_port = grpc_pick_unused_port_or_die();
int benchmark_port = grpc_pick_unused_port_or_die();
local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 30ddb2bee2..8ea3b934d1 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -599,6 +599,24 @@
{
"flaky": false,
"language": "c++",
+ "name": "async_streaming_ping_pong_test",
+ "platforms": [
+ "windows",
+ "posix"
+ ]
+ },
+ {
+ "flaky": false,
+ "language": "c++",
+ "name": "async_unary_ping_pong_test",
+ "platforms": [
+ "windows",
+ "posix"
+ ]
+ },
+ {
+ "flaky": false,
+ "language": "c++",
"name": "channel_arguments_test",
"platforms": [
"windows",
@@ -680,6 +698,24 @@
{
"flaky": false,
"language": "c++",
+ "name": "sync_streaming_ping_pong_test",
+ "platforms": [
+ "windows",
+ "posix"
+ ]
+ },
+ {
+ "flaky": false,
+ "language": "c++",
+ "name": "sync_unary_ping_pong_test",
+ "platforms": [
+ "windows",
+ "posix"
+ ]
+ },
+ {
+ "flaky": false,
+ "language": "c++",
"name": "thread_pool_test",
"platforms": [
"windows",