aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-05-31 13:59:37 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-05-31 13:59:37 -0700
commit8bec6f6a55afaa1e321f29bf816d2a010f510b3c (patch)
tree525e06b564b9d23d8340e25e7ddb9915fec0b1e5 /src/csharp/Grpc.Core/Internal
parent582f4350ed755aac0b07f12b499ad18f86f2a1b7 (diff)
parent0d6196025e62aea5aabc6341459f2c370e264230 (diff)
Merge branch 'master' of github.com:grpc/grpc into lb_pollset_propagation
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs102
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs81
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs36
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs103
-rw-r--r--src/csharp/Grpc.Core/Internal/CallError.cs (renamed from src/csharp/Grpc.Core/Internal/Enums.cs)39
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs26
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientSideStatus.cs70
-rw-r--r--src/csharp/Grpc.Core/Internal/ClockType.cs53
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs72
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs91
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs34
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRpcNew.cs (renamed from src/csharp/Grpc.Core/Internal/AsyncCompletion.cs)75
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs37
20 files changed, 502 insertions, 407 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 55351869b5..895be690a5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+ // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
readingDone = true;
@@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: true);
+ GrpcPreconditions.CheckState(started);
- if (!disposed && !finished)
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
{
- call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+ return earlyResult;
}
- else
+
+ if (disposed || finished)
{
// In case the call has already been finished by the serverside,
- // the halfclose has already been done implicitly, so we only
- // emit the notification for the completion delegate.
- Task.Run(() => HandleSendCloseFromClientFinished(true));
+ // the halfclose has already been done implicitly, so just return
+ // completed task here.
+ halfcloseRequested = true;
+ return Task.FromResult<object>(null);
}
+ call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
+
+ if (finishedStatus.HasValue)
+ {
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error because server can close
+ // the call anytime, so don't throw directly, but let the write task finish with an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
+ private Task CheckSendPreconditionsClientSide()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+ if (cancelRequested)
+ {
+ // Return a cancelled task.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetCanceled();
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
- var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
@@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
+ // TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
- {
- base.CheckSendingAllowed(true);
-
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
- {
- throw new RpcException(finishedStatus.Value.Status);
- }
- }
-
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4de23706b2..cb8366c216 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -58,7 +58,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
protected INativeCall call;
@@ -67,8 +66,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
- protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@@ -78,11 +77,10 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
- this.environment = GrpcPreconditions.CheckNotNull(environment);
}
/// <summary>
@@ -128,28 +126,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
- sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@@ -159,7 +160,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
- // and maintain its state.
+ // and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@@ -183,7 +184,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+ bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -213,24 +214,11 @@ namespace Grpc.Core.Internal
{
}
- protected virtual void CheckSendingAllowed(bool allowFinished)
- {
- GrpcPreconditions.CheckState(started);
- CheckNotCancelled();
- GrpcPreconditions.CheckState(!disposed || allowFinished);
-
- GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
- GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
- GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
- }
-
- protected void CheckNotCancelled()
- {
- if (cancelRequested)
- {
- throw new OperationCanceledException("Remote call has been cancelled.");
- }
- }
+ /// <summary>
+ /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
+ /// logic by directly returning the write operation result task. Normally, null is returned.
+ /// </summary>
+ protected abstract Task CheckSendAllowedOrEarlyResult();
protected byte[] UnsafeSerialize(TWrite msg)
{
@@ -259,39 +247,27 @@ namespace Grpc.Core.Internal
}
}
- protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
- {
- try
- {
- completionDelegate(value, error);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking completion delegate.");
- }
- }
-
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
+ origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
@@ -300,22 +276,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
+ // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
+ origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b1566b44a7..56c23ba3ef 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -51,14 +51,14 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
- public void Initialize(CallSafeHandle call)
+ public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
- call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
+ call.Initialize(completionQueue);
server.AddCallReference(this);
InitializeInternal(call);
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
- /// completionDelegate is invoked upon completion.
/// </summary>
- public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
- CheckSendingAllowed(allowFinished: false);
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+ GrpcPreconditions.CheckState(!finished, "Already finished.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
+
+ return null;
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 66d2a66f99..c28a6f64d3 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -120,107 +120,4 @@ namespace Grpc.Core.Internal
return true;
}
}
-
- /// <summary>
- /// Status + metadata received on client side when call finishes.
- /// (when receive_status_on_client operation finishes).
- /// </summary>
- internal struct ClientSideStatus
- {
- readonly Status status;
- readonly Metadata trailers;
-
- public ClientSideStatus(Status status, Metadata trailers)
- {
- this.status = status;
- this.trailers = trailers;
- }
-
- public Status Status
- {
- get
- {
- return this.status;
- }
- }
-
- public Metadata Trailers
- {
- get
- {
- return this.trailers;
- }
- }
- }
-
- /// <summary>
- /// Details of a newly received RPC.
- /// </summary>
- internal struct ServerRpcNew
- {
- readonly Server server;
- readonly CallSafeHandle call;
- readonly string method;
- readonly string host;
- readonly Timespec deadline;
- readonly Metadata requestMetadata;
-
- public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
- {
- this.server = server;
- this.call = call;
- this.method = method;
- this.host = host;
- this.deadline = deadline;
- this.requestMetadata = requestMetadata;
- }
-
- public Server Server
- {
- get
- {
- return this.server;
- }
- }
-
- public CallSafeHandle Call
- {
- get
- {
- return this.call;
- }
- }
-
- public string Method
- {
- get
- {
- return this.method;
- }
- }
-
- public string Host
- {
- get
- {
- return this.host;
- }
- }
-
- public Timespec Deadline
- {
- get
- {
- return this.deadline;
- }
- }
-
- public Metadata RequestMetadata
- {
- get
- {
- return this.requestMetadata;
- }
- }
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/CallError.cs
index 74f86d2a30..541575f5e6 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/CallError.cs
@@ -40,7 +40,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_call_error from grpc/grpc.h
/// </summary>
- internal enum GRPCCallError
+ internal enum CallError
{
/* everything went ok */
OK = 0,
@@ -70,42 +70,9 @@ namespace Grpc.Core.Internal
/// <summary>
/// Checks the call API invocation's result is OK.
/// </summary>
- public static void CheckOk(this GRPCCallError callError)
+ public static void CheckOk(this CallError callError)
{
- GrpcPreconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError);
}
}
-
- /// <summary>
- /// grpc_completion_type from grpc/grpc.h
- /// </summary>
- internal enum GRPCCompletionType
- {
- /* Shutting down */
- Shutdown,
-
- /* No event before timeout */
- Timeout,
-
- /* operation completion */
- OpComplete
- }
-
- /// <summary>
- /// gpr_clock_type from grpc/support/time.h
- /// </summary>
- internal enum GPRClockType
- {
- /* Monotonic clock */
- Monotonic,
-
- /* Realtime clock */
- Realtime,
-
- /* Precise clock good for performance profiling. */
- Precise,
-
- /* Timespan - the distance between two time points */
- Timespan
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 244b97d4a4..82361f5797 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,16 +47,14 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
const uint GRPC_WRITE_BUFFER_HINT = 1;
- CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;
private CallSafeHandle()
{
}
- public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
+ public void Initialize(CompletionQueueSafeHandle completionQueue)
{
- this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}
@@ -70,7 +68,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -90,7 +88,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -100,7 +98,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}
@@ -110,7 +108,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -120,7 +118,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}
@@ -130,7 +128,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@@ -142,7 +140,7 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
@@ -153,7 +151,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@@ -163,7 +161,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@@ -173,7 +171,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@@ -183,7 +181,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 1dbd1f4e34..62864dff0c 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
+ public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
@@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
- result.Initialize(registry, cq);
+ result.Initialize(cq);
return result;
}
}
@@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
- public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
- CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 013f00ff6f..924de028f5 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendCloseFromClient(taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
new file mode 100644
index 0000000000..5727e3f11f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
@@ -0,0 +1,70 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Status + metadata received on client side when call finishes.
+ /// (when receive_status_on_client operation finishes).
+ /// </summary>
+ internal struct ClientSideStatus
+ {
+ readonly Status status;
+ readonly Metadata trailers;
+
+ public ClientSideStatus(Status status, Metadata trailers)
+ {
+ this.status = status;
+ this.trailers = trailers;
+ }
+
+ public Status Status
+ {
+ get
+ {
+ return this.status;
+ }
+ }
+
+ public Metadata Trailers
+ {
+ get
+ {
+ return this.trailers;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/ClockType.cs b/src/csharp/Grpc.Core/Internal/ClockType.cs
new file mode 100644
index 0000000000..57533c9d2f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClockType.cs
@@ -0,0 +1,53 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// gpr_clock_type from grpc/support/time.h
+ /// </summary>
+ internal enum ClockType
+ {
+ /* Monotonic clock */
+ Monotonic,
+
+ /* Realtime clock */
+ Realtime,
+
+ /* Precise clock good for performance profiling. */
+ Precise,
+
+ /* Timespan - the distance between two time points */
+ Timespan
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
index 288680792a..a78e9b70f3 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
static readonly NativeMethods Native = NativeMethods.Get();
- public GRPCCompletionType type;
+ public CompletionType type;
public int success;
public IntPtr tag;
@@ -55,5 +55,20 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_sizeof_grpc_event();
}
}
+
+ /// <summary>
+ /// grpc_completion_type from grpc/grpc.h
+ /// </summary>
+ internal enum CompletionType
+ {
+ /* Shutting down */
+ Shutdown,
+
+ /* No event before timeout */
+ Timeout,
+
+ /* operation completion */
+ OpComplete
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 91364cdc70..46f5624223 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -45,6 +45,7 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
AtomicCounter shutdownRefcount = new AtomicCounter(1);
+ CompletionRegistry completionRegistry;
private CompletionQueueSafeHandle()
{
@@ -53,7 +54,13 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return Native.grpcsharp_completion_queue_create();
+ }
+ public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
+ {
+ var cq = Native.grpcsharp_completion_queue_create();
+ cq.completionRegistry = completionRegistry;
+ return cq;
}
public CompletionQueueEvent Next()
@@ -83,6 +90,15 @@ namespace Grpc.Core.Internal
DecrementShutdownRefcount();
}
+ /// <summary>
+ /// Completion registry associated with this completion queue.
+ /// Doesn't need to be set if only using Pluck() operations.
+ /// </summary>
+ public CompletionRegistry CompletionRegistry
+ {
+ get { return completionRegistry; }
+ }
+
protected override bool ReleaseHandle()
{
Native.grpcsharp_completion_queue_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 4b7124ee74..4de543bef7 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,15 +33,15 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading;
-using System.Threading.Tasks;
using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Pool of threads polling on the same completion queue.
+ /// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@@ -51,25 +51,31 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
+ readonly int completionQueueCount;
- CompletionQueueSafeHandle cq;
+ IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
+ /// <summary>
+ /// Creates a thread pool threads polling on a set of completions queues.
+ /// </summary>
+ /// <param name="environment">Environment.</param>
+ /// <param name="poolSize">Pool size.</param>
+ /// <param name="completionQueueCount">Completion queue count.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
+ this.completionQueueCount = completionQueueCount;
+ GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
+ "Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
- if (cq != null)
- {
- throw new InvalidOperationException("Already started.");
- }
-
- cq = CompletionQueueSafeHandle.Create();
+ GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
+ completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@@ -82,49 +88,60 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- cq.Shutdown();
+ foreach (var cq in completionQueues)
+ {
+ cq.Shutdown();
+ }
+
foreach (var thread in threads)
{
thread.Join();
}
- cq.Dispose();
+ foreach (var cq in completionQueues)
+ {
+ cq.Dispose();
+ }
}
}
- internal CompletionQueueSafeHandle CompletionQueue
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return cq;
+ return completionQueues;
}
}
- private Thread CreateAndStartThread(int i)
+ private Thread CreateAndStartThread(int threadIndex)
{
- var thread = new Thread(new ThreadStart(RunHandlerLoop));
+ var cqIndex = threadIndex % completionQueues.Count;
+ var cq = completionQueues.ElementAt(cqIndex);
+
+ var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
thread.IsBackground = false;
+ thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
- thread.Name = "grpc " + i;
+
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void RunHandlerLoop()
+ private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
{
ev = cq.Next();
- if (ev.type == GRPCCompletionType.OpComplete)
+ if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
try
{
- var callback = environment.CompletionRegistry.Extract(tag);
+ var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@@ -133,7 +150,18 @@ namespace Grpc.Core.Internal
}
}
}
- while (ev.type != GRPCCompletionType.Shutdown);
+ while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
+ }
+
+ private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
+ {
+ var list = new List<CompletionQueueSafeHandle>();
+ for (int i = 0; i < completionQueueCount; i++)
+ {
+ var completionRegistry = new CompletionRegistry(environment);
+ list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+ }
+ return list.AsReadOnly();
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 25735d5262..dc9f62fdab 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -50,6 +50,11 @@ namespace Grpc.Core.Internal
{
using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
{
+ if (metadata.Count == 0)
+ {
+ return new MetadataArraySafeHandle();
+ }
+
// TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index c277c73ef0..65607ed120 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
+ public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
+ this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@@ -348,6 +350,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
+ this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@@ -418,33 +421,33 @@ namespace Grpc.Core.Internal
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);
public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials);
- public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
- public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
- public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
+ public delegate CallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
+ public delegate CallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
- public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public delegate CallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
public delegate CStringSafeHandle grpcsharp_call_get_peer_delegate(CallSafeHandle call);
public delegate void grpcsharp_call_destroy_delegate(IntPtr call);
@@ -493,23 +496,24 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
- public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
+ public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
- public delegate GRPCCallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server);
public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_destroy_delegate(IntPtr server);
- public delegate Timespec gprsharp_now_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_future_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_past_delegate(GPRClockType clockType);
+ public delegate Timespec gprsharp_now_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_future_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_past_delegate(ClockType clockType);
- public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, GPRClockType targetClock);
+ public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock);
public delegate int gprsharp_sizeof_timespec_delegate();
- public delegate GRPCCallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
}
@@ -587,59 +591,59 @@ namespace Grpc.Core.Internal
// CallSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
+ public static extern CallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
+ public static extern CallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_unary(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_message(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_message(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public static extern CallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
[DllImport("grpc_csharp_ext.dll")]
public static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@@ -773,7 +777,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);
@@ -785,7 +792,7 @@ namespace Grpc.Core.Internal
public static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public static extern CallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
@@ -799,16 +806,16 @@ namespace Grpc.Core.Internal
// Timespec
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_now(GPRClockType clockType);
+ public static extern Timespec gprsharp_now(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_future(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_future(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_past(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_past(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock);
+ public static extern Timespec gprsharp_convert_clock_type(Timespec t, ClockType targetClock);
[DllImport("grpc_csharp_ext.dll")]
public static extern int gprsharp_sizeof_timespec();
@@ -816,7 +823,7 @@ namespace Grpc.Core.Internal
// Testing
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public static extern CallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
public static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 85b7a4b01e..6a2f520163 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
+ Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment, newRpc.Server);
+ (payload) => payload, (payload) => payload, newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
@@ -317,7 +317,7 @@ namespace Grpc.Core.Internal
where TRequest : class
where TResponse : class
{
- DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
+ DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index ecfee0bfdd..25b79b4398 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
index 7e86fddb4d..e4f1880bdb 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
@@ -32,63 +32,78 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
+using Grpc.Core;
namespace Grpc.Core.Internal
{
/// <summary>
- /// If error != null, there's been an error or operation has been cancelled.
+ /// Details of a newly received RPC.
/// </summary>
- internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
-
- /// <summary>
- /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
- /// </summary>
- internal class AsyncCompletionTaskSource<T>
+ internal struct ServerRpcNew
{
- readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
- readonly AsyncCompletionDelegate<T> completionDelegate;
+ readonly Server server;
+ readonly CallSafeHandle call;
+ readonly string method;
+ readonly string host;
+ readonly Timespec deadline;
+ readonly Metadata requestMetadata;
+
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ {
+ this.server = server;
+ this.call = call;
+ this.method = method;
+ this.host = host;
+ this.deadline = deadline;
+ this.requestMetadata = requestMetadata;
+ }
+
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
- public AsyncCompletionTaskSource()
+ public CallSafeHandle Call
{
- completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
+ get
+ {
+ return this.call;
+ }
}
- public Task<T> Task
+ public string Method
{
get
{
- return tcs.Task;
+ return this.method;
}
}
- public AsyncCompletionDelegate<T> CompletionDelegate
+ public string Host
{
get
{
- return completionDelegate;
+ return this.host;
}
}
- private void HandleCompletion(T value, Exception error)
+ public Timespec Deadline
{
- if (error == null)
+ get
{
- tcs.SetResult(value);
- return;
+ return this.deadline;
}
- if (error is OperationCanceledException)
+ }
+
+ public Metadata RequestMetadata
+ {
+ get
{
- tcs.SetCanceled();
- return;
+ return this.requestMetadata;
}
- tcs.SetException(error);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 6b5f70e220..8581302706 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -31,12 +31,6 @@
#endregion
-using System;
-using System.Collections.Concurrent;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using Grpc.Core.Utils;
-
namespace Grpc.Core.Internal
{
/// <summary>
@@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
- public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
+ public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
- return Native.grpcsharp_server_create(cq, args);
+ return Native.grpcsharp_server_create(args);
+ }
+
+ public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
+ {
+ Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@@ -73,18 +72,18 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
- public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
- public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 56172a5dda..c9fd710e1e 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -49,11 +49,11 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
- public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime)
+ public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, ClockType.Realtime)
{
}
- public Timespec(long tv_sec, int tv_nsec, GPRClockType clock_type)
+ public Timespec(long tv_sec, int tv_nsec, ClockType clock_type)
{
this.tv_sec = tv_sec;
this.tv_nsec = tv_nsec;
@@ -62,7 +62,7 @@ namespace Grpc.Core.Internal
private long tv_sec;
private int tv_nsec;
- private GPRClockType clock_type;
+ private ClockType clock_type;
/// <summary>
/// Timespec a long time in the future.
@@ -71,7 +71,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_future(GPRClockType.Realtime);
+ return new Timespec(long.MaxValue, 0, ClockType.Realtime);
}
}
@@ -82,7 +82,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_past(GPRClockType.Realtime);
+ return new Timespec(long.MinValue, 0, ClockType.Realtime);
}
}
@@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Realtime);
+ return Native.gprsharp_now(ClockType.Realtime);
}
}
@@ -122,7 +122,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Converts the timespec to desired clock type.
/// </summary>
- public Timespec ToClockType(GPRClockType targetClock)
+ public Timespec ToClockType(ClockType targetClock)
{
return Native.gprsharp_convert_clock_type(this, targetClock);
}
@@ -142,7 +142,7 @@ namespace Grpc.Core.Internal
public DateTime ToDateTime()
{
GrpcPreconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond);
- GrpcPreconditions.CheckState(clock_type == GPRClockType.Realtime);
+ GrpcPreconditions.CheckState(clock_type == ClockType.Realtime);
// fast path for InfFuture
if (this.Equals(InfFuture))
@@ -227,10 +227,11 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Precise);
+ return Native.gprsharp_now(ClockType.Precise);
}
}
+ // for tests only
internal static int NativeSize
{
get
@@ -238,5 +239,23 @@ namespace Grpc.Core.Internal
return Native.gprsharp_sizeof_timespec();
}
}
+
+ // for tests only
+ internal static Timespec NativeInfFuture
+ {
+ get
+ {
+ return Native.gprsharp_inf_future(ClockType.Realtime);
+ }
+ }
+
+ // for tests only
+ public static Timespec NativeInfPast
+ {
+ get
+ {
+ return Native.gprsharp_inf_past(ClockType.Realtime);
+ }
+ }
}
}