aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/GrpcCore/Internal
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-05 21:13:02 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-05 21:13:02 -0800
commitc230a7451dbfc0627bdd3173118a91b075856619 (patch)
tree58b339eacaf811a608fb09e66d5e879d3a8eecb8 /src/csharp/GrpcCore/Internal
parent3054756462f41e6811d7866bd231982f0613279f (diff)
parent3d6fa14f1a94fcb5c9b917b2e6e76cea6d13f2bb (diff)
Merge github.com:google/grpc into async-api-new
Diffstat (limited to 'src/csharp/GrpcCore/Internal')
-rw-r--r--src/csharp/GrpcCore/Internal/AsyncCall.cs8
-rw-r--r--src/csharp/GrpcCore/Internal/ServerSafeHandle.cs9
-rw-r--r--src/csharp/GrpcCore/Internal/ServerWritingObserver.cs38
-rw-r--r--src/csharp/GrpcCore/Internal/StreamingInputObserver.cs2
4 files changed, 54 insertions, 3 deletions
diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs
index e83ca0eaa9..c38363bb2b 100644
--- a/src/csharp/GrpcCore/Internal/AsyncCall.cs
+++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs
@@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal
return StartRead().Task;
}
+ public Task Halfclosed
+ {
+ get
+ {
+ return halfcloseTcs.Task;
+ }
+ }
+
public Task<Status> Finished
{
get
diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
index 0d38bce63e..08d4cf0192 100644
--- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
+++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
@@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal
[DllImport("libgrpc.so")]
static extern void grpc_server_shutdown(ServerSafeHandle server);
- [DllImport("libgrpc.so")]
- static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag);
+ [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")]
+ static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("libgrpc.so")]
static extern void grpc_server_destroy(IntPtr server);
@@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal
grpc_server_shutdown(this);
}
+ public void ShutdownAndNotify(EventCallbackDelegate callback)
+ {
+ grpc_server_shutdown_and_notify_CALLBACK(this, callback);
+ }
+
public GRPCCallError RequestCall(EventCallbackDelegate callback)
{
return grpc_server_request_call_old_CALLBACK(this, callback);
diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
new file mode 100644
index 0000000000..2b46e9c53d
--- /dev/null
+++ b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
@@ -0,0 +1,38 @@
+using System;
+using Google.GRPC.Core.Internal;
+
+namespace Google.GRPC.Core.Internal
+{
+ /// <summary>
+ /// Observer that writes all arriving messages to a call abstraction (in blocking fashion)
+ /// and then halfcloses the call. Used for server-side call handling.
+ /// </summary>
+ internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
+ {
+ readonly AsyncCall<TWrite, TRead> call;
+
+ public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
+ {
+ this.call = call;
+ }
+
+ public void OnCompleted()
+ {
+ // TODO: how bad is the Wait here?
+ call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ }
+
+ public void OnError(Exception error)
+ {
+ // TODO: handle this...
+ throw new InvalidOperationException("This should never be called.");
+ }
+
+ public void OnNext(TWrite value)
+ {
+ // TODO: how bad is the Wait here?
+ call.WriteAsync(value).Wait();
+ }
+ }
+}
+
diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
index d483e53a2d..c5de979351 100644
--- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
+++ b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
@@ -1,7 +1,7 @@
using System;
using Google.GRPC.Core.Internal;
-namespace Google.GRPC.Core
+namespace Google.GRPC.Core.Internal
{
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{