diff options
author | 2015-08-24 12:05:13 -0700 | |
---|---|---|
committer | 2015-08-24 12:08:38 -0700 | |
commit | c43648f250dd6cb0f086e2366e468372a6de26ae (patch) | |
tree | 0353cb50e39de0338553b97e5fcb1276f48cf4a5 /src/csharp/Grpc.Core | |
parent | beac88ca56f4710e86668f2cbbd80e02e0607f9c (diff) | |
parent | 04715888e60c6195a2c1d9d6b31f7a82f0d717e2 (diff) |
Merge branch 'master' of github.com:grpc/grpc into compression-accept-encoding
Diffstat (limited to 'src/csharp/Grpc.Core')
53 files changed, 1640 insertions, 500 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index bf020cd627..dbaa3085c5 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -44,14 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -69,6 +71,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Async stream to send streaming requests. /// </summary> public IClientStreamWriter<TRequest> RequestStream @@ -89,6 +102,24 @@ namespace Grpc.Core } /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> /// Provides means to cleanup after the call. /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 0979de606f..ee7ba29695 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -32,7 +32,6 @@ #endregion using System; -using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace Grpc.Core @@ -44,14 +43,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -80,6 +81,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 380efcdb0e..2853a79ce6 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -32,7 +32,6 @@ #endregion using System; -using System.Runtime.CompilerServices; using System.Threading.Tasks; namespace Grpc.Core @@ -43,13 +42,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -67,6 +68,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 224e343916..154a17a33e 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -43,13 +43,15 @@ namespace Grpc.Core public sealed class AsyncUnaryCall<TResponse> : IDisposable { readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -67,6 +69,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Allows awaiting this object directly. /// </summary> public TaskAwaiter<TResponse> GetAwaiter() diff --git a/src/csharp/Grpc.Core/CallInvocationDetails.cs b/src/csharp/Grpc.Core/CallInvocationDetails.cs index eb23a3a209..6565073fc5 100644 --- a/src/csharp/Grpc.Core/CallInvocationDetails.cs +++ b/src/csharp/Grpc.Core/CallInvocationDetails.cs @@ -40,30 +40,60 @@ namespace Grpc.Core /// <summary> /// Details about a client-side call to be invoked. /// </summary> - public class CallInvocationDetails<TRequest, TResponse> + public struct CallInvocationDetails<TRequest, TResponse> { readonly Channel channel; readonly string method; readonly string host; readonly Marshaller<TRequest> requestMarshaller; readonly Marshaller<TResponse> responseMarshaller; - readonly CallOptions options; + CallOptions options; + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct. + /// </summary> + /// <param name="channel">Channel to use for this call.</param> + /// <param name="method">Method to call.</param> + /// <param name="options">Call options.</param> public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) : - this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options) + this(channel, method, null, options) { } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct. + /// </summary> + /// <param name="channel">Channel to use for this call.</param> + /// <param name="method">Method to call.</param> + /// <param name="host">Host that contains the method. if <c>null</c>, default host will be used.</param> + /// <param name="options">Call options.</param> + public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, string host, CallOptions options) : + this(channel, method.FullName, host, method.RequestMarshaller, method.ResponseMarshaller, options) + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct. + /// </summary> + /// <param name="channel">Channel to use for this call.</param> + /// <param name="method">Qualified method name.</param> + /// <param name="host">Host that contains the method.</param> + /// <param name="requestMarshaller">Request marshaller.</param> + /// <param name="responseMarshaller">Response marshaller.</param> + /// <param name="options">Call options.</param> public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallOptions options) { - this.channel = Preconditions.CheckNotNull(channel); - this.method = Preconditions.CheckNotNull(method); + this.channel = Preconditions.CheckNotNull(channel, "channel"); + this.method = Preconditions.CheckNotNull(method, "method"); this.host = host; - this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller); - this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller); - this.options = Preconditions.CheckNotNull(options); + this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller"); + this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller"); + this.options = options; } + /// <summary> + /// Get channel associated with this call. + /// </summary> public Channel Channel { get @@ -72,6 +102,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets name of method to be called. + /// </summary> public string Method { get @@ -80,6 +113,9 @@ namespace Grpc.Core } } + /// <summary> + /// Get name of host. + /// </summary> public string Host { get @@ -88,6 +124,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets marshaller used to serialize requests. + /// </summary> public Marshaller<TRequest> RequestMarshaller { get @@ -96,6 +135,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets marshaller used to deserialized responses. + /// </summary> public Marshaller<TResponse> ResponseMarshaller { get @@ -104,6 +146,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the call options. + /// </summary> public CallOptions Options { get @@ -111,5 +156,16 @@ namespace Grpc.Core return options; } } + + /// <summary> + /// Returns new instance of <see cref="CallInvocationDetails"/> with + /// <c>Options</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + public CallInvocationDetails<TRequest, TResponse> WithOptions(CallOptions options) + { + var newDetails = this; + newDetails.options = options; + return newDetails; + } } } diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs index 8e9739335f..3dfe80b48c 100644 --- a/src/csharp/Grpc.Core/CallOptions.cs +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -42,24 +42,30 @@ namespace Grpc.Core /// <summary> /// Options for calls made by client. /// </summary> - public class CallOptions + public struct CallOptions { - readonly Metadata headers; - readonly DateTime deadline; - readonly CancellationToken cancellationToken; + Metadata headers; + DateTime? deadline; + CancellationToken cancellationToken; + WriteOptions writeOptions; + ContextPropagationToken propagationToken; /// <summary> - /// Creates a new instance of <c>CallOptions</c>. + /// Creates a new instance of <c>CallOptions</c> struct. /// </summary> /// <param name="headers">Headers to be sent with the call.</param> /// <param name="deadline">Deadline for the call to finish. null means no deadline.</param> /// <param name="cancellationToken">Can be used to request cancellation of the call.</param> - public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + /// <param name="writeOptions">Write options that will be used for this call.</param> + /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param> + public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken), + WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null) { - // TODO(jtattermusch): consider only creating metadata object once it's really needed. - this.headers = headers != null ? headers : new Metadata(); - this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue; + this.headers = headers; + this.deadline = deadline; this.cancellationToken = cancellationToken; + this.writeOptions = writeOptions; + this.propagationToken = propagationToken; } /// <summary> @@ -73,7 +79,7 @@ namespace Grpc.Core /// <summary> /// Call deadline. /// </summary> - public DateTime Deadline + public DateTime? Deadline { get { return deadline; } } @@ -85,5 +91,88 @@ namespace Grpc.Core { get { return cancellationToken; } } + + /// <summary> + /// Write options that will be used for this call. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + } + + /// <summary> + /// Token for propagating parent call context. + /// </summary> + public ContextPropagationToken PropagationToken + { + get + { + return this.propagationToken; + } + } + + /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>Headers</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + public CallOptions WithHeaders(Metadata headers) + { + var newOptions = this; + newOptions.headers = headers; + return newOptions; + } + + /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>Deadline</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + public CallOptions WithDeadline(DateTime deadline) + { + var newOptions = this; + newOptions.deadline = deadline; + return newOptions; + } + + /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>CancellationToken</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + public CallOptions WithCancellationToken(CancellationToken cancellationToken) + { + var newOptions = this; + newOptions.cancellationToken = cancellationToken; + return newOptions; + } + + /// <summary> + /// Returns a new instance of <see cref="CallOptions"/> with + /// all previously unset values set to their defaults and deadline and cancellation + /// token propagated when appropriate. + /// </summary> + internal CallOptions Normalize() + { + var newOptions = this; + if (propagationToken != null) + { + if (propagationToken.Options.IsPropagateDeadline) + { + Preconditions.CheckArgument(!newOptions.deadline.HasValue, + "Cannot propagate deadline from parent call. The deadline has already been set explicitly."); + newOptions.deadline = propagationToken.ParentDeadline; + } + if (propagationToken.Options.IsPropagateCancellation) + { + Preconditions.CheckArgument(!newOptions.cancellationToken.CanBeCanceled, + "Cannot propagate cancellation token from parent call. The cancellation token has already been set to a non-default value."); + } + } + + newOptions.headers = newOptions.headers ?? Metadata.Empty; + newOptions.deadline = newOptions.deadline ?? DateTime.MaxValue; + return newOptions; + } } } diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 00a8cabf82..e57ac89db3 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -31,8 +31,6 @@ #endregion -using System; -using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -40,9 +38,20 @@ namespace Grpc.Core { /// <summary> /// Helper methods for generated clients to make RPC calls. + /// Most users will use this class only indirectly and will be + /// making calls using client object generated from protocol + /// buffer definition files. /// </summary> public static class Calls { + /// <summary> + /// Invokes a simple remote call in a blocking fashion. + /// </summary> + /// <returns>The response.</returns> + /// <param name="call">The call defintion.</param> + /// <param name="req">Request message.</param> + /// <typeparam name="TRequest">Type of request message.</typeparam> + /// <typeparam name="TResponse">The of response message.</typeparam> public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class @@ -51,15 +60,32 @@ namespace Grpc.Core return asyncCall.UnaryCall(req); } + /// <summary> + /// Invokes a simple remote call asynchronously. + /// </summary> + /// <returns>An awaitable call object providing access to the response.</returns> + /// <param name="call">The call defintion.</param> + /// <param name="req">Request message.</param> + /// <typeparam name="TRequest">Type of request message.</typeparam> + /// <typeparam name="TResponse">The of response message.</typeparam> public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { var asyncCall = new AsyncCall<TRequest, TResponse>(call); var asyncResult = asyncCall.UnaryCallAsync(req); - return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } + /// <summary> + /// Invokes a server streaming call asynchronously. + /// In server streaming scenario, client sends on request and server responds with a stream of responses. + /// </summary> + /// <returns>A call object providing access to the asynchronous response stream.</returns> + /// <param name="call">The call defintion.</param> + /// <param name="req">Request message.</param> + /// <typeparam name="TRequest">Type of request message.</typeparam> + /// <typeparam name="TResponse">The of response messages.</typeparam> public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class @@ -67,9 +93,16 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call); asyncCall.StartServerStreamingCall(req); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } + /// <summary> + /// Invokes a client streaming call asynchronously. + /// In client streaming scenario, client sends a stream of requests and server responds with a single response. + /// </summary> + /// <returns>An awaitable call object providing access to the response.</returns> + /// <typeparam name="TRequest">Type of request messages.</typeparam> + /// <typeparam name="TResponse">The of response message.</typeparam> public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class @@ -77,9 +110,18 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call); var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } + /// <summary> + /// Invokes a duplex streaming call asynchronously. + /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses. + /// The response stream is completely independent and both side can be sending messages at the same time. + /// </summary> + /// <returns>A call object providing access to the asynchronous request and response streams.</returns> + /// <param name="call">The call definition.</param> + /// <typeparam name="TRequest">Type of request messages.</typeparam> + /// <typeparam name="TResponse">Type of reponse messages.</typeparam> public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class @@ -88,7 +130,7 @@ namespace Grpc.Core asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 9273ea4582..c11b320a64 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -45,26 +45,31 @@ namespace Grpc.Core /// <summary> /// gRPC Channel /// </summary> - public class Channel : IDisposable + public class Channel { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); + readonly object myLock = new object(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + + readonly string target; readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly List<ChannelOption> options; - bool disposed; + + bool shutdownRequested; /// <summary> /// Creates a channel that connects to a specific host. /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. /// </summary> - /// <param name="host">The name or IP address of the host.</param> + /// <param name="target">Target of the channel.</param> /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> - public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null) + public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null) { - Preconditions.CheckNotNull(host); - this.environment = GrpcEnvironment.GetInstance(); + this.target = Preconditions.CheckNotNull(target, "target"); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); EnsureUserAgentChannelOption(this.options); @@ -73,11 +78,11 @@ namespace Grpc.Core { if (nativeCredentials != null) { - this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs); + this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs); } else { - this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs); + this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs); } } } @@ -131,8 +136,8 @@ namespace Grpc.Core return tcs.Task; } - /// <summary> Address of the remote endpoint in URI format.</summary> - public string Target + /// <summary>Resolved address of the remote endpoint in URI format.</summary> + public string ResolvedTarget { get { @@ -140,6 +145,15 @@ namespace Grpc.Core } } + /// <summary>The original target used to create the channel.</summary> + public string Target + { + get + { + return this.target; + } + } + /// <summary> /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, @@ -162,12 +176,26 @@ namespace Grpc.Core } /// <summary> - /// Destroys the underlying channel. + /// Waits until there are no more active calls for this channel and then cleans up + /// resources used by this channel. /// </summary> - public void Dispose() + public async Task ShutdownAsync() { - Dispose(true); - GC.SuppressFinalize(this); + lock (myLock) + { + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount); + } + + handle.Dispose(); + + await Task.Run(() => GrpcEnvironment.Release()); } internal ChannelSafeHandle Handle @@ -186,13 +214,20 @@ namespace Grpc.Core } } - protected virtual void Dispose(bool disposing) + internal void AddCallReference(object call) { - if (disposing && handle != null && !disposed) - { - disposed = true; - handle.Dispose(); - } + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + + activeCallCounter.Decrement(); } private static void EnsureUserAgentChannelOption(List<ChannelOption> options) diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs index 1e0f90287a..ad54b46ad5 100644 --- a/src/csharp/Grpc.Core/ChannelOptions.cs +++ b/src/csharp/Grpc.Core/ChannelOptions.cs @@ -63,19 +63,19 @@ namespace Grpc.Core public ChannelOption(string name, string stringValue) { this.type = OptionType.String; - this.name = Preconditions.CheckNotNull(name); - this.stringValue = Preconditions.CheckNotNull(stringValue); + this.name = Preconditions.CheckNotNull(name, "name"); + this.stringValue = Preconditions.CheckNotNull(stringValue, "stringValue"); } /// <summary> /// Creates a channel option with an integer value. /// </summary> /// <param name="name">Name.</param> - /// <param name="stringValue">String value.</param> + /// <param name="intValue">Integer value.</param> public ChannelOption(string name, int intValue) { this.type = OptionType.Integer; - this.name = Preconditions.CheckNotNull(name); + this.name = Preconditions.CheckNotNull(name, "name"); this.intValue = intValue; } diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 88494bb4ac..903449439b 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -32,31 +32,50 @@ #endregion using System; -using System.Collections.Generic; - -using Grpc.Core.Internal; +using System.Text.RegularExpressions; +using System.Threading.Tasks; namespace Grpc.Core { - public delegate void MetadataInterceptorDelegate(Metadata metadata); + /// <summary> + /// Interceptor for call headers. + /// </summary> + public delegate void HeaderInterceptor(IMethod method, string authUri, Metadata metadata); /// <summary> /// Base class for client-side stubs. /// </summary> public abstract class ClientBase { + // Regex for removal of the optional DNS scheme, trailing port, and trailing backslash + static readonly Regex ChannelTargetPattern = new Regex(@"^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$"); + readonly Channel channel; + readonly string authUriBase; public ClientBase(Channel channel) { this.channel = channel; + this.authUriBase = GetAuthUriBase(channel.Target); + } + + /// <summary> + /// Can be used to register a custom header (request metadata) interceptor. + /// The interceptor is invoked each time a new call on this client is started. + /// </summary> + public HeaderInterceptor HeaderInterceptor + { + get; + set; } /// <summary> - /// Can be used to register a custom header (initial metadata) interceptor. - /// The delegate each time before a new call on this client is started. + /// gRPC supports multiple "hosts" being served by a single server. + /// This property can be used to set the target host explicitly. + /// By default, this will be set to <c>null</c> with the meaning + /// "use default host". /// </summary> - public MetadataInterceptorDelegate HeaderInterceptor + public string Host { get; set; @@ -83,10 +102,28 @@ namespace Grpc.Core var interceptor = HeaderInterceptor; if (interceptor != null) { - interceptor(options.Headers); - options.Headers.Freeze(); + if (options.Headers == null) + { + options = options.WithHeaders(new Metadata()); + } + var authUri = authUriBase != null ? authUriBase + method.ServiceName : null; + interceptor(method, authUri, options.Headers); + } + return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options); + } + + /// <summary> + /// Creates Auth URI base from channel's target (the one passed at channel creation). + /// Fully-qualified service name is to be appended to this. + /// </summary> + internal static string GetAuthUriBase(string target) + { + var match = ChannelTargetPattern.Match(target); + if (!match.Success) + { + return null; } - return new CallInvocationDetails<TRequest, TResponse>(channel, method, options); + return "https://" + match.Groups[2].Value + "/"; } } } diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/CompressionLevel.cs index 9b1c24d0c1..399652b85e 100644 --- a/src/csharp/Grpc.Core/OperationFailedException.cs +++ b/src/csharp/Grpc.Core/CompressionLevel.cs @@ -36,12 +36,28 @@ using System; namespace Grpc.Core { /// <summary> - /// Thrown when gRPC operation fails. + /// Compression level based on grpc_compression_level from grpc/compression.h /// </summary> - public class OperationFailedException : Exception + public enum CompressionLevel { - public OperationFailedException(string message) : base(message) - { - } + /// <summary> + /// No compression. + /// </summary> + None = 0, + + /// <summary> + /// Low compression. + /// </summary> + Low, + + /// <summary> + /// Medium compression. + /// </summary> + Medium, + + /// <summary> + /// High compression. + /// </summary> + High, } } diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs new file mode 100644 index 0000000000..a5bf1b5a70 --- /dev/null +++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs @@ -0,0 +1,170 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Token for propagating context of server side handlers to child calls. + /// In situations when a backend is making calls to another backend, + /// it makes sense to propagate properties like deadline and cancellation + /// token of the server call to the child call. + /// C core provides some other contexts (like tracing context) that + /// are not accessible to C# layer, but this token still allows propagating them. + /// </summary> + public class ContextPropagationToken + { + /// <summary> + /// Default propagation mask used by C core. + /// </summary> + private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff; + + /// <summary> + /// Default propagation mask used by C# - we want to propagate deadline + /// and cancellation token by our own means. + /// </summary> + internal const ContextPropagationFlags DefaultMask = DefaultCoreMask + & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation; + + readonly CallSafeHandle parentCall; + readonly DateTime deadline; + readonly CancellationToken cancellationToken; + readonly ContextPropagationOptions options; + + internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options) + { + this.parentCall = Preconditions.CheckNotNull(parentCall); + this.deadline = deadline; + this.cancellationToken = cancellationToken; + this.options = options ?? ContextPropagationOptions.Default; + } + + /// <summary> + /// Gets the native handle of the parent call. + /// </summary> + internal CallSafeHandle ParentCall + { + get + { + return this.parentCall; + } + } + + /// <summary> + /// Gets the parent call's deadline. + /// </summary> + internal DateTime ParentDeadline + { + get + { + return this.deadline; + } + } + + /// <summary> + /// Gets the parent call's cancellation token. + /// </summary> + internal CancellationToken ParentCancellationToken + { + get + { + return this.cancellationToken; + } + } + + /// <summary> + /// Get the context propagation options. + /// </summary> + internal ContextPropagationOptions Options + { + get + { + return this.options; + } + } + } + + /// <summary> + /// Options for <see cref="ContextPropagationToken"/>. + /// </summary> + public class ContextPropagationOptions + { + /// <summary> + /// The context propagation options that will be used by default. + /// </summary> + public static readonly ContextPropagationOptions Default = new ContextPropagationOptions(); + + bool propagateDeadline; + bool propagateCancellation; + + /// <summary> + /// Creates new context propagation options. + /// </summary> + /// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param> + /// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param> + public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true) + { + this.propagateDeadline = propagateDeadline; + this.propagateCancellation = propagateCancellation; + } + + /// <value><c>true</c> if parent call's deadline should be propagated to the child call.</value> + public bool IsPropagateDeadline + { + get { return this.propagateDeadline; } + } + + /// <value><c>true</c> if parent call's cancellation token should be propagated to the child call.</value> + public bool IsPropagateCancellation + { + get { return this.propagateCancellation; } + } + } + + /// <summary> + /// Context propagation flags from grpc/grpc.h. + /// </summary> + [Flags] + internal enum ContextPropagationFlags + { + Deadline = 1, + CensusStatsContext = 2, + CensusTracingContext = 4, + Cancellation = 8 + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 52defd1965..ad2af17bc7 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -49,6 +49,7 @@ <Compile Include="AsyncDuplexStreamingCall.cs" /> <Compile Include="AsyncServerStreamingCall.cs" /> <Compile Include="IClientStreamWriter.cs" /> + <Compile Include="Internal\INativeCall.cs" /> <Compile Include="IServerStreamWriter.cs" /> <Compile Include="IAsyncStreamWriter.cs" /> <Compile Include="IAsyncStreamReader.cs" /> @@ -77,14 +78,12 @@ <Compile Include="ServerServiceDefinition.cs" /> <Compile Include="Utils\AsyncStreamExtensions.cs" /> <Compile Include="Utils\BenchmarkUtil.cs" /> - <Compile Include="Utils\ExceptionHelper.cs" /> <Compile Include="Internal\CredentialsSafeHandle.cs" /> <Compile Include="Credentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> <Compile Include="Internal\AsyncCompletion.cs" /> <Compile Include="Internal\AsyncCallBase.cs" /> <Compile Include="Internal\AsyncCallServer.cs" /> - <Compile Include="OperationFailedException.cs" /> <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Utils\Preconditions.cs" /> <Compile Include="Internal\ServerCredentialsSafeHandle.cs" /> @@ -115,6 +114,9 @@ <Compile Include="ChannelState.cs" /> <Compile Include="CallInvocationDetails.cs" /> <Compile Include="CallOptions.cs" /> + <Compile Include="CompressionLevel.cs" /> + <Compile Include="WriteOptions.cs" /> + <Compile Include="ContextPropagationToken.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 034a66be3c..e7c04185c2 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -53,8 +53,12 @@ namespace Grpc.Core [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_shutdown(); + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_version_string(); // returns not-owned const char* + static object staticLock = new object(); static GrpcEnvironment instance; + static int refCount; static ILogger logger = new ConsoleLogger(); @@ -64,13 +68,14 @@ namespace Grpc.Core bool isClosed; /// <summary> - /// Returns an instance of initialized gRPC environment. - /// Subsequent invocations return the same instance unless Shutdown has been called first. + /// Returns a reference-counted instance of initialized gRPC environment. + /// Subsequent invocations return the same instance unless reference count has dropped to zero previously. /// </summary> - internal static GrpcEnvironment GetInstance() + internal static GrpcEnvironment AddRef() { lock (staticLock) { + refCount++; if (instance == null) { instance = new GrpcEnvironment(); @@ -80,14 +85,16 @@ namespace Grpc.Core } /// <summary> - /// Shuts down the gRPC environment if it was initialized before. - /// Blocks until the environment has been fully shutdown. + /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. + /// (and blocks until the environment has been fully shutdown). /// </summary> - public static void Shutdown() + internal static void Release() { lock (staticLock) { - if (instance != null) + Preconditions.CheckState(refCount > 0); + refCount--; + if (refCount == 0) { instance.Close(); instance = null; @@ -95,6 +102,14 @@ namespace Grpc.Core } } + internal static int GetRefCount() + { + lock (staticLock) + { + return refCount; + } + } + /// <summary> /// Gets application-wide logger used by gRPC. /// </summary> @@ -112,7 +127,7 @@ namespace Grpc.Core /// </summary> public static void SetLogger(ILogger customLogger) { - Preconditions.CheckNotNull(customLogger); + Preconditions.CheckNotNull(customLogger, "customLogger"); logger = customLogger; } @@ -122,12 +137,10 @@ namespace Grpc.Core private GrpcEnvironment() { NativeLogRedirector.Redirect(); - grpcsharp_init(); + GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); threadPool.Start(); - // TODO: use proper logging here - Logger.Info("gRPC initialized."); } /// <summary> @@ -164,6 +177,25 @@ namespace Grpc.Core } /// <summary> + /// Gets version of gRPC C core. + /// </summary> + internal static string GetCoreVersionString() + { + var ptr = grpcsharp_version_string(); // the pointer is not owned + return Marshal.PtrToStringAnsi(ptr); + } + + internal static void GrpcNativeInit() + { + grpcsharp_init(); + } + + internal static void GrpcNativeShutdown() + { + grpcsharp_shutdown(); + } + + /// <summary> /// Shuts down this environment. /// </summary> private void Close() @@ -173,30 +205,10 @@ namespace Grpc.Core throw new InvalidOperationException("Close has already been called"); } threadPool.Stop(); - grpcsharp_shutdown(); + GrpcNativeShutdown(); isClosed = true; debugStats.CheckOK(); - - Logger.Info("gRPC shutdown."); - } - - /// <summary> - /// Shuts down this environment asynchronously. - /// </summary> - private Task CloseAsync() - { - return Task.Run(() => - { - try - { - Close(); - } - catch (Exception e) - { - Logger.Error(e, "Error occured while shutting down GrpcEnvironment."); - } - }); } } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 371fbf27ce..c0a0674e50 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> + public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> { // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 2000210252..4e2acb9c71 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -50,5 +50,13 @@ namespace Grpc.Core /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> Task WriteAsync(T message); + + /// <summary> + /// Write options that will be used for the next write. + /// If null, default options will be used. + /// Once set, this property maintains its value across subsequent + /// writes. + /// <value>The write options.</value> + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 414b5c4282..be5d611a53 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -50,20 +50,34 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); - readonly CallInvocationDetails<TRequest, TResponse> callDetails; + readonly CallInvocationDetails<TRequest, TResponse> details; + readonly INativeCall injectedNativeCall; // for testing // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // Indicates that steaming call has finished. + TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + + // Response headers set here once received. + TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); + // Set after status is received. Used for both unary and streaming response calls. ClientSideStatus? finishedStatus; - bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) { - this.callDetails = callDetails; + this.details = callDetails.WithOptions(callDetails.Options.Normalize()); + this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. + } + + /// <summary> + /// This constructor should only be used for testing. + /// </summary> + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails) + { + this.injectedNativeCall = injectedNativeCall; } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but @@ -89,17 +103,17 @@ namespace Grpc.Core.Internal readingDone = true; } - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(payload, ctx, metadataArray); + call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall()); var ev = cq.Pluck(ctx.Handle); bool success = (ev.success != 0); try { - HandleUnaryResponse(success, ctx); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); } catch (Exception e) { @@ -108,15 +122,9 @@ namespace Grpc.Core.Internal } } - try - { - // Once the blocking call returns, the result should be available synchronously. - return unaryResponseTcs.Task.Result; - } - catch (AggregateException ae) - { - throw ExceptionHelper.UnwrapRpcException(ae); - } + // Once the blocking call returns, the result should be available synchronously. + // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException. + return unaryResponseTcs.Task.GetAwaiter().GetResult(); } } @@ -130,7 +138,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -138,9 +146,9 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(payload, HandleUnaryResponse, metadataArray); + call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall()); } return unaryResponseTcs.Task; } @@ -157,12 +165,12 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); readingDone = true; unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartClientStreaming(HandleUnaryResponse, metadataArray); } @@ -181,17 +189,17 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; - halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartServerStreaming(payload, HandleFinished, metadataArray); + call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -206,12 +214,13 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -219,9 +228,9 @@ namespace Grpc.Core.Internal /// Sends a streaming request. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -253,6 +262,28 @@ namespace Grpc.Core.Internal } /// <summary> + /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. + /// </summary> + public Task StreamingCallFinishedTask + { + get + { + return streamingCallFinishedTcs.Task; + } + } + + /// <summary> + /// Get the task that completes once response headers are received. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return responseHeadersTcs.Task; + } + } + + /// <summary> /// Gets the resulting status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> @@ -278,54 +309,45 @@ namespace Grpc.Core.Internal } } - /// <summary> - /// On client-side, we only fire readCompletionDelegate once all messages have been read - /// and status has been received. - /// </summary> - protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate) + public CallInvocationDetails<TRequest, TResponse> Details { - if (completionDelegate != null && readingDone && finishedStatus.HasValue) + get { - bool shouldComplete; - lock (myLock) - { - shouldComplete = !readObserverCompleted; - readObserverCompleted = true; - } - - if (shouldComplete) - { - var status = finishedStatus.Value.Status; - if (status.StatusCode != StatusCode.OK) - { - FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); - } - else - { - FireCompletion(completionDelegate, default(TResponse), null); - } - } + return this.details; } } - protected override void OnReleaseResources() + protected override void OnAfterReleaseResources() { - callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + details.Channel.RemoveCallReference(this); } private void Initialize(CompletionQueueSafeHandle cq) { - var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq, - callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline)); - callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); + var call = CreateNativeCall(cq); + details.Channel.AddCallReference(this); InitializeInternal(call); RegisterCancellationCallback(); } + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) + { + if (injectedNativeCall != null) + { + return injectedNativeCall; // allows injecting a mock INativeCall in tests. + } + + var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; + + return details.Channel.Handle.CreateCall(environment.CompletionRegistry, + parentCall, ContextPropagationToken.DefaultMask, cq, + details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); + } + // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. private void RegisterCancellationCallback() { - var token = callDetails.Options.CancellationToken; + var token = details.Options.CancellationToken; if (token.CanBeCanceled) { token.Register(() => this.Cancel()); @@ -333,31 +355,40 @@ namespace Grpc.Core.Internal } /// <summary> - /// Handler for unary response completion. + /// Gets WriteFlags set in callDetails.Options.WriteOptions + /// </summary> + private WriteFlags GetWriteFlagsForCall() + { + var writeOptions = details.Options.WriteOptions; + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + + /// <summary> + /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) + private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) { - var fullStatus = ctx.GetReceivedStatusOnClient(); + responseHeadersTcs.SetResult(responseHeaders); + } + /// <summary> + /// Handler for unary response completion. + /// </summary> + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + { lock (myLock) { finished = true; - finishedStatus = fullStatus; - - halfclosed = true; + finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); } - if (!success) - { - unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured."))); - return; - } + responseHeadersTcs.SetResult(responseHeaders); - var status = fullStatus.Status; + var status = receivedStatus.Status; - if (status.StatusCode != StatusCode.OK) + if (!success || status.StatusCode != StatusCode.OK) { unaryResponseTcs.SetException(new RpcException(status)); return; @@ -365,7 +396,7 @@ namespace Grpc.Core.Internal // TODO: handle deserialization error TResponse msg; - TryDeserialize(ctx.GetReceivedMessage(), out msg); + TryDeserialize(receivedMessage, out msg); unaryResponseTcs.SetResult(msg); } @@ -373,22 +404,25 @@ namespace Grpc.Core.Internal /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleFinished(bool success, BatchContextSafeHandle ctx) + private void HandleFinished(bool success, ClientSideStatus receivedStatus) { - var fullStatus = ctx.GetReceivedStatusOnClient(); - - AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; - finishedStatus = fullStatus; - - origReadCompletionDelegate = readCompletionDelegate; + finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); } - ProcessLastRead(origReadCompletionDelegate); + var status = receivedStatus.Status; + + if (!success || status.StatusCode != StatusCode.OK) + { + streamingCallFinishedTcs.SetException(new RpcException(status)); + return; + } + + streamingCallFinishedTcs.SetResult(null); } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 38f2a5baeb..4d20394644 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -54,27 +54,30 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; + protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); - protected CallSafeHandle call; + protected INativeCall call; protected bool disposed; protected bool started; - protected bool errorOccured; protected bool cancelRequested; protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected bool readingDone; - protected bool halfcloseRequested; - protected bool halfclosed; + protected bool readingDone; // True if last read (i.e. read with null payload) was already received. + protected bool halfcloseRequested; // True if send close have been initiated. protected bool finished; // True if close has been received from the peer. - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + protected bool initialMetadataSent; + protected long streamingWritesCounter; // Number of streaming send operations started so far. + + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) { this.serializer = Preconditions.CheckNotNull(serializer); this.deserializer = Preconditions.CheckNotNull(deserializer); + this.environment = Preconditions.CheckNotNull(environment); } /// <summary> @@ -111,7 +114,7 @@ namespace Grpc.Core.Internal } } - protected void InitializeInternal(CallSafeHandle call) + protected void InitializeInternal(INativeCall call) { lock (myLock) { @@ -123,7 +126,7 @@ namespace Grpc.Core.Internal /// Initiates sending a message. Only one send operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate) + protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { byte[] payload = UnsafeSerialize(msg); @@ -132,8 +135,11 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendMessage(payload, HandleSendFinished); + call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + streamingWritesCounter++; } } @@ -153,16 +159,6 @@ namespace Grpc.Core.Internal } } - // TODO(jtattermusch): find more fitting name for this method. - /// <summary> - /// Default behavior just completes the read observer, but more sofisticated behavior might be required - /// by subclasses. - /// </summary> - protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate) - { - FireCompletion(completionDelegate, default(TRead), null); - } - /// <summary> /// If there are no more pending actions and no new actions can be started, releases /// the underlying native resources. @@ -171,7 +167,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -183,34 +179,33 @@ namespace Grpc.Core.Internal private void ReleaseResources() { - OnReleaseResources(); if (call != null) { call.Dispose(); } disposed = true; + OnAfterReleaseResources(); } - protected virtual void OnReleaseResources() + protected virtual void OnAfterReleaseResources() { } protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!errorOccured); CheckNotCancelled(); Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); + Preconditions.CheckState(!finished, "Already finished."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected void CheckReadingAllowed() + protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); Preconditions.CheckState(!disposed); - Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!readingDone, "Stream has already been closed."); Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); @@ -274,7 +269,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles send completion. /// </summary> - protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleSendFinished(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -287,7 +282,7 @@ namespace Grpc.Core.Internal if (!success) { - FireCompletion(origCompletionDelegate, null, new OperationFailedException("Send failed")); + FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); } else { @@ -298,12 +293,11 @@ namespace Grpc.Core.Internal /// <summary> /// Handles halfclose completion. /// </summary> - protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx) + protected void HandleHalfclosed(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { - halfclosed = true; origCompletionDelegate = sendCompletionDelegate; sendCompletionDelegate = null; @@ -312,7 +306,7 @@ namespace Grpc.Core.Internal if (!success) { - FireCompletion(origCompletionDelegate, null, new OperationFailedException("Halfclose failed")); + FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed")); } else { @@ -323,23 +317,17 @@ namespace Grpc.Core.Internal /// <summary> /// Handles streaming read completion. /// </summary> - protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleReadFinished(bool success, byte[] receivedMessage) { - var payload = ctx.GetReceivedMessage(); - AsyncCompletionDelegate<TRead> origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = readCompletionDelegate; - if (payload != null) - { - readCompletionDelegate = null; - } - else + readCompletionDelegate = null; + + if (receivedMessage == null) { - // This was the last read. Keeping the readCompletionDelegate - // to be either fired by this handler or by client-side finished - // handler. + // This was the last read. readingDone = true; } @@ -348,17 +336,17 @@ namespace Grpc.Core.Internal // TODO: handle the case when error occured... - if (payload != null) + if (receivedMessage != null) { // TODO: handle deserialization error TRead msg; - TryDeserialize(payload, out msg); + TryDeserialize(receivedMessage, out msg); FireCompletion(origCompletionDelegate, msg, null); } else { - ProcessLastRead(origCompletionDelegate); + FireCompletion(origCompletionDelegate, default(TRead), null); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 513902ee36..5c47251030 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -49,17 +49,18 @@ namespace Grpc.Core.Internal { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - readonly GrpcEnvironment environment; + readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment) { - this.environment = Preconditions.CheckNotNull(environment); + this.server = Preconditions.CheckNotNull(server); } public void Initialize(CallSafeHandle call) { call.SetCompletionRegistry(environment.CompletionRegistry); - environment.DebugStats.ActiveServerCalls.Increment(); + + server.AddCallReference(this); InitializeInternal(call); } @@ -83,9 +84,9 @@ namespace Grpc.Core.Internal /// Sends a streaming response. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -98,6 +99,35 @@ namespace Grpc.Core.Internal } /// <summary> + /// Initiates sending a initial metadata. + /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation + /// to make things simpler. + /// completionDelegate is invoked upon completion. + /// </summary> + public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(headers, "metadata"); + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); + Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); + CheckSendingAllowed(); + + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + { + call.StartSendInitialMetadata(HandleSendFinished, metadataArray); + } + + this.initialMetadataSent = true; + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> /// Sends call result status, also indicating server is done with streaming responses. /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. @@ -111,7 +141,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray); + call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; readingDone = true; @@ -139,18 +169,22 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void CheckReadingAllowed() { - environment.DebugStats.ActiveServerCalls.Decrement(); + base.CheckReadingAllowed(); + Preconditions.CheckArgument(!cancelRequested); + } + + protected override void OnAfterReleaseResources() + { + server.RemoveCallReference(this); } /// <summary> /// Handles the server side close completion. /// </summary> - private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx) + private void HandleFinishedServerside(bool success, bool cancelled) { - bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); - lock (myLock) { finished = true; diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 6a2add54db..3a96414bea 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -134,7 +134,7 @@ namespace Grpc.Core.Internal } // Gets data of server_rpc_new completion. - public ServerRpcNew GetServerRpcNew() + public ServerRpcNew GetServerRpcNew(Server server) { var call = grpcsharp_batch_context_server_rpc_new_call(this); @@ -145,7 +145,7 @@ namespace Grpc.Core.Internal IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); - return new ServerRpcNew(call, method, host, deadline, metadata); + return new ServerRpcNew(server, call, method, host, deadline, metadata); } // Gets data of receive_close_on_server completion. @@ -198,14 +198,16 @@ namespace Grpc.Core.Internal /// </summary> internal struct ServerRpcNew { + readonly Server server; readonly CallSafeHandle call; readonly string method; readonly string host; readonly Timespec deadline; readonly Metadata requestMetadata; - public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) { + this.server = server; this.call = call; this.method = method; this.host = host; @@ -213,6 +215,14 @@ namespace Grpc.Core.Internal this.requestMetadata = requestMetadata; } + public Server Server + { + get + { + return this.server; + } + } + public CallSafeHandle Call { get diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 714749b171..0f187529e8 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -40,8 +40,10 @@ namespace Grpc.Core.Internal /// <summary> /// grpc_call from <grpc/grpc.h> /// </summary> - internal class CallSafeHandle : SafeHandleZeroIsInvalid + internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall { + public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); + const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionRegistry completionRegistry; @@ -53,7 +55,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -62,7 +64,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); + MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, @@ -70,7 +72,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -78,17 +80,25 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] @@ -103,76 +113,90 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } - public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray) + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } - public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } - public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback) + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } - public void StartSendCloseFromClient(BatchCompletionDelegate callback) + public void StartSendCloseFromClient(SendCompletionHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } - public void StartReceiveMessage(BatchCompletionDelegate callback) + public void StartReceiveMessage(ReceivedMessageHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); grpcsharp_call_recv_message(this, ctx).CheckOk(); } - public void StartServerSide(BatchCompletionDelegate callback) + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); grpcsharp_call_start_serverside(this, ctx).CheckOk(); } + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + } + public void Cancel() { grpcsharp_call_cancel(this).CheckOk(); diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 7324ebdf57..8cef566c14 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -47,7 +47,7 @@ namespace Grpc.Core.Internal static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs); [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); [DllImport("grpc_csharp_ext.dll")] static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect); @@ -68,17 +68,23 @@ namespace Grpc.Core.Internal public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_insecure_channel_create(target, channelArgs); } public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_secure_channel_create(credentials, target, channelArgs); } - public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - var result = grpcsharp_channel_create_call(this, cq, method, host, deadline); + var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); result.SetCompletionRegistry(registry); return result; } @@ -107,6 +113,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 58f493463b..013f00ff6f 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -40,16 +40,18 @@ namespace Grpc.Core.Internal internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> { readonly AsyncCall<TRequest, TResponse> call; + WriteOptions writeOptions; public ClientRequestStream(AsyncCall<TRequest, TResponse> call) { this.call = call; + this.writeOptions = call.Details.Options.WriteOptions; } public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -59,5 +61,24 @@ namespace Grpc.Core.Internal call.StartSendCloseFromClient(taskSource.CompletionDelegate); return taskSource.Task; } + + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 6c44521038..b4a7335c7c 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -72,7 +72,13 @@ namespace Grpc.Core.Internal call.StartReadMessage(taskSource.CompletionDelegate); var result = await taskSource.Task; this.current = result; - return result != null; + + if (result == null) + { + await call.StreamingCallFinishedTask; + return false; + } + return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index 8793450ff3..1bea1adf9e 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -38,10 +38,6 @@ namespace Grpc.Core.Internal { internal class DebugStats { - public readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); - - public readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); - public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); /// <summary> @@ -49,16 +45,6 @@ namespace Grpc.Core.Internal /// </summary> public void CheckOK() { - var remainingClientCalls = ActiveClientCalls.Count; - if (remainingClientCalls != 0) - { - DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); - } - var remainingServerCalls = ActiveServerCalls.Count; - if (remainingServerCalls != 0) - { - DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); - } var pendingBatchCompletions = PendingBatchCompletions.Count; if (pendingBatchCompletions != 0) { diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index cb4c7c821e..4b7124ee74 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -83,8 +83,6 @@ namespace Grpc.Core.Internal lock (myLock) { cq.Shutdown(); - - Logger.Info("Waiting for GRPC threads to finish."); foreach (var thread in threads) { thread.Join(); @@ -136,7 +134,6 @@ namespace Grpc.Core.Internal } } while (ev.type != GRPCCompletionType.Shutdown); - Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name); } } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs new file mode 100644 index 0000000000..cbef599139 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -0,0 +1,85 @@ +#region Copyright notice and license +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core.Internal +{ + internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + + // Received status for streaming response calls. + internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); + + internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + + internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders); + + internal delegate void SendCompletionHandler(bool success); + + internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); + + /// <summary> + /// Abstraction of a native call object. + /// </summary> + internal interface INativeCall : IDisposable + { + void Cancel(); + + void CancelWithStatus(Grpc.Core.Status status); + + string GetPeer(); + + void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartReceiveMessage(ReceivedMessageHandler callback); + + void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + + void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); + + void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata); + + void StartSendCloseFromClient(SendCompletionHandler callback); + + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + + void StartServerSide(ReceivedCloseOnServerHandler callback); + } +} diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 427c16fac6..83994f6762 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -70,7 +70,8 @@ namespace Grpc.Core.Internal var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); for (int i = 0; i < metadata.Count; i++) { - grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length)); + var valueBytes = metadata[i].GetSerializedValueUnsafe(); + grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); } return metadataArray; } @@ -94,7 +95,7 @@ namespace Grpc.Core.Internal string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index)); var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()]; Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length); - metadata.Add(new Metadata.Entry(key, bytes)); + metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes)); } return metadata; } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 19f0e3c57f..59f4c5727c 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -67,7 +67,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -75,7 +75,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -123,7 +123,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -179,7 +179,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -187,7 +187,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { var result = await handler(requestStream, context); @@ -239,7 +239,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -247,7 +247,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { await handler(requestStream, responseStream, context); @@ -278,7 +278,7 @@ namespace Grpc.Core.Internal { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment); + (payload) => payload, (payload) => payload, environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -304,13 +304,14 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken) + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) + where TRequest : class + where TResponse : class { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); - return new ServerCallContext( - newRpc.Method, newRpc.Host, peer, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken); + return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline, + newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 756dcee87f..03e39efc02 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -38,11 +38,12 @@ namespace Grpc.Core.Internal /// <summary> /// Writes responses asynchronously to an underlying AsyncCallServer object. /// </summary> - internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse> + internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions where TRequest : class where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + WriteOptions writeOptions; public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call) { @@ -52,7 +53,7 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -62,5 +63,31 @@ namespace Grpc.Core.Internal call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + var taskSource = new AsyncCompletionTaskSource<object>(); + call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); + return taskSource.Task; + } + + public WriteOptions WriteOptions + { + get + { + return writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index f9b44b1acf..5ee7ac14e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -74,6 +74,9 @@ namespace Grpc.Core.Internal public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_server_create(cq, args); } @@ -109,6 +112,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_server_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index e83d21f4a4..daf85d5f61 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -211,7 +211,7 @@ namespace Grpc.Core.Internal return Timespec.InfPast; } - Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime"); + Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime needs of kind DateTimeKind.Utc or be equal to DateTime.MaxValue or DateTime.MinValue."); try { diff --git a/src/csharp/Grpc.Core/KeyCertificatePair.cs b/src/csharp/Grpc.Core/KeyCertificatePair.cs index 5def15a656..6f691975e9 100644 --- a/src/csharp/Grpc.Core/KeyCertificatePair.cs +++ b/src/csharp/Grpc.Core/KeyCertificatePair.cs @@ -54,8 +54,8 @@ namespace Grpc.Core /// <param name="privateKey">PEM encoded private key.</param> public KeyCertificatePair(string certificateChain, string privateKey) { - this.certificateChain = Preconditions.CheckNotNull(certificateChain); - this.privateKey = Preconditions.CheckNotNull(privateKey); + this.certificateChain = Preconditions.CheckNotNull(certificateChain, "certificateChain"); + this.privateKey = Preconditions.CheckNotNull(privateKey, "privateKey"); } /// <summary> diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index c67765c78d..35561d25d8 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -42,16 +42,33 @@ namespace Grpc.Core.Logging readonly Type forType; readonly string forTypeString; + /// <summary>Creates a console logger not associated to any specific type.</summary> public ConsoleLogger() : this(null) { } + /// <summary>Creates a console logger that logs messsage specific for given type.</summary> private ConsoleLogger(Type forType) { this.forType = forType; - this.forTypeString = forType != null ? forType.FullName + " " : ""; + if (forType != null) + { + var namespaceStr = forType.Namespace ?? ""; + if (namespaceStr.Length > 0) + { + namespaceStr += "."; + } + this.forTypeString = namespaceStr + forType.Name + " "; + } + else + { + this.forTypeString = ""; + } } - + + /// <summary> + /// Returns a logger associated with the specified type. + /// </summary> public ILogger ForType<T>() { if (typeof(T) == forType) @@ -61,31 +78,37 @@ namespace Grpc.Core.Logging return new ConsoleLogger(typeof(T)); } + /// <summary>Logs a message with severity Debug.</summary> public void Debug(string message, params object[] formatArgs) { Log("D", message, formatArgs); } + /// <summary>Logs a message with severity Info.</summary> public void Info(string message, params object[] formatArgs) { Log("I", message, formatArgs); } + /// <summary>Logs a message with severity Warning.</summary> public void Warning(string message, params object[] formatArgs) { Log("W", message, formatArgs); } + /// <summary>Logs a message and an associated exception with severity Warning.</summary> public void Warning(Exception exception, string message, params object[] formatArgs) { Log("W", message + " " + exception, formatArgs); } + /// <summary>Logs a message with severity Error.</summary> public void Error(string message, params object[] formatArgs) { Log("E", message, formatArgs); } + /// <summary>Logs a message and an associated exception with severity Error.</summary> public void Error(Exception exception, string message, params object[] formatArgs) { Log("E", message + " " + exception, formatArgs); diff --git a/src/csharp/Grpc.Core/Logging/ILogger.cs b/src/csharp/Grpc.Core/Logging/ILogger.cs index 0d58f133e3..61e0c91388 100644 --- a/src/csharp/Grpc.Core/Logging/ILogger.cs +++ b/src/csharp/Grpc.Core/Logging/ILogger.cs @@ -42,16 +42,22 @@ namespace Grpc.Core.Logging /// <summary>Returns a logger associated with the specified type.</summary> ILogger ForType<T>(); + /// <summary>Logs a message with severity Debug.</summary> void Debug(string message, params object[] formatArgs); + /// <summary>Logs a message with severity Info.</summary> void Info(string message, params object[] formatArgs); + /// <summary>Logs a message with severity Warning.</summary> void Warning(string message, params object[] formatArgs); + /// <summary>Logs a message and an associated exception with severity Warning.</summary> void Warning(Exception exception, string message, params object[] formatArgs); + /// <summary>Logs a message with severity Error.</summary> void Error(string message, params object[] formatArgs); + /// <summary>Logs a message and an associated exception with severity Error.</summary> void Error(Exception exception, string message, params object[] formatArgs); } } diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs index 8b1a929074..f38cb0863f 100644 --- a/src/csharp/Grpc.Core/Marshaller.cs +++ b/src/csharp/Grpc.Core/Marshaller.cs @@ -37,19 +37,27 @@ using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// For serializing and deserializing messages. + /// Encapsulates the logic for serializing and deserializing messages. /// </summary> public struct Marshaller<T> { readonly Func<T, byte[]> serializer; readonly Func<byte[], T> deserializer; + /// <summary> + /// Initializes a new marshaller. + /// </summary> + /// <param name="serializer">Function that will be used to serialize messages.</param> + /// <param name="deserializer">Function that will be used to deserialize messages.</param> public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer) { - this.serializer = Preconditions.CheckNotNull(serializer); - this.deserializer = Preconditions.CheckNotNull(deserializer); + this.serializer = Preconditions.CheckNotNull(serializer, "serializer"); + this.deserializer = Preconditions.CheckNotNull(deserializer, "deserializer"); } + /// <summary> + /// Gets the serializer function. + /// </summary> public Func<T, byte[]> Serializer { get @@ -58,6 +66,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the deserializer function. + /// </summary> public Func<byte[], T> Deserializer { get @@ -72,11 +83,17 @@ namespace Grpc.Core /// </summary> public static class Marshallers { + /// <summary> + /// Creates a marshaller from specified serializer and deserializer. + /// </summary> public static Marshaller<T> Create<T>(Func<T, byte[]> serializer, Func<byte[], T> deserializer) { return new Marshaller<T>(serializer, deserializer); } + /// <summary> + /// Returns a marshaller for <c>string</c> type. This is useful for testing. + /// </summary> public static Marshaller<string> StringMarshaller { get diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 6fd0a7109d..a589b50caa 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -46,6 +46,11 @@ namespace Grpc.Core public sealed class Metadata : IList<Metadata.Entry> { /// <summary> + /// All binary headers should have this suffix. + /// </summary> + public const string BinaryHeaderSuffix = "-bin"; + + /// <summary> /// An read-only instance of metadata containing no entries. /// </summary> public static readonly Metadata Empty = new Metadata().Freeze(); @@ -114,6 +119,16 @@ namespace Grpc.Core entries.Add(item); } + public void Add(string key, string value) + { + Add(new Entry(key, value)); + } + + public void Add(string key, byte[] valueBytes) + { + Add(new Entry(key, valueBytes)); + } + public void Clear() { CheckWriteable(); @@ -171,23 +186,49 @@ namespace Grpc.Core private static readonly Encoding Encoding = Encoding.ASCII; readonly string key; - string value; - byte[] valueBytes; + readonly string value; + readonly byte[] valueBytes; + private Entry(string key, string value, byte[] valueBytes) + { + this.key = key; + this.value = value; + this.valueBytes = valueBytes; + } + + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value. + /// </summary> + /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param> + /// <param name="valueBytes">Value bytes.</param> public Entry(string key, byte[] valueBytes) { - this.key = Preconditions.CheckNotNull(key); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix), + "Key for binary valued metadata entry needs to have suffix indicating binary value."); this.value = null; - this.valueBytes = Preconditions.CheckNotNull(valueBytes); + Preconditions.CheckNotNull(valueBytes, "valueBytes"); + this.valueBytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value. + /// </summary> + /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param> + /// <param name="value">Value string. Only ASCII characters are allowed.</param> public Entry(string key, string value) { - this.key = Preconditions.CheckNotNull(key); - this.value = Preconditions.CheckNotNull(value); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix), + "Key for ASCII valued metadata entry cannot have suffix indicating binary value."); + this.value = Preconditions.CheckNotNull(value, "value"); this.valueBytes = null; } + /// <summary> + /// Gets the metadata entry key. + /// </summary> public string Key { get @@ -196,33 +237,86 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the binary value of this metadata entry. + /// </summary> public byte[] ValueBytes { get { if (valueBytes == null) { - valueBytes = Encoding.GetBytes(value); + return Encoding.GetBytes(value); } - return valueBytes; + + // defensive copy to guarantee immutability + var bytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length); + return bytes; } } + /// <summary> + /// Gets the string value of this metadata entry. + /// </summary> public string Value { get { - if (value == null) - { - value = Encoding.GetString(valueBytes); - } - return value; + Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry"); + return value ?? Encoding.GetString(valueBytes); } } - + + /// <summary> + /// Returns <c>true</c> if this entry is a binary-value entry. + /// </summary> + public bool IsBinary + { + get + { + return value == null; + } + } + + /// <summary> + /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>. + /// </summary> public override string ToString() { - return string.Format("[Entry: key={0}, value={1}]", Key, Value); + if (IsBinary) + { + return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes); + } + + return string.Format("[Entry: key={0}, value={1}]", key, value); + } + + /// <summary> + /// Gets the serialized value for this entry. For binary metadata entries, this leaks + /// the internal <c>valueBytes</c> byte array and caller must not change contents of it. + /// </summary> + internal byte[] GetSerializedValueUnsafe() + { + return valueBytes ?? Encoding.GetBytes(value); + } + + /// <summary> + /// Creates a binary value or ascii value metadata entry from data received from the native layer. + /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying. + /// </summary> + internal static Entry CreateUnsafe(string key, byte[] valueBytes) + { + if (key.EndsWith(BinaryHeaderSuffix)) + { + return new Entry(key, null, valueBytes); + } + return new Entry(key, Encoding.GetString(valueBytes), null); + } + + private static string NormalizeKey(string key) + { + return Preconditions.CheckNotNull(key, "key").ToLower(); } } } diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs index cc047ac9f8..4c53285893 100644 --- a/src/csharp/Grpc.Core/Method.cs +++ b/src/csharp/Grpc.Core/Method.cs @@ -41,16 +41,50 @@ namespace Grpc.Core /// </summary> public enum MethodType { - Unary, // Unary request, unary response. - ClientStreaming, // Streaming request, unary response. - ServerStreaming, // Unary request, streaming response. - DuplexStreaming // Streaming request, streaming response. + /// <summary>Single request sent from client, single response received from server.</summary> + Unary, + + /// <summary>Stream of request sent from client, single response received from server.</summary> + ClientStreaming, + + /// <summary>Single request sent from client, stream of responses received from server.</summary> + ServerStreaming, + + /// <summary>Both server and client can stream arbitrary number of requests and responses simultaneously.</summary> + DuplexStreaming } /// <summary> - /// A description of a service method. + /// A non-generic representation of a remote method. /// </summary> - public class Method<TRequest, TResponse> + public interface IMethod + { + /// <summary> + /// Gets the type of the method. + /// </summary> + MethodType Type { get; } + + /// <summary> + /// Gets the name of the service to which this method belongs. + /// </summary> + string ServiceName { get; } + + /// <summary> + /// Gets the unqualified name of the method. + /// </summary> + string Name { get; } + + /// <summary> + /// Gets the fully qualified name of the method. On the server side, methods are dispatched + /// based on this name. + /// </summary> + string FullName { get; } + } + + /// <summary> + /// A description of a remote method. + /// </summary> + public class Method<TRequest, TResponse> : IMethod { readonly MethodType type; readonly string serviceName; @@ -59,16 +93,27 @@ namespace Grpc.Core readonly Marshaller<TResponse> responseMarshaller; readonly string fullName; + /// <summary> + /// Initializes a new instance of the <c>Method</c> class. + /// </summary> + /// <param name="type">Type of method.</param> + /// <param name="serviceName">Name of service this method belongs to.</param> + /// <param name="name">Unqualified name of the method.</param> + /// <param name="requestMarshaller">Marshaller used for request messages.</param> + /// <param name="responseMarshaller">Marshaller used for response messages.</param> public Method(MethodType type, string serviceName, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller) { this.type = type; - this.serviceName = Preconditions.CheckNotNull(serviceName); - this.name = Preconditions.CheckNotNull(name); - this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller); - this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller); - this.fullName = GetFullName(serviceName); + this.serviceName = Preconditions.CheckNotNull(serviceName, "serviceName"); + this.name = Preconditions.CheckNotNull(name, "name"); + this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller"); + this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller"); + this.fullName = GetFullName(serviceName, name); } + /// <summary> + /// Gets the type of the method. + /// </summary> public MethodType Type { get @@ -77,6 +122,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the name of the service to which this method belongs. + /// </summary> public string ServiceName { get @@ -85,6 +133,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the unqualified name of the method. + /// </summary> public string Name { get @@ -93,6 +144,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the marshaller used for request messages. + /// </summary> public Marshaller<TRequest> RequestMarshaller { get @@ -101,6 +155,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the marshaller used for response messages. + /// </summary> public Marshaller<TResponse> ResponseMarshaller { get @@ -108,7 +165,11 @@ namespace Grpc.Core return this.responseMarshaller; } } - + + /// <summary> + /// Gets the fully qualified name of the method. On the server side, methods are dispatched + /// based on this name. + /// </summary> public string FullName { get @@ -120,9 +181,9 @@ namespace Grpc.Core /// <summary> /// Gets full name of the method including the service name. /// </summary> - internal string GetFullName(string serviceName) + internal static string GetFullName(string serviceName, string methodName) { - return "/" + Preconditions.CheckNotNull(serviceName) + "/" + this.Name; + return "/" + serviceName + "/" + methodName; } } } diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs index c58578286b..cac417e626 100644 --- a/src/csharp/Grpc.Core/RpcException.cs +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -36,22 +36,34 @@ using System; namespace Grpc.Core { /// <summary> - /// Thrown when remote procedure call fails. + /// Thrown when remote procedure call fails. Every <c>RpcException</c> is associated with a resulting <see cref="Status"/> of the call. /// </summary> public class RpcException : Exception { private readonly Status status; + /// <summary> + /// Creates a new <c>RpcException</c> associated with given status. + /// </summary> + /// <param name="status">Resulting status of a call.</param> public RpcException(Status status) : base(status.ToString()) { this.status = status; } + /// <summary> + /// Creates a new <c>RpcException</c> associated with given status and message. + /// </summary> + /// <param name="status">Resulting status of a call.</param> + /// <param name="message">The exception message.</param> public RpcException(Status status, string message) : base(message) { this.status = status; } + /// <summary> + /// Resulting status of the call. + /// </summary> public Status Status { get diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index eb5b043d1c..28f1686e20 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -50,6 +50,8 @@ namespace Grpc.Core { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly ServiceDefinitionCollection serviceDefinitions; readonly ServerPortCollection ports; readonly GrpcEnvironment environment; @@ -73,7 +75,7 @@ namespace Grpc.Core { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { @@ -106,6 +108,17 @@ namespace Grpc.Core } /// <summary> + /// To allow awaiting termination of the server. + /// </summary> + public Task ShutdownTask + { + get + { + return shutdownTcs.Task; + } + } + + /// <summary> /// Starts the server. /// </summary> public void Start() @@ -136,18 +149,9 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); await shutdownTcs.Task; - handle.Dispose(); - } + DisposeHandle(); - /// <summary> - /// To allow awaiting termination of the server. - /// </summary> - public Task ShutdownTask - { - get - { - return shutdownTcs.Task; - } + await Task.Run(() => GrpcEnvironment.Release()); } /// <summary> @@ -166,7 +170,22 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.CancelAllCalls(); await shutdownTcs.Task; - handle.Dispose(); + DisposeHandle(); + } + + internal void AddCallReference(object call) + { + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + activeCallCounter.Decrement(); } /// <summary> @@ -192,7 +211,7 @@ namespace Grpc.Core { lock (myLock) { - Preconditions.CheckNotNull(serverPort.Credentials); + Preconditions.CheckNotNull(serverPort.Credentials, "serverPort"); Preconditions.CheckState(!startRequested); var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port); int boundPort; @@ -227,6 +246,16 @@ namespace Grpc.Core } } + private void DisposeHandle() + { + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); + } + handle.Dispose(); + } + /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> @@ -254,7 +283,7 @@ namespace Grpc.Core { if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(); + ServerRpcNew newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 032b1390db..75d81c64f3 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -36,15 +36,16 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Internal; + namespace Grpc.Core { /// <summary> /// Context for a server-side call. /// </summary> - public sealed class ServerCallContext + public class ServerCallContext { - // TODO(jtattermusch): expose method to send initial metadata back to client - + private readonly CallSafeHandle callHandle; private readonly string method; private readonly string host; private readonly string peer; @@ -54,15 +55,34 @@ namespace Grpc.Core private readonly Metadata responseTrailers = new Metadata(); private Status status = Status.DefaultSuccess; + private Func<Metadata, Task> writeHeadersFunc; + private IHasWriteOptions writeOptionsHolder; - public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken) + internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder) { + this.callHandle = callHandle; this.method = method; this.host = host; this.peer = peer; this.deadline = deadline; this.requestHeaders = requestHeaders; this.cancellationToken = cancellationToken; + this.writeHeadersFunc = writeHeadersFunc; + this.writeOptionsHolder = writeOptionsHolder; + } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } + + /// <summary> + /// Creates a propagation token to be used to propagate call context to a child call. + /// </summary> + public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) + { + return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); } /// <summary>Name of method called in this RPC.</summary> @@ -110,7 +130,7 @@ namespace Grpc.Core } } - ///<summary>Cancellation token signals when call is cancelled.</summary> + /// <summary>Cancellation token signals when call is cancelled.</summary> public CancellationToken CancellationToken { get @@ -141,5 +161,31 @@ namespace Grpc.Core status = value; } } + + /// <summary> + /// Allows setting write options for the following write. + /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience. + /// Both properties are backed by the same underlying value. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return writeOptionsHolder.WriteOptions; + } + + set + { + writeOptionsHolder.WriteOptions = value; + } + } + } + + /// <summary> + /// Allows sharing write options between ServerCallContext and other objects. + /// </summary> + public interface IHasWriteOptions + { + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/ServerCredentials.cs b/src/csharp/Grpc.Core/ServerCredentials.cs index c11a1ede08..3c6703d30e 100644 --- a/src/csharp/Grpc.Core/ServerCredentials.cs +++ b/src/csharp/Grpc.Core/ServerCredentials.cs @@ -91,7 +91,7 @@ namespace Grpc.Core { this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly(); Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0, - "At least one KeyCertificatePair needs to be provided"); + "At least one KeyCertificatePair needs to be provided."); if (forceClientAuth) { Preconditions.CheckNotNull(rootCertificates, diff --git a/src/csharp/Grpc.Core/ServerMethods.cs b/src/csharp/Grpc.Core/ServerMethods.cs index d457770203..1f119a80ff 100644 --- a/src/csharp/Grpc.Core/ServerMethods.cs +++ b/src/csharp/Grpc.Core/ServerMethods.cs @@ -31,12 +31,8 @@ #endregion -using System; -using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; - namespace Grpc.Core { /// <summary> diff --git a/src/csharp/Grpc.Core/ServerPort.cs b/src/csharp/Grpc.Core/ServerPort.cs index 55e4bd0062..598404d045 100644 --- a/src/csharp/Grpc.Core/ServerPort.cs +++ b/src/csharp/Grpc.Core/ServerPort.cs @@ -62,9 +62,9 @@ namespace Grpc.Core /// <param name="credentials">credentials to use to secure this port.</param> public ServerPort(string host, int port, ServerCredentials credentials) { - this.host = Preconditions.CheckNotNull(host); + this.host = Preconditions.CheckNotNull(host, "host"); this.port = port; - this.credentials = Preconditions.CheckNotNull(credentials); + this.credentials = Preconditions.CheckNotNull(credentials, "credentials"); } /// <summary> diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index a00d156e52..94b0a320c3 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -79,7 +79,7 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - callHandlers.Add(method.GetFullName(serviceName), ServerCalls.UnaryCall(method, handler)); + callHandlers.Add(method.FullName, ServerCalls.UnaryCall(method, handler)); return this; } @@ -89,7 +89,7 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ClientStreamingCall(method, handler)); + callHandlers.Add(method.FullName, ServerCalls.ClientStreamingCall(method, handler)); return this; } @@ -99,7 +99,7 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ServerStreamingCall(method, handler)); + callHandlers.Add(method.FullName, ServerCalls.ServerStreamingCall(method, handler)); return this; } @@ -109,7 +109,7 @@ namespace Grpc.Core where TRequest : class where TResponse : class { - callHandlers.Add(method.GetFullName(serviceName), ServerCalls.DuplexStreamingCall(method, handler)); + callHandlers.Add(method.FullName, ServerCalls.DuplexStreamingCall(method, handler)); return this; } diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs index 754f6cb3ca..6bd8dc820b 100644 --- a/src/csharp/Grpc.Core/Status.cs +++ b/src/csharp/Grpc.Core/Status.cs @@ -29,13 +29,12 @@ // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion -using System; -using System.Runtime.InteropServices; +using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// Represents RPC result. + /// Represents RPC result, which consists of <see cref="StatusCode"/> and an optional detail string. /// </summary> public struct Status { @@ -52,6 +51,11 @@ namespace Grpc.Core readonly StatusCode statusCode; readonly string detail; + /// <summary> + /// Creates a new instance of <c>Status</c>. + /// </summary> + /// <param name="statusCode">Status code.</param> + /// <param name="detail">Detail.</param> public Status(StatusCode statusCode, string detail) { this.statusCode = statusCode; @@ -80,6 +84,9 @@ namespace Grpc.Core } } + /// <summary> + /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Status"/>. + /// </summary> public override string ToString() { return string.Format("Status(StatusCode={0}, Detail=\"{1}\")", statusCode, detail); diff --git a/src/csharp/Grpc.Core/StatusCode.cs b/src/csharp/Grpc.Core/StatusCode.cs index a9696fa469..90606955af 100644 --- a/src/csharp/Grpc.Core/StatusCode.cs +++ b/src/csharp/Grpc.Core/StatusCode.cs @@ -31,8 +31,6 @@ #endregion -using System; - namespace Grpc.Core { /// <summary> @@ -41,101 +39,101 @@ namespace Grpc.Core /// </summary> public enum StatusCode { - /* Not an error; returned on success */ + /// <summary>Not an error; returned on success.</summary> OK = 0, - /* The operation was cancelled (typically by the caller). */ + + /// <summary>The operation was cancelled (typically by the caller).</summary> Cancelled = 1, - /* Unknown error. An example of where this error may be returned is - if a Status value received from another address space belongs to - an error-space that is not known in this address space. Also - errors raised by APIs that do not return enough error information - may be converted to this error. */ + + /// <summary> + /// Unknown error. An example of where this error may be returned is + /// if a Status value received from another address space belongs to + /// an error-space that is not known in this address space. Also + /// errors raised by APIs that do not return enough error information + /// may be converted to this error. + /// </summary> Unknown = 2, - /* Client specified an invalid argument. Note that this differs - from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments - that are problematic regardless of the state of the system - (e.g., a malformed file name). */ + + /// <summary> + /// Client specified an invalid argument. Note that this differs + /// from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments + /// that are problematic regardless of the state of the system + /// (e.g., a malformed file name). + /// </summary> InvalidArgument = 3, - /* Deadline expired before operation could complete. For operations - that change the state of the system, this error may be returned - even if the operation has completed successfully. For example, a - successful response from a server could have been delayed long - enough for the deadline to expire. */ + + /// <summary> + /// Deadline expired before operation could complete. For operations + /// that change the state of the system, this error may be returned + /// even if the operation has completed successfully. For example, a + /// successful response from a server could have been delayed long + /// enough for the deadline to expire. + /// </summary> DeadlineExceeded = 4, - /* Some requested entity (e.g., file or directory) was not found. */ + + /// <summary>Some requested entity (e.g., file or directory) was not found.</summary> NotFound = 5, - /* Some entity that we attempted to create (e.g., file or directory) - already exists. */ + + /// <summary>Some entity that we attempted to create (e.g., file or directory) already exists.</summary> AlreadyExists = 6, - /* The caller does not have permission to execute the specified - operation. PERMISSION_DENIED must not be used for rejections - caused by exhausting some resource (use RESOURCE_EXHAUSTED - instead for those errors). PERMISSION_DENIED must not be - used if the caller can not be identified (use UNAUTHENTICATED - instead for those errors). */ + + /// <summary> + /// The caller does not have permission to execute the specified + /// operation. PERMISSION_DENIED must not be used for rejections + /// caused by exhausting some resource (use RESOURCE_EXHAUSTED + /// instead for those errors). PERMISSION_DENIED must not be + /// used if the caller can not be identified (use UNAUTHENTICATED + /// instead for those errors). + /// </summary> PermissionDenied = 7, - /* The request does not have valid authentication credentials for the - operation. */ + + /// <summary>The request does not have valid authentication credentials for the operation.</summary> Unauthenticated = 16, - /* Some resource has been exhausted, perhaps a per-user quota, or - perhaps the entire file system is out of space. */ + + /// <summary> + /// Some resource has been exhausted, perhaps a per-user quota, or + /// perhaps the entire file system is out of space. + /// </summary> ResourceExhausted = 8, - /* Operation was rejected because the system is not in a state - required for the operation's execution. For example, directory - to be deleted may be non-empty, an rmdir operation is applied to - a non-directory, etc. - - A litmus test that may help a service implementor in deciding - between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: - (a) Use UNAVAILABLE if the client can retry just the failing call. - (b) Use ABORTED if the client should retry at a higher-level - (e.g., restarting a read-modify-write sequence). - (c) Use FAILED_PRECONDITION if the client should not retry until - the system state has been explicitly fixed. E.g., if an "rmdir" - fails because the directory is non-empty, FAILED_PRECONDITION - should be returned since the client should not retry unless - they have first fixed up the directory by deleting files from it. - (d) Use FAILED_PRECONDITION if the client performs conditional - REST Get/Update/Delete on a resource and the resource on the - server does not match the condition. E.g., conflicting - read-modify-write on the same resource. */ + + /// <summary> + /// Operation was rejected because the system is not in a state + /// required for the operation's execution. For example, directory + /// to be deleted may be non-empty, an rmdir operation is applied to + /// a non-directory, etc. + /// </summary> FailedPrecondition = 9, - /* The operation was aborted, typically due to a concurrency issue - like sequencer check failures, transaction aborts, etc. - See litmus test above for deciding between FAILED_PRECONDITION, - ABORTED, and UNAVAILABLE. */ + /// <summary> + /// The operation was aborted, typically due to a concurrency issue + /// like sequencer check failures, transaction aborts, etc. + /// </summary> Aborted = 10, - /* Operation was attempted past the valid range. E.g., seeking or - reading past end of file. - - Unlike INVALID_ARGUMENT, this error indicates a problem that may - be fixed if the system state changes. For example, a 32-bit file - system will generate INVALID_ARGUMENT if asked to read at an - offset that is not in the range [0,2^32-1], but it will generate - OUT_OF_RANGE if asked to read from an offset past the current - file size. - - There is a fair bit of overlap between FAILED_PRECONDITION and - OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific - error) when it applies so that callers who are iterating through - a space can easily look for an OUT_OF_RANGE error to detect when - they are done. */ + + /// <summary> + /// Operation was attempted past the valid range. E.g., seeking or + /// reading past end of file. + /// </summary> OutOfRange = 11, - /* Operation is not implemented or not supported/enabled in this service. */ + + /// <summary>Operation is not implemented or not supported/enabled in this service.</summary> Unimplemented = 12, - /* Internal errors. Means some invariants expected by underlying - system has been broken. If you see one of these errors, - something is very broken. */ + + /// <summary> + /// Internal errors. Means some invariants expected by underlying + /// system has been broken. If you see one of these errors, + /// something is very broken. + /// </summary> Internal = 13, - /* The service is currently unavailable. This is a most likely a - transient condition and may be corrected by retrying with - a backoff. - See litmus test above for deciding between FAILED_PRECONDITION, - ABORTED, and UNAVAILABLE. */ + /// <summary> + /// The service is currently unavailable. This is a most likely a + /// transient condition and may be corrected by retrying with + /// a backoff. + /// </summary> Unavailable = 14, - /* Unrecoverable data loss or corruption. */ + + /// <summary>Unrecoverable data loss or corruption.</summary> DataLoss = 15 } } diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs index 8a748b45a8..cdf1e51026 100644 --- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs +++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; namespace Grpc.Core.Utils @@ -46,7 +45,7 @@ namespace Grpc.Core.Utils /// <summary> /// Reads the entire stream and executes an async action for each element. /// </summary> - public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) + public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction) where T : class { while (await streamReader.MoveNext()) @@ -58,7 +57,7 @@ namespace Grpc.Core.Utils /// <summary> /// Reads the entire stream and creates a list containing all the elements read. /// </summary> - public static async Task<List<T>> ToList<T>(this IAsyncStreamReader<T> streamReader) + public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader) where T : class { var result = new List<T>(); @@ -73,7 +72,7 @@ namespace Grpc.Core.Utils /// Writes all elements from given enumerable to the stream. /// Completes the stream afterwards unless close = false. /// </summary> - public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) + public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true) where T : class { foreach (var element in elements) @@ -89,7 +88,7 @@ namespace Grpc.Core.Utils /// <summary> /// Writes all elements from given enumerable to the stream. /// </summary> - public static async Task WriteAll<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements) + public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements) where T : class { foreach (var element in elements) diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs index 82653c3a1f..eb3a5b16e3 100644 --- a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs +++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs @@ -39,6 +39,9 @@ using System.Threading.Tasks; namespace Grpc.Core.Utils { + /// <summary> + /// Utility methods to run microbenchmarks. + /// </summary> public static class BenchmarkUtil { /// <summary> diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs index aeb5d210a7..374262f87a 100644 --- a/src/csharp/Grpc.Core/Utils/Preconditions.cs +++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs @@ -32,17 +32,16 @@ #endregion using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading.Tasks; namespace Grpc.Core.Utils { + /// <summary> + /// Utility methods to simplify checking preconditions in the code. + /// </summary> public static class Preconditions { /// <summary> - /// Throws ArgumentException if condition is false. + /// Throws <see cref="ArgumentException"/> if condition is false. /// </summary> public static void CheckArgument(bool condition) { @@ -53,7 +52,7 @@ namespace Grpc.Core.Utils } /// <summary> - /// Throws ArgumentException with given message if condition is false. + /// Throws <see cref="ArgumentException"/> with given message if condition is false. /// </summary> public static void CheckArgument(bool condition, string errorMessage) { @@ -64,31 +63,31 @@ namespace Grpc.Core.Utils } /// <summary> - /// Throws NullReferenceException if reference is null. + /// Throws <see cref="ArgumentNullException"/> if reference is null. /// </summary> public static T CheckNotNull<T>(T reference) { if (reference == null) { - throw new NullReferenceException(); + throw new ArgumentNullException(); } return reference; } /// <summary> - /// Throws NullReferenceException with given message if reference is null. + /// Throws <see cref="ArgumentNullException"/> if reference is null. /// </summary> - public static T CheckNotNull<T>(T reference, string errorMessage) + public static T CheckNotNull<T>(T reference, string paramName) { if (reference == null) { - throw new NullReferenceException(errorMessage); + throw new ArgumentNullException(paramName); } return reference; } /// <summary> - /// Throws InvalidOperationException if condition is false. + /// Throws <see cref="InvalidOperationException"/> if condition is false. /// </summary> public static void CheckState(bool condition) { @@ -99,7 +98,7 @@ namespace Grpc.Core.Utils } /// <summary> - /// Throws InvalidOperationException with given message if condition is false. + /// Throws <see cref="InvalidOperationException"/> with given message if condition is false. /// </summary> public static void CheckState(bool condition, string errorMessage) { diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs index b5cb652945..d02b301cac 100644 --- a/src/csharp/Grpc.Core/Version.cs +++ b/src/csharp/Grpc.Core/Version.cs @@ -1,5 +1,37 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + using System.Reflection; -using System.Runtime.CompilerServices; // The current version of gRPC C#. -[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")] +[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")] diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 656a3d47bb..b6dbd3b49c 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -1,13 +1,46 @@ -using System.Reflection; -using System.Runtime.CompilerServices; +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion namespace Grpc.Core { + /// <summary> + /// Provides info about current version of gRPC. + /// </summary> public static class VersionInfo { /// <summary> /// Current version of gRPC /// </summary> - public const string CurrentVersion = "0.6.0"; + public const string CurrentVersion = "0.6.1"; } } diff --git a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs b/src/csharp/Grpc.Core/WriteOptions.cs index c4d6bee058..7ef3189d76 100644 --- a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs +++ b/src/csharp/Grpc.Core/WriteOptions.cs @@ -33,25 +33,50 @@ using System; -namespace Grpc.Core.Utils +namespace Grpc.Core { - public static class ExceptionHelper + /// <summary> + /// Flags for write operations. + /// </summary> + [Flags] + public enum WriteFlags { /// <summary> - /// If inner exceptions contain RpcException, rethrows it. - /// Otherwise, rethrows the original aggregate exception. - /// Always throws, the exception return type is here only to make the. + /// Hint that the write may be buffered and need not go out on the wire immediately. + /// gRPC is free to buffer the message until the next non-buffered + /// write, or until write stream completion, but it need not buffer completely or at all. /// </summary> - public static Exception UnwrapRpcException(AggregateException ae) + BufferHint = 0x1, + + /// <summary> + /// Force compression to be disabled for a particular write. + /// </summary> + NoCompress = 0x2 + } + + /// <summary> + /// Options for write operations. + /// </summary> + public class WriteOptions + { + /// <summary> + /// Default write options. + /// </summary> + public static readonly WriteOptions Default = new WriteOptions(); + + private WriteFlags flags; + + public WriteOptions(WriteFlags flags = default(WriteFlags)) + { + this.flags = flags; + } + + public WriteFlags Flags { - foreach (var e in ae.InnerExceptions) + get { - if (e is RpcException) - { - throw e; - } + return this.flags; } - throw ae; } } } |