aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-07 01:18:37 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-07 17:36:06 -0700
commit8368b2e4b911a0e47cb2a2304513939ab34c74c3 (patch)
tree7d5311c55310aa1474d8a6f175cda58adfa7b051 /src/csharp/Grpc.Core/Internal
parentbff90ac384cd19186e4ccde629ad8c6b7a17cd5d (diff)
implemented sending initial metadata
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs30
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs8
7 files changed, 57 insertions, 11 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index c26b0773ba..c8c2449ee6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -64,6 +64,7 @@ namespace Grpc.Core.Internal
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 0c7694e9a5..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(HandleSendFinished, payload, writeFlags);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 8d7bdf65aa..9eac7f7b61 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -98,6 +98,34 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter > 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +139,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index be1426feb4..02502a6f01 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -142,11 +142,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
- public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
@@ -156,11 +156,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
}
- public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -177,7 +177,7 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
- public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 4a14694977..dd7f4256c4 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -68,6 +68,7 @@ namespace Grpc.Core.Internal
{
return this.writeOptions;
}
+
set
{
writeOptions = value;
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 34db03cc38..74af19dc01 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -304,13 +304,15 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken, writeOptionsHolder);
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 1d79241f77..5dcd5a7220 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -64,12 +64,20 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
public WriteOptions WriteOptions
{
get
{
return writeOptions;
}
+
set
{
writeOptions = value;