aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs45
1 files changed, 23 insertions, 22 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
index 08d9921475..9873dc9c71 100644
--- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs
@@ -1,5 +1,4 @@
#region Copyright notice and license
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,9 +27,7 @@
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
#endregion
-
using System;
using Grpc.Core.Internal;
@@ -40,32 +37,36 @@ namespace Grpc.Core.Internal
/// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
/// and then halfcloses the call. Used for server-side call handling.
/// </summary>
- internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite>
- {
- readonly AsyncCall<TWrite, TRead> call;
+ internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse>
+ {
+ readonly AsyncCallServer<TRequest, TResponse> call;
- public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call)
- {
+ public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call)
+ {
this.call = call;
- }
+ }
- public void OnCompleted()
- {
+ public void OnCompleted()
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait();
- }
+ taskSource.Task.Wait();
+ }
- public void OnError(Exception error)
- {
+ public void OnError(Exception error)
+ {
// TODO: implement this...
- throw new InvalidOperationException("This should never be called.");
- }
+ throw new InvalidOperationException("This should never be called.");
+ }
- public void OnNext(TWrite value)
- {
+ public void OnNext(TResponse value)
+ {
+ var taskSource = new AsyncCompletionTaskSource();
+ call.StartSendMessage(value, taskSource.CompletionDelegate);
// TODO: how bad is the Wait here?
- call.SendMessageAsync(value).Wait();
- }
- }
+ taskSource.Task.Wait();
+ }
+ }
}