diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs index 08d9921475..9873dc9c71 100644 --- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs +++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs @@ -1,5 +1,4 @@ #region Copyright notice and license - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,9 +27,7 @@ // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - #endregion - using System; using Grpc.Core.Internal; @@ -40,32 +37,36 @@ namespace Grpc.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// </summary> - internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> - { - readonly AsyncCall<TWrite, TRead> call; + internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse> + { + readonly AsyncCallServer<TRequest, TResponse> call; - public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) - { + public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call) + { this.call = call; - } + } - public void OnCompleted() - { + public void OnCompleted() + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait(); - } + taskSource.Task.Wait(); + } - public void OnError(Exception error) - { + public void OnError(Exception error) + { // TODO: implement this... - throw new InvalidOperationException("This should never be called."); - } + throw new InvalidOperationException("This should never be called."); + } - public void OnNext(TWrite value) - { + public void OnNext(TResponse value) + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendMessage(value, taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendMessageAsync(value).Wait(); - } - } + taskSource.Task.Wait(); + } + } } |