From bff90ac384cd19186e4ccde629ad8c6b7a17cd5d Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 6 Aug 2015 21:30:26 -0700 Subject: added WriteOptions support and enabled NoCompress flag to be used for all writes --- .../Grpc.Core/Internal/ServerResponseStream.cs | 25 +++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/ServerResponseStream.cs') diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 756dcee87f..1d79241f77 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -38,11 +38,12 @@ namespace Grpc.Core.Internal /// /// Writes responses asynchronously to an underlying AsyncCallServer object. /// - internal class ServerResponseStream : IServerStreamWriter + internal class ServerResponseStream : IServerStreamWriter, IHasWriteOptions where TRequest : class where TResponse : class { readonly AsyncCallServer call; + WriteOptions writeOptions; public ServerResponseStream(AsyncCallServer call) { @@ -52,15 +53,33 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } public Task WriteStatusAsync(Status status, Metadata trailers) { var taskSource = new AsyncCompletionTaskSource(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); + call.StartSendStatusFromServer(status, trailers, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } + + public WriteOptions WriteOptions + { + get + { + return writeOptions; + } + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } -- cgit v1.2.3 From 8368b2e4b911a0e47cb2a2304513939ab34c74c3 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 7 Aug 2015 01:18:37 -0700 Subject: implemented sending initial metadata --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 1 + src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 8 +++++- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 30 +++++++++++++++++++++- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 14 +++++----- .../Grpc.Core/Internal/ClientRequestStream.cs | 1 + src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 6 +++-- .../Grpc.Core/Internal/ServerResponseStream.cs | 8 ++++++ src/csharp/Grpc.Core/ServerCallContext.cs | 17 ++++++++---- src/csharp/ext/grpc_csharp_ext.c | 25 +++++++++++++----- 9 files changed, 88 insertions(+), 22 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/ServerResponseStream.cs') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index c26b0773ba..c8c2449ee6 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -64,6 +64,7 @@ namespace Grpc.Core.Internal : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { this.details = callDetails; + this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 0c7694e9a5..9fa0baca87 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -71,6 +71,9 @@ namespace Grpc.Core.Internal protected bool halfclosed; protected bool finished; // True if close has been received from the peer. + protected bool initialMetadataSent; + protected long streamingWritesCounter; + public AsyncCallBase(Func serializer, Func deserializer) { this.serializer = Preconditions.CheckNotNull(serializer); @@ -132,8 +135,11 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendMessage(HandleSendFinished, payload, writeFlags); + call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + streamingWritesCounter++; } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 8d7bdf65aa..9eac7f7b61 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -97,6 +97,34 @@ namespace Grpc.Core.Internal StartReadMessageInternal(completionDelegate); } + /// + /// Initiates sending a initial metadata. + /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation + /// to make things simpler. + /// completionDelegate is invoked upon completion. + /// + public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); + Preconditions.CheckState(streamingWritesCounter > 0, "Response headers can only be sent before the first write starts."); + CheckSendingAllowed(); + + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + { + call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags); + } + + this.initialMetadataSent = true; + sendCompletionDelegate = completionDelegate; + } + } + /// /// Sends call result status, also indicating server is done with streaming responses. /// Only one pending send action is allowed at any given time. @@ -111,7 +139,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags); + call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent); } halfcloseRequested = true; readingDone = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index be1426feb4..02502a6f01 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -70,7 +70,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -78,7 +78,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -142,11 +142,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk(); } - public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags) + public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk(); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags) @@ -156,11 +156,11 @@ namespace Grpc.Core.Internal grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk(); } - public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) @@ -177,7 +177,7 @@ namespace Grpc.Core.Internal grpcsharp_call_start_serverside(this, ctx).CheckOk(); } - public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 4a14694977..dd7f4256c4 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -68,6 +68,7 @@ namespace Grpc.Core.Internal { return this.writeOptions; } + set { writeOptions = value; diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 34db03cc38..74af19dc01 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -304,13 +304,15 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken) + public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, ServerResponseStream serverResponseStream, CancellationToken cancellationToken) + where TRequest : class + where TResponse : class { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); return new ServerCallContext( newRpc.Method, newRpc.Host, peer, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken, writeOptionsHolder); + newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 1d79241f77..5dcd5a7220 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -64,12 +64,20 @@ namespace Grpc.Core.Internal return taskSource.Task; } + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate); + return taskSource.Task; + } + public WriteOptions WriteOptions { get { return writeOptions; } + set { writeOptions = value; diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 5657eb8513..7849df9bb4 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -43,10 +43,8 @@ namespace Grpc.Core /// /// Context for a server-side call. /// - public sealed class ServerCallContext + public class ServerCallContext { - // TODO(jtattermusch): expose method to send initial metadata back to client - private readonly string method; private readonly string host; private readonly string peer; @@ -56,9 +54,11 @@ namespace Grpc.Core private readonly Metadata responseTrailers = new Metadata(); private Status status = Status.DefaultSuccess; + private Func writeHeadersFunc; private IHasWriteOptions writeOptionsHolder; - public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, IHasWriteOptions writeOptionsHolder) + public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + Func writeHeadersFunc, IHasWriteOptions writeOptionsHolder) { this.method = method; this.host = host; @@ -66,8 +66,14 @@ namespace Grpc.Core this.deadline = deadline; this.requestHeaders = requestHeaders; this.cancellationToken = cancellationToken; + this.writeHeadersFunc = writeHeadersFunc; this.writeOptionsHolder = writeOptionsHolder; } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } /// Name of method called in this RPC. public string Method @@ -114,7 +120,7 @@ namespace Grpc.Core } } - ///Cancellation token signals when call is cancelled. + /// Cancellation token signals when call is cancelled. public CancellationToken CancellationToken { get @@ -157,6 +163,7 @@ namespace Grpc.Core { return writeOptionsHolder.WriteOptions; } + set { writeOptionsHolder.WriteOptions = value; diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 165459371b..cb138064e1 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -651,15 +651,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, - const char *send_buffer, size_t send_buffer_len, gpr_uint32 write_flags) { + const char *send_buffer, size_t send_buffer_len, + gpr_uint32 write_flags, + gpr_int32 send_empty_initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[1]; + grpc_op ops[2]; + size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[0].data.send_message = ctx->send_message; ops[0].flags = write_flags; + ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].data.send_initial_metadata.count = 0; + ops[1].data.send_initial_metadata.metadata = NULL; + ops[1].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, nops, ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -675,9 +682,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, - const char *status_details, grpc_metadata_array *trailing_metadata, gpr_uint32 write_flags) { + const char *status_details, grpc_metadata_array *trailing_metadata, + gpr_uint32 write_flags, gpr_int32 send_empty_initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[1]; + grpc_op ops[2]; + size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = @@ -689,8 +698,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ops[0].data.send_status_from_server.trailing_metadata = ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = write_flags; + ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].data.send_initial_metadata.count = 0; + ops[1].data.send_initial_metadata.metadata = NULL; + ops[1].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); + return grpc_call_start_batch(call, ops, nops, ctx); } GPR_EXPORT grpc_call_error GPR_CALLTYPE -- cgit v1.2.3 From 5321d49b51e00d42af730dddeb8c85f12afeb8ea Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 7 Aug 2015 23:21:27 -0700 Subject: fixed writeOptions and added test --- src/csharp/Grpc.Core.Tests/CompressionTest.cs | 128 +++++++++++++++++++++ src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj | 1 + src/csharp/Grpc.Core/CallOptions.cs | 1 + src/csharp/Grpc.Core/IAsyncStreamWriter.cs | 4 - src/csharp/Grpc.Core/Internal/AsyncCall.cs | 8 +- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 8 +- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 30 ++--- .../Grpc.Core/Internal/ClientRequestStream.cs | 2 +- .../Grpc.Core/Internal/ServerResponseStream.cs | 4 +- src/csharp/ext/grpc_csharp_ext.c | 29 +++-- 10 files changed, 170 insertions(+), 45 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/CompressionTest.cs (limited to 'src/csharp/Grpc.Core/Internal/ServerResponseStream.cs') diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs new file mode 100644 index 0000000000..492369968e --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs @@ -0,0 +1,128 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class CompressionTest + { + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(); + + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.Dispose(); + server.ShutdownAsync().Wait(); + } + + [TestFixtureTearDown] + public void CleanupClass() + { + GrpcEnvironment.Shutdown(); + } + + [Test] + public void WriteOptions_Unary() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + context.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + return request; + }); + + var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress)); + Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc"); + } + + [Test] + public async Task WriteOptions_DuplexStreaming() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod(async (requestStream, responseStream, context) => + { + await requestStream.ToList(); + + context.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + + await context.WriteResponseHeadersAsync(new Metadata { new Metadata.Entry("ascii-header", "abcdefg") }); + + await responseStream.WriteAsync("X"); + + responseStream.WriteOptions = null; + await responseStream.WriteAsync("Y"); + + responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + await responseStream.WriteAsync("Z"); + }); + + var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress)); + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions)); + + // check that write options from call options are propagated to request stream. + Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0); + + call.RequestStream.WriteOptions = new WriteOptions(); + await call.RequestStream.WriteAsync("A"); + + call.RequestStream.WriteOptions = null; + await call.RequestStream.WriteAsync("B"); + + call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + await call.RequestStream.WriteAsync("C"); + + await call.RequestStream.CompleteAsync(); + + await call.ResponseStream.ToList(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 4692d958a0..58fa7c645f 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -79,6 +79,7 @@ + diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs index e8d0b0647f..a08986d77e 100644 --- a/src/csharp/Grpc.Core/CallOptions.cs +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -63,6 +63,7 @@ namespace Grpc.Core // TODO(jtattermusch): allow null value of deadline? this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue; this.cancellationToken = cancellationToken; + this.writeOptions = writeOptions; } /// diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index b554b6e266..4e2acb9c71 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -56,10 +56,6 @@ namespace Grpc.Core /// If null, default options will be used. /// Once set, this property maintains its value across subsequent /// writes. - /// Internally, closing the stream is on client and sending - /// status from server is treated as a write, so write options - /// are also applied to these operations. - /// /// The write options. WriteOptions WriteOptions { get; set; } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index df5c07e4c4..dee31c670e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -165,7 +165,7 @@ namespace Grpc.Core.Internal unaryResponseTcs = new TaskCompletionSource(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartClientStreaming(HandleUnaryResponse, metadataArray, GetWriteFlagsForCall()); + call.StartClientStreaming(HandleUnaryResponse, metadataArray); } return unaryResponseTcs.Task; @@ -211,7 +211,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartDuplexStreaming(HandleFinished, metadataArray, GetWriteFlagsForCall()); + call.StartDuplexStreaming(HandleFinished, metadataArray); } } } @@ -239,14 +239,14 @@ namespace Grpc.Core.Internal /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// - public void StartSendCloseFromClient(WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) { lock (myLock) { Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendCloseFromClient(HandleHalfclosed, writeFlags); + call.StartSendCloseFromClient(HandleHalfclosed); halfcloseRequested = true; sendCompletionDelegate = completionDelegate; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 1704b9afbf..3710a65d6b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -103,7 +103,7 @@ namespace Grpc.Core.Internal /// to make things simpler. /// completionDelegate is invoked upon completion. /// - public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate completionDelegate) { lock (myLock) { @@ -118,7 +118,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { - call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags); + call.StartSendInitialMetadata(HandleSendFinished, metadataArray); } this.initialMetadataSent = true; @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// - public void StartSendStatusFromServer(Status status, Metadata trailers, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) + public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate completionDelegate) { lock (myLock) { @@ -140,7 +140,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent); + call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; readingDone = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 02502a6f01..1b9d0abbc4 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -57,7 +57,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, @@ -66,7 +66,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, @@ -74,11 +74,11 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, - BatchContextSafeHandle ctx, WriteFlags writeFlags); + BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -90,7 +90,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, - BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call); @@ -121,11 +121,11 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_client_streaming(this, ctx, metadataArray, writeFlags).CheckOk(); + grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) @@ -135,11 +135,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } - public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk(); + grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) @@ -149,18 +149,18 @@ namespace Grpc.Core.Internal grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } - public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags) + public void StartSendCloseFromClient(BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk(); + grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) @@ -177,11 +177,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_serverside(this, ctx).CheckOk(); } - public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_initial_metadata(this, ctx, metadataArray, writeFlags).CheckOk(); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } public void Cancel() diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index dd7f4256c4..013f00ff6f 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -58,7 +58,7 @@ namespace Grpc.Core.Internal public Task CompleteAsync() { var taskSource = new AsyncCompletionTaskSource(); - call.StartSendCloseFromClient(GetWriteFlags(), taskSource.CompletionDelegate); + call.StartSendCloseFromClient(taskSource.CompletionDelegate); return taskSource.Task; } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 5dcd5a7220..03e39efc02 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -60,14 +60,14 @@ namespace Grpc.Core.Internal public Task WriteStatusAsync(Status status, Metadata trailers) { var taskSource = new AsyncCompletionTaskSource(); - call.StartSendStatusFromServer(status, trailers, GetWriteFlags(), taskSource.CompletionDelegate); + call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource(); - call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate); + call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); return taskSource.Task; } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index cb138064e1..5d17360d6a 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -506,7 +506,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); @@ -514,7 +514,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[1].flags = write_flags; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[2].flags = write_flags; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); @@ -542,7 +542,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_client_streaming(grpc_call *call, grpcsharp_batch_context *ctx, - grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { + grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[4]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -551,7 +551,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); @@ -587,7 +587,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); @@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[1].flags = write_flags; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[2].flags = write_flags; + ops[2].flags = 0; ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); @@ -619,7 +619,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_batch_context *ctx, - grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { + grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[3]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -628,7 +628,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); @@ -671,11 +671,11 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_close_from_client(grpc_call *call, - grpcsharp_batch_context *ctx, gpr_uint32 write_flags) { + grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[0].flags = write_flags; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } @@ -683,7 +683,7 @@ grpcsharp_call_send_close_from_client(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, const char *status_details, grpc_metadata_array *trailing_metadata, - gpr_uint32 write_flags, gpr_int32 send_empty_initial_metadata) { + gpr_int32 send_empty_initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[2]; size_t nops = send_empty_initial_metadata ? 2 : 1; @@ -697,7 +697,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ctx->send_status_from_server.trailing_metadata.count; ops[0].data.send_status_from_server.trailing_metadata = ctx->send_status_from_server.trailing_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; ops[1].data.send_initial_metadata.count = 0; ops[1].data.send_initial_metadata.metadata = NULL; @@ -731,8 +731,7 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_initial_metadata(grpc_call *call, grpcsharp_batch_context *ctx, - grpc_metadata_array *initial_metadata, - gpr_uint32 write_flags) { + grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -741,7 +740,7 @@ grpcsharp_call_send_initial_metadata(grpc_call *call, ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; ops[0].data.send_initial_metadata.metadata = ctx->send_initial_metadata.metadata; - ops[0].flags = write_flags; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); } -- cgit v1.2.3