aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-24 07:34:49 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-24 07:34:49 -0700
commite2c1040f9c52d0fea341f390bce860ae2e3c85cc (patch)
treefb2cf032f1ea73f3ef75176b8e0b1b5a94d1efb5 /src/csharp/Grpc.Core/Internal
parent63d11752b7017d7fd677435d3e92a1a88d768aac (diff)
parentd7bbd38b27630f908fc4f1cb906607e44c8f30bb (diff)
Merge github.com:grpc/grpc into atm2
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs33
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs63
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs58
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs103
-rw-r--r--src/csharp/Grpc.Core/Internal/CallError.cs (renamed from src/csharp/Grpc.Core/Internal/Enums.cs)39
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientSideStatus.cs70
-rw-r--r--src/csharp/Grpc.Core/Internal/ClockType.cs53
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs98
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs81
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRpcNew.cs109
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs37
19 files changed, 487 insertions, 307 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 016e1b8587..55351869b5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
- // Indicates that steaming call has finished.
+ // Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+ public Task<TResponse> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
@@ -409,10 +408,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
+ // success will be always set to true.
+
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
TResponse msg = default(TResponse);
- var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
+ var deserializeException = TryDeserialize(receivedMessage, out msg);
lock (myLock)
{
@@ -425,14 +427,13 @@ namespace Grpc.Core.Internal
finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
-
}
responseHeadersTcs.SetResult(responseHeaders);
var status = receivedStatus.Status;
- if (!success || status.StatusCode != StatusCode.OK)
+ if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
@@ -442,11 +443,27 @@ namespace Grpc.Core.Internal
}
}
+ protected override void CheckSendingAllowed(bool allowFinished)
+ {
+ base.CheckSendingAllowed(true);
+
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ if (!allowFinished && finishedStatus.HasValue)
+ {
+ throw new RpcException(finishedStatus.Value.Status);
+ }
+ }
+
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
+ // success will be always set to true.
+
lock (myLock)
{
finished = true;
@@ -457,7 +474,7 @@ namespace Grpc.Core.Internal
var status = receivedStatus.Status;
- if (!success || status.StatusCode != StatusCode.OK)
+ if (status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
return;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index ccd047f469..4de23706b2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,7 +68,8 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -150,15 +151,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
+ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckReadingAllowed();
+ GrpcPreconditions.CheckState(started);
+ if (readingDone)
+ {
+ // the last read that returns null or throws an exception is idempotent
+ // and maintain its state.
+ GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
+ return streamingReadTcs.Task;
+ }
+
+ GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
- readCompletionDelegate = completionDelegate;
+ streamingReadTcs = new TaskCompletionSource<TRead>();
+ return streamingReadTcs.Task;
}
}
@@ -202,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
- protected void CheckSendingAllowed(bool allowFinished)
+ protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
@@ -213,15 +224,6 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected virtual void CheckReadingAllowed()
- {
- GrpcPreconditions.CheckState(started);
- GrpcPreconditions.CheckState(!disposed);
-
- GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
- GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
- }
-
protected void CheckNotCancelled()
{
if (cancelRequested)
@@ -322,22 +324,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
-
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ sendStatusFromServerTcs.SetResult(null);
}
}
@@ -346,15 +344,17 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
+ // if success == false, received message will be null. It that case we will
+ // treat this completion as the last read an rely on C core to handle the failed
+ // read (e.g. deliver approriate statusCode on the clientside).
+
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
- AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
+ TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = readCompletionDelegate;
- readCompletionDelegate = null;
-
+ origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@@ -364,20 +364,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
+
+ // TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
+ if (!readingDone)
+ {
+ streamingReadTcs = null;
+ }
+
ReleaseResourcesIfPossible();
}
- // TODO: handle the case when success==false
-
if (deserializeException != null && !IsClient)
{
- FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+ origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
- FireCompletion(origCompletionDelegate, msg, null);
+ origTcs.SetResult(msg);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index bea2b3660c..b1566b44a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -65,6 +65,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Only for testing purposes.
+ /// </summary>
+ public void InitializeForTesting(INativeCall call)
+ {
+ server.AddCallReference(this);
+ InitializeInternal(call);
+ }
+
+ /// <summary>
/// Starts a server side call.
/// </summary>
public Task ServerSideCallAsync()
@@ -91,11 +100,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+ public Task<TRequest> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
@@ -128,24 +136,33 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Sends call result status, also indicating server is done with streaming responses.
- /// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
+ /// Sends call result status, indicating we are done with writes.
+ /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
{
+ byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
+ var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ GrpcPreconditions.CheckState(!disposed);
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
+ call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ payload, writeFlags);
}
halfcloseRequested = true;
- readingDone = true;
- sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ if (optionalWrite != null)
+ {
+ streamingWritesCounter++;
+ }
+ return sendStatusFromServerTcs.Task;
}
}
@@ -174,12 +191,6 @@ namespace Grpc.Core.Internal
get { return false; }
}
- protected override void CheckReadingAllowed()
- {
- base.CheckReadingAllowed();
- GrpcPreconditions.CheckArgument(!cancelRequested);
- }
-
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@@ -190,12 +201,21 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
+ // success will be always set to true.
lock (myLock)
{
finished = true;
+ if (streamingReadTcs == null)
+ {
+ // if there's no pending read, readingDone=true will dispose now.
+ // if there is a pending read, we will dispose once that read finishes.
+ readingDone = true;
+ streamingReadTcs = new TaskCompletionSource<TRequest>();
+ streamingReadTcs.SetResult(default(TRequest));
+ }
ReleaseResourcesIfPossible();
}
- // TODO(jtattermusch): handle error
if (cancelled)
{
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 66d2a66f99..c28a6f64d3 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -120,107 +120,4 @@ namespace Grpc.Core.Internal
return true;
}
}
-
- /// <summary>
- /// Status + metadata received on client side when call finishes.
- /// (when receive_status_on_client operation finishes).
- /// </summary>
- internal struct ClientSideStatus
- {
- readonly Status status;
- readonly Metadata trailers;
-
- public ClientSideStatus(Status status, Metadata trailers)
- {
- this.status = status;
- this.trailers = trailers;
- }
-
- public Status Status
- {
- get
- {
- return this.status;
- }
- }
-
- public Metadata Trailers
- {
- get
- {
- return this.trailers;
- }
- }
- }
-
- /// <summary>
- /// Details of a newly received RPC.
- /// </summary>
- internal struct ServerRpcNew
- {
- readonly Server server;
- readonly CallSafeHandle call;
- readonly string method;
- readonly string host;
- readonly Timespec deadline;
- readonly Metadata requestMetadata;
-
- public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
- {
- this.server = server;
- this.call = call;
- this.method = method;
- this.host = host;
- this.deadline = deadline;
- this.requestMetadata = requestMetadata;
- }
-
- public Server Server
- {
- get
- {
- return this.server;
- }
- }
-
- public CallSafeHandle Call
- {
- get
- {
- return this.call;
- }
- }
-
- public string Method
- {
- get
- {
- return this.method;
- }
- }
-
- public string Host
- {
- get
- {
- return this.host;
- }
- }
-
- public Timespec Deadline
- {
- get
- {
- return this.deadline;
- }
- }
-
- public Metadata RequestMetadata
- {
- get
- {
- return this.requestMetadata;
- }
- }
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/CallError.cs
index 74f86d2a30..541575f5e6 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/CallError.cs
@@ -40,7 +40,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_call_error from grpc/grpc.h
/// </summary>
- internal enum GRPCCallError
+ internal enum CallError
{
/* everything went ok */
OK = 0,
@@ -70,42 +70,9 @@ namespace Grpc.Core.Internal
/// <summary>
/// Checks the call API invocation's result is OK.
/// </summary>
- public static void CheckOk(this GRPCCallError callError)
+ public static void CheckOk(this CallError callError)
{
- GrpcPreconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError);
}
}
-
- /// <summary>
- /// grpc_completion_type from grpc/grpc.h
- /// </summary>
- internal enum GRPCCompletionType
- {
- /* Shutting down */
- Shutdown,
-
- /* No event before timeout */
- Timeout,
-
- /* operation completion */
- OpComplete
- }
-
- /// <summary>
- /// gpr_clock_type from grpc/support/time.h
- /// </summary>
- internal enum GPRClockType
- {
- /* Monotonic clock */
- Monotonic,
-
- /* Realtime clock */
- Realtime,
-
- /* Precise clock good for performance profiling. */
- Precise,
-
- /* Timespan - the distance between two time points */
- Timespan
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 500653ba5d..244b97d4a4 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -135,13 +135,16 @@ namespace Grpc.Core.Internal
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
+ var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+ optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index d6e34a0f04..ad9423ff58 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TResponse>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)
diff --git a/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
new file mode 100644
index 0000000000..5727e3f11f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
@@ -0,0 +1,70 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Status + metadata received on client side when call finishes.
+ /// (when receive_status_on_client operation finishes).
+ /// </summary>
+ internal struct ClientSideStatus
+ {
+ readonly Status status;
+ readonly Metadata trailers;
+
+ public ClientSideStatus(Status status, Metadata trailers)
+ {
+ this.status = status;
+ this.trailers = trailers;
+ }
+
+ public Status Status
+ {
+ get
+ {
+ return this.status;
+ }
+ }
+
+ public Metadata Trailers
+ {
+ get
+ {
+ return this.trailers;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/ClockType.cs b/src/csharp/Grpc.Core/Internal/ClockType.cs
new file mode 100644
index 0000000000..57533c9d2f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClockType.cs
@@ -0,0 +1,53 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// gpr_clock_type from grpc/support/time.h
+ /// </summary>
+ internal enum ClockType
+ {
+ /* Monotonic clock */
+ Monotonic,
+
+ /* Realtime clock */
+ Realtime,
+
+ /* Precise clock good for performance profiling. */
+ Precise,
+
+ /* Timespan - the distance between two time points */
+ Timespan
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
index 288680792a..a78e9b70f3 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
static readonly NativeMethods Native = NativeMethods.Get();
- public GRPCCompletionType type;
+ public CompletionType type;
public int success;
public IntPtr tag;
@@ -55,5 +55,20 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_sizeof_grpc_event();
}
}
+
+ /// <summary>
+ /// grpc_completion_type from grpc/grpc.h
+ /// </summary>
+ internal enum CompletionType
+ {
+ /* Shutting down */
+ Shutdown,
+
+ /* No event before timeout */
+ Timeout,
+
+ /* operation completion */
+ OpComplete
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 4b7124ee74..b538726fa1 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -118,7 +118,7 @@ namespace Grpc.Core.Internal
do
{
ev = cq.Next();
- if (ev.type == GRPCCompletionType.OpComplete)
+ if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
@@ -133,7 +133,7 @@ namespace Grpc.Core.Internal
}
}
}
- while (ev.type != GRPCCompletionType.Shutdown);
+ while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index cbef599139..cd3719cb50 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
void StartSendCloseFromClient(SendCompletionHandler callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 25735d5262..dc9f62fdab 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -50,6 +50,11 @@ namespace Grpc.Core.Internal
{
using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
{
+ if (metadata.Count == 0)
+ {
+ return new MetadataArraySafeHandle();
+ }
+
// TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 9ee0ba3bc0..42fd4d4dc6 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -418,32 +418,33 @@ namespace Grpc.Core.Internal
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);
public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials);
- public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
- public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
- public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
+ public delegate CallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
+ public delegate CallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ public delegate CallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ public delegate CallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
- public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
- public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
+ public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public delegate CallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
public delegate CStringSafeHandle grpcsharp_call_get_peer_delegate(CallSafeHandle call);
public delegate void grpcsharp_call_destroy_delegate(IntPtr call);
@@ -496,19 +497,19 @@ namespace Grpc.Core.Internal
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
- public delegate GRPCCallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server);
public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_destroy_delegate(IntPtr server);
- public delegate Timespec gprsharp_now_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_future_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_past_delegate(GPRClockType clockType);
+ public delegate Timespec gprsharp_now_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_future_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_past_delegate(ClockType clockType);
- public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, GPRClockType targetClock);
+ public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock);
public delegate int gprsharp_sizeof_timespec_delegate();
- public delegate GRPCCallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
}
@@ -586,58 +587,59 @@ namespace Grpc.Core.Internal
// CallSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
+ public static extern CallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
+ public static extern CallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ public static extern CallError grpcsharp_call_start_unary(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ public static extern CallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ public static extern CallError grpcsharp_call_send_message(CallSafeHandle call,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ public static extern CallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_message(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public static extern CallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
[DllImport("grpc_csharp_ext.dll")]
public static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@@ -783,7 +785,7 @@ namespace Grpc.Core.Internal
public static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public static extern CallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
@@ -797,16 +799,16 @@ namespace Grpc.Core.Internal
// Timespec
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_now(GPRClockType clockType);
+ public static extern Timespec gprsharp_now(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_future(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_future(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_past(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_past(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock);
+ public static extern Timespec gprsharp_convert_clock_type(Timespec t, ClockType targetClock);
[DllImport("grpc_csharp_ext.dll")]
public static extern int gprsharp_sizeof_timespec();
@@ -814,7 +816,7 @@ namespace Grpc.Core.Internal
// Testing
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public static extern CallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
public static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 1f83e51548..febebba209 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,29 +75,32 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
- var result = await handler(request, context).ConfigureAwait(false);
+ var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- await responseStream.WriteAsync(result).ConfigureAwait(false);
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -136,24 +139,26 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -187,33 +192,31 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
- var result = await handler(requestStream, context).ConfigureAwait(false);
+ var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- try
- {
- await responseStream.WriteAsync(result).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- status = Status.DefaultCancelled;
- }
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -255,16 +258,20 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -282,9 +289,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
-
- await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -300,15 +305,19 @@ namespace Grpc.Core.Internal
return rpcException.Status;
}
- // TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
+ public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
+ {
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
{
- DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
+ DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index e7be82c318..d76030d1ad 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TRequest>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 03e39efc02..ecfee0bfdd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task WriteStatusAsync(Status status, Metadata trailers)
- {
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
- return taskSource.Task;
- }
-
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
diff --git a/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
new file mode 100644
index 0000000000..e4f1880bdb
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
@@ -0,0 +1,109 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Details of a newly received RPC.
+ /// </summary>
+ internal struct ServerRpcNew
+ {
+ readonly Server server;
+ readonly CallSafeHandle call;
+ readonly string method;
+ readonly string host;
+ readonly Timespec deadline;
+ readonly Metadata requestMetadata;
+
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ {
+ this.server = server;
+ this.call = call;
+ this.method = method;
+ this.host = host;
+ this.deadline = deadline;
+ this.requestMetadata = requestMetadata;
+ }
+
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
+
+ public CallSafeHandle Call
+ {
+ get
+ {
+ return this.call;
+ }
+ }
+
+ public string Method
+ {
+ get
+ {
+ return this.method;
+ }
+ }
+
+ public string Host
+ {
+ get
+ {
+ return this.host;
+ }
+ }
+
+ public Timespec Deadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
+ public Metadata RequestMetadata
+ {
+ get
+ {
+ return this.requestMetadata;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 56172a5dda..c9fd710e1e 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -49,11 +49,11 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
- public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime)
+ public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, ClockType.Realtime)
{
}
- public Timespec(long tv_sec, int tv_nsec, GPRClockType clock_type)
+ public Timespec(long tv_sec, int tv_nsec, ClockType clock_type)
{
this.tv_sec = tv_sec;
this.tv_nsec = tv_nsec;
@@ -62,7 +62,7 @@ namespace Grpc.Core.Internal
private long tv_sec;
private int tv_nsec;
- private GPRClockType clock_type;
+ private ClockType clock_type;
/// <summary>
/// Timespec a long time in the future.
@@ -71,7 +71,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_future(GPRClockType.Realtime);
+ return new Timespec(long.MaxValue, 0, ClockType.Realtime);
}
}
@@ -82,7 +82,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_past(GPRClockType.Realtime);
+ return new Timespec(long.MinValue, 0, ClockType.Realtime);
}
}
@@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Realtime);
+ return Native.gprsharp_now(ClockType.Realtime);
}
}
@@ -122,7 +122,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Converts the timespec to desired clock type.
/// </summary>
- public Timespec ToClockType(GPRClockType targetClock)
+ public Timespec ToClockType(ClockType targetClock)
{
return Native.gprsharp_convert_clock_type(this, targetClock);
}
@@ -142,7 +142,7 @@ namespace Grpc.Core.Internal
public DateTime ToDateTime()
{
GrpcPreconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond);
- GrpcPreconditions.CheckState(clock_type == GPRClockType.Realtime);
+ GrpcPreconditions.CheckState(clock_type == ClockType.Realtime);
// fast path for InfFuture
if (this.Equals(InfFuture))
@@ -227,10 +227,11 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Precise);
+ return Native.gprsharp_now(ClockType.Precise);
}
}
+ // for tests only
internal static int NativeSize
{
get
@@ -238,5 +239,23 @@ namespace Grpc.Core.Internal
return Native.gprsharp_sizeof_timespec();
}
}
+
+ // for tests only
+ internal static Timespec NativeInfFuture
+ {
+ get
+ {
+ return Native.gprsharp_inf_future(ClockType.Realtime);
+ }
+ }
+
+ // for tests only
+ public static Timespec NativeInfPast
+ {
+ get
+ {
+ return Native.gprsharp_inf_past(ClockType.Realtime);
+ }
+ }
}
}