aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-04-24 09:48:16 -0400
committerGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-04-24 09:48:16 -0400
commit68180a2b92d4d2c09a098cf6fa75f53590147f35 (patch)
treec1d823c698c37db586fc3d39121690bd8263aa1f
parent3afd92ff511f52db3ecf892d9af65053323c89cb (diff)
parent97e294aadc251c0fac5b417e55f207cedb10561e (diff)
Merge pull request #1358 from jtattermusch/csharp_server_improvements
Improving C# server
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Server.cs176
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj4
5 files changed, 107 insertions, 88 deletions
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 14add60c72..c97a3bc2b1 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -33,6 +33,7 @@ using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@@ -180,7 +181,7 @@ namespace Grpc.Core.Internal
private static void AssertCallOk(GRPCCallError callError)
{
- Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+ Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index a59da09822..8080643d8c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -35,6 +35,7 @@ using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@@ -105,9 +106,9 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
- public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+ public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
- return grpcsharp_server_request_call(this, cq, callback);
+ AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
}
protected override bool ReleaseHandle()
@@ -115,5 +116,10 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle);
return true;
}
+
+ private static void AssertCallOk(GRPCCallError callError)
+ {
+ Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index f086fa8beb..e686cdddef 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -38,27 +38,29 @@ using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
- /// Server is implemented only to be able to do
- /// in-process testing.
+ /// A gRPC server.
/// </summary>
public class Server
{
- // TODO: make sure the delegate doesn't get garbage collected while
+ // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
- readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
+ readonly object myLock = new object();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
-
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
+ bool startRequested;
+ bool shutdownRequested;
+
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
@@ -66,71 +68,81 @@ namespace Grpc.Core
this.serverShutdownHandler = HandleServerShutdown;
}
- // only call this before Start()
+ /// <summary>
+ /// Adds a service definition to the server. This is how you register
+ /// handlers for a service with the server.
+ /// Only call this before Start().
+ /// </summary>
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
{
- foreach (var entry in serviceDefinition.CallHandlers)
+ lock (myLock)
{
- callHandlers.Add(entry.Key, entry.Value);
+ Preconditions.CheckState(!startRequested);
+ foreach (var entry in serviceDefinition.CallHandlers)
+ {
+ callHandlers.Add(entry.Key, entry.Value);
+ }
}
}
- // only call before Start()
+ /// <summary>
+ /// Add a non-secure port on which server should listen.
+ /// Only call this before Start().
+ /// </summary>
public int AddListeningPort(string addr)
{
- return handle.AddListeningPort(addr);
- }
-
- // only call before Start()
- public int AddListeningPort(string addr, ServerCredentials credentials)
- {
- using (var nativeCredentials = credentials.ToNativeCredentials())
+ lock (myLock)
{
- return handle.AddListeningPort(addr, nativeCredentials);
+ Preconditions.CheckState(!startRequested);
+ return handle.AddListeningPort(addr);
}
}
- public void Start()
- {
- handle.Start();
-
- // TODO: this basically means the server is single threaded....
- StartHandlingRpcs();
- }
-
/// <summary>
- /// Requests and handles single RPC call.
+ /// Add a secure port on which server should listen.
+ /// Only call this before Start().
/// </summary>
- internal void RunRpc()
+ public int AddListeningPort(string addr, ServerCredentials credentials)
{
- AllowOneRpc();
-
- try
+ lock (myLock)
{
- var rpcInfo = newRpcQueue.Take();
-
- // Console.WriteLine("Server received RPC " + rpcInfo.Method);
-
- IServerCallHandler callHandler;
- if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
+ Preconditions.CheckState(!startRequested);
+ using (var nativeCredentials = credentials.ToNativeCredentials())
{
- callHandler = new NoSuchMethodCallHandler();
+ return handle.AddListeningPort(addr, nativeCredentials);
}
- callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
- catch (Exception e)
+ }
+
+ /// <summary>
+ /// Starts the server.
+ /// </summary>
+ public void Start()
+ {
+ lock (myLock)
{
- Console.WriteLine("Exception while handling RPC: " + e);
+ Preconditions.CheckState(!startRequested);
+ startRequested = true;
+
+ handle.Start();
+ AllowOneRpc();
}
}
/// <summary>
/// Requests server shutdown and when there are no more calls being serviced,
- /// cleans up used resources.
+ /// cleans up used resources. The returned task finishes when shutdown procedure
+ /// is complete.
/// </summary>
- /// <returns>The async.</returns>
public async Task ShutdownAsync()
{
+ lock (myLock)
+ {
+ Preconditions.CheckState(startRequested);
+ Preconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
@@ -152,19 +164,43 @@ namespace Grpc.Core
handle.Dispose();
}
- private async Task StartHandlingRpcs()
+ /// <summary>
+ /// Allows one new RPC call to be received by server.
+ /// </summary>
+ private void AllowOneRpc()
{
- while (true)
+ lock (myLock)
{
- await Task.Factory.StartNew(RunRpc);
+ if (!shutdownRequested)
+ {
+ handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
+ }
}
}
- private void AllowOneRpc()
+ /// <summary>
+ /// Selects corresponding handler for given call and handles the call.
+ /// </summary>
+ private void InvokeCallHandler(CallSafeHandle call, string method)
{
- AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
+ try
+ {
+ IServerCallHandler callHandler;
+ if (!callHandlers.TryGetValue(method, out callHandler))
+ {
+ callHandler = new NoSuchMethodCallHandler();
+ }
+ callHandler.StartCall(method, call, GetCompletionQueue());
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception while handling RPC: " + e);
+ }
}
+ /// <summary>
+ /// Handles the native callback.
+ /// </summary>
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
{
try
@@ -176,13 +212,16 @@ namespace Grpc.Core
// TODO: handle error
}
- var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
+ CallSafeHandle call = ctx.GetServerRpcNewCall();
+ string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
- if (!rpcInfo.Call.IsInvalid)
+ if (!call.IsInvalid)
{
- newRpcQueue.Add(rpcInfo);
+ Task.Run(() => InvokeCallHandler(call, method));
}
+
+ AllowOneRpc();
}
catch (Exception e)
{
@@ -190,6 +229,10 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Handles native callback.
+ /// </summary>
+ /// <param name="eventPtr"></param>
private void HandleServerShutdown(IntPtr eventPtr)
{
try
@@ -202,42 +245,9 @@ namespace Grpc.Core
}
}
- private static void AssertCallOk(GRPCCallError callError)
- {
- Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
-
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
-
- private struct NewRpcInfo
- {
- private CallSafeHandle call;
- private string method;
-
- public NewRpcInfo(CallSafeHandle call, string method)
- {
- this.call = call;
- this.method = method;
- }
-
- public CallSafeHandle Call
- {
- get
- {
- return this.call;
- }
- }
-
- public string Method
- {
- get
- {
- return this.method;
- }
- }
- }
}
}
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index f7429fb43f..abc7ef05e4 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -40,7 +40,7 @@ namespace math
{
public static void Main(string[] args)
{
- String host = "0.0.0.0";
+ string host = "0.0.0.0";
GrpcEnvironment.Initialize();
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 584bf1068d..6ae8041fb7 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -80,5 +80,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
- <ItemGroup />
+ <ItemGroup>
+ <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
+ </ItemGroup>
</Project> \ No newline at end of file