aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-08-24 12:05:13 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-08-24 12:08:38 -0700
commitc43648f250dd6cb0f086e2366e468372a6de26ae (patch)
tree0353cb50e39de0338553b97e5fcb1276f48cf4a5 /src/csharp/Grpc.Core
parentbeac88ca56f4710e86668f2cbbd80e02e0607f9c (diff)
parent04715888e60c6195a2c1d9d6b31f7a82f0d717e2 (diff)
Merge branch 'master' of github.com:grpc/grpc into compression-accept-encoding
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs33
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/AsyncUnaryCall.cs15
-rw-r--r--src/csharp/Grpc.Core/CallInvocationDetails.cs72
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs109
-rw-r--r--src/csharp/Grpc.Core/Calls.cs54
-rw-r--r--src/csharp/Grpc.Core/Channel.cs75
-rw-r--r--src/csharp/Grpc.Core/ChannelOptions.cs8
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs57
-rw-r--r--src/csharp/Grpc.Core/CompressionLevel.cs (renamed from src/csharp/Grpc.Core/OperationFailedException.cs)26
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs170
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj6
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs76
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs2
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs200
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs76
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs58
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs82
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs13
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs85
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs31
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs2
-rw-r--r--src/csharp/Grpc.Core/KeyCertificatePair.cs4
-rw-r--r--src/csharp/Grpc.Core/Logging/ConsoleLogger.cs27
-rw-r--r--src/csharp/Grpc.Core/Logging/ILogger.cs6
-rw-r--r--src/csharp/Grpc.Core/Marshaller.cs23
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs124
-rw-r--r--src/csharp/Grpc.Core/Method.cs89
-rw-r--r--src/csharp/Grpc.Core/RpcException.cs14
-rw-r--r--src/csharp/Grpc.Core/Server.cs59
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs56
-rw-r--r--src/csharp/Grpc.Core/ServerCredentials.cs2
-rw-r--r--src/csharp/Grpc.Core/ServerMethods.cs4
-rw-r--r--src/csharp/Grpc.Core/ServerPort.cs4
-rw-r--r--src/csharp/Grpc.Core/ServerServiceDefinition.cs8
-rw-r--r--src/csharp/Grpc.Core/Status.cs13
-rw-r--r--src/csharp/Grpc.Core/StatusCode.cs156
-rw-r--r--src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs9
-rw-r--r--src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs3
-rw-r--r--src/csharp/Grpc.Core/Utils/Preconditions.cs25
-rw-r--r--src/csharp/Grpc.Core/Version.cs36
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs39
-rw-r--r--src/csharp/Grpc.Core/WriteOptions.cs (renamed from src/csharp/Grpc.Core/Utils/ExceptionHelper.cs)49
53 files changed, 1640 insertions, 500 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index bf020cd627..dbaa3085c5 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -44,14 +44,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> responseAsync;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -69,6 +71,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Async stream to send streaming requests.
/// </summary>
public IClientStreamWriter<TRequest> RequestStream
@@ -89,6 +102,24 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets the call status if the call has already finished.
+ /// Throws InvalidOperationException otherwise.
+ /// </summary>
+ public Status GetStatus()
+ {
+ return getStatusFunc();
+ }
+
+ /// <summary>
+ /// Gets the call trailing metadata if the call has already finished.
+ /// Throws InvalidOperationException otherwise.
+ /// </summary>
+ public Metadata GetTrailers()
+ {
+ return getTrailersFunc();
+ }
+
+ /// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index 0979de606f..ee7ba29695 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -32,7 +32,6 @@
#endregion
using System;
-using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
@@ -44,14 +43,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -80,6 +81,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index 380efcdb0e..2853a79ce6 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -32,7 +32,6 @@
#endregion
using System;
-using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
@@ -43,13 +42,15 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -67,6 +68,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 224e343916..154a17a33e 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -43,13 +43,15 @@ namespace Grpc.Core
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> responseAsync;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseAsync = responseAsync;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -67,6 +69,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Allows awaiting this object directly.
/// </summary>
public TaskAwaiter<TResponse> GetAwaiter()
diff --git a/src/csharp/Grpc.Core/CallInvocationDetails.cs b/src/csharp/Grpc.Core/CallInvocationDetails.cs
index eb23a3a209..6565073fc5 100644
--- a/src/csharp/Grpc.Core/CallInvocationDetails.cs
+++ b/src/csharp/Grpc.Core/CallInvocationDetails.cs
@@ -40,30 +40,60 @@ namespace Grpc.Core
/// <summary>
/// Details about a client-side call to be invoked.
/// </summary>
- public class CallInvocationDetails<TRequest, TResponse>
+ public struct CallInvocationDetails<TRequest, TResponse>
{
readonly Channel channel;
readonly string method;
readonly string host;
readonly Marshaller<TRequest> requestMarshaller;
readonly Marshaller<TResponse> responseMarshaller;
- readonly CallOptions options;
+ CallOptions options;
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
+ /// </summary>
+ /// <param name="channel">Channel to use for this call.</param>
+ /// <param name="method">Method to call.</param>
+ /// <param name="options">Call options.</param>
public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) :
- this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options)
+ this(channel, method, null, options)
{
}
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
+ /// </summary>
+ /// <param name="channel">Channel to use for this call.</param>
+ /// <param name="method">Method to call.</param>
+ /// <param name="host">Host that contains the method. if <c>null</c>, default host will be used.</param>
+ /// <param name="options">Call options.</param>
+ public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, string host, CallOptions options) :
+ this(channel, method.FullName, host, method.RequestMarshaller, method.ResponseMarshaller, options)
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.CallInvocationDetails`2"/> struct.
+ /// </summary>
+ /// <param name="channel">Channel to use for this call.</param>
+ /// <param name="method">Qualified method name.</param>
+ /// <param name="host">Host that contains the method.</param>
+ /// <param name="requestMarshaller">Request marshaller.</param>
+ /// <param name="responseMarshaller">Response marshaller.</param>
+ /// <param name="options">Call options.</param>
public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallOptions options)
{
- this.channel = Preconditions.CheckNotNull(channel);
- this.method = Preconditions.CheckNotNull(method);
+ this.channel = Preconditions.CheckNotNull(channel, "channel");
+ this.method = Preconditions.CheckNotNull(method, "method");
this.host = host;
- this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
- this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
- this.options = Preconditions.CheckNotNull(options);
+ this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller");
+ this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller");
+ this.options = options;
}
+ /// <summary>
+ /// Get channel associated with this call.
+ /// </summary>
public Channel Channel
{
get
@@ -72,6 +102,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets name of method to be called.
+ /// </summary>
public string Method
{
get
@@ -80,6 +113,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Get name of host.
+ /// </summary>
public string Host
{
get
@@ -88,6 +124,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets marshaller used to serialize requests.
+ /// </summary>
public Marshaller<TRequest> RequestMarshaller
{
get
@@ -96,6 +135,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets marshaller used to deserialized responses.
+ /// </summary>
public Marshaller<TResponse> ResponseMarshaller
{
get
@@ -104,6 +146,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the call options.
+ /// </summary>
public CallOptions Options
{
get
@@ -111,5 +156,16 @@ namespace Grpc.Core
return options;
}
}
+
+ /// <summary>
+ /// Returns new instance of <see cref="CallInvocationDetails"/> with
+ /// <c>Options</c> set to the value provided. Values of all other fields are preserved.
+ /// </summary>
+ public CallInvocationDetails<TRequest, TResponse> WithOptions(CallOptions options)
+ {
+ var newDetails = this;
+ newDetails.options = options;
+ return newDetails;
+ }
}
}
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 8e9739335f..3dfe80b48c 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -42,24 +42,30 @@ namespace Grpc.Core
/// <summary>
/// Options for calls made by client.
/// </summary>
- public class CallOptions
+ public struct CallOptions
{
- readonly Metadata headers;
- readonly DateTime deadline;
- readonly CancellationToken cancellationToken;
+ Metadata headers;
+ DateTime? deadline;
+ CancellationToken cancellationToken;
+ WriteOptions writeOptions;
+ ContextPropagationToken propagationToken;
/// <summary>
- /// Creates a new instance of <c>CallOptions</c>.
+ /// Creates a new instance of <c>CallOptions</c> struct.
/// </summary>
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
- public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
+ /// <param name="writeOptions">Write options that will be used for this call.</param>
+ /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
+ public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken),
+ WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
- // TODO(jtattermusch): consider only creating metadata object once it's really needed.
- this.headers = headers != null ? headers : new Metadata();
- this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
+ this.headers = headers;
+ this.deadline = deadline;
this.cancellationToken = cancellationToken;
+ this.writeOptions = writeOptions;
+ this.propagationToken = propagationToken;
}
/// <summary>
@@ -73,7 +79,7 @@ namespace Grpc.Core
/// <summary>
/// Call deadline.
/// </summary>
- public DateTime Deadline
+ public DateTime? Deadline
{
get { return deadline; }
}
@@ -85,5 +91,88 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
+
+ /// <summary>
+ /// Write options that will be used for this call.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ }
+
+ /// <summary>
+ /// Token for propagating parent call context.
+ /// </summary>
+ public ContextPropagationToken PropagationToken
+ {
+ get
+ {
+ return this.propagationToken;
+ }
+ }
+
+ /// <summary>
+ /// Returns new instance of <see cref="CallOptions"/> with
+ /// <c>Headers</c> set to the value provided. Values of all other fields are preserved.
+ /// </summary>
+ public CallOptions WithHeaders(Metadata headers)
+ {
+ var newOptions = this;
+ newOptions.headers = headers;
+ return newOptions;
+ }
+
+ /// <summary>
+ /// Returns new instance of <see cref="CallOptions"/> with
+ /// <c>Deadline</c> set to the value provided. Values of all other fields are preserved.
+ /// </summary>
+ public CallOptions WithDeadline(DateTime deadline)
+ {
+ var newOptions = this;
+ newOptions.deadline = deadline;
+ return newOptions;
+ }
+
+ /// <summary>
+ /// Returns new instance of <see cref="CallOptions"/> with
+ /// <c>CancellationToken</c> set to the value provided. Values of all other fields are preserved.
+ /// </summary>
+ public CallOptions WithCancellationToken(CancellationToken cancellationToken)
+ {
+ var newOptions = this;
+ newOptions.cancellationToken = cancellationToken;
+ return newOptions;
+ }
+
+ /// <summary>
+ /// Returns a new instance of <see cref="CallOptions"/> with
+ /// all previously unset values set to their defaults and deadline and cancellation
+ /// token propagated when appropriate.
+ /// </summary>
+ internal CallOptions Normalize()
+ {
+ var newOptions = this;
+ if (propagationToken != null)
+ {
+ if (propagationToken.Options.IsPropagateDeadline)
+ {
+ Preconditions.CheckArgument(!newOptions.deadline.HasValue,
+ "Cannot propagate deadline from parent call. The deadline has already been set explicitly.");
+ newOptions.deadline = propagationToken.ParentDeadline;
+ }
+ if (propagationToken.Options.IsPropagateCancellation)
+ {
+ Preconditions.CheckArgument(!newOptions.cancellationToken.CanBeCanceled,
+ "Cannot propagate cancellation token from parent call. The cancellation token has already been set to a non-default value.");
+ }
+ }
+
+ newOptions.headers = newOptions.headers ?? Metadata.Empty;
+ newOptions.deadline = newOptions.deadline ?? DateTime.MaxValue;
+ return newOptions;
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index 00a8cabf82..e57ac89db3 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -31,8 +31,6 @@
#endregion
-using System;
-using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -40,9 +38,20 @@ namespace Grpc.Core
{
/// <summary>
/// Helper methods for generated clients to make RPC calls.
+ /// Most users will use this class only indirectly and will be
+ /// making calls using client object generated from protocol
+ /// buffer definition files.
/// </summary>
public static class Calls
{
+ /// <summary>
+ /// Invokes a simple remote call in a blocking fashion.
+ /// </summary>
+ /// <returns>The response.</returns>
+ /// <param name="call">The call defintion.</param>
+ /// <param name="req">Request message.</param>
+ /// <typeparam name="TRequest">Type of request message.</typeparam>
+ /// <typeparam name="TResponse">The of response message.</typeparam>
public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
@@ -51,15 +60,32 @@ namespace Grpc.Core
return asyncCall.UnaryCall(req);
}
+ /// <summary>
+ /// Invokes a simple remote call asynchronously.
+ /// </summary>
+ /// <returns>An awaitable call object providing access to the response.</returns>
+ /// <param name="call">The call defintion.</param>
+ /// <param name="req">Request message.</param>
+ /// <typeparam name="TRequest">Type of request message.</typeparam>
+ /// <typeparam name="TResponse">The of response message.</typeparam>
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var asyncResult = asyncCall.UnaryCallAsync(req);
- return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
+ /// <summary>
+ /// Invokes a server streaming call asynchronously.
+ /// In server streaming scenario, client sends on request and server responds with a stream of responses.
+ /// </summary>
+ /// <returns>A call object providing access to the asynchronous response stream.</returns>
+ /// <param name="call">The call defintion.</param>
+ /// <param name="req">Request message.</param>
+ /// <typeparam name="TRequest">Type of request message.</typeparam>
+ /// <typeparam name="TResponse">The of response messages.</typeparam>
public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req)
where TRequest : class
where TResponse : class
@@ -67,9 +93,16 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
+ /// <summary>
+ /// Invokes a client streaming call asynchronously.
+ /// In client streaming scenario, client sends a stream of requests and server responds with a single response.
+ /// </summary>
+ /// <returns>An awaitable call object providing access to the response.</returns>
+ /// <typeparam name="TRequest">Type of request messages.</typeparam>
+ /// <typeparam name="TResponse">The of response message.</typeparam>
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class
@@ -77,9 +110,18 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
- return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
+ /// <summary>
+ /// Invokes a duplex streaming call asynchronously.
+ /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses.
+ /// The response stream is completely independent and both side can be sending messages at the same time.
+ /// </summary>
+ /// <returns>A call object providing access to the asynchronous request and response streams.</returns>
+ /// <param name="call">The call definition.</param>
+ /// <typeparam name="TRequest">Type of request messages.</typeparam>
+ /// <typeparam name="TResponse">Type of reponse messages.</typeparam>
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call)
where TRequest : class
where TResponse : class
@@ -88,7 +130,7 @@ namespace Grpc.Core
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 9273ea4582..c11b320a64 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -45,26 +45,31 @@ namespace Grpc.Core
/// <summary>
/// gRPC Channel
/// </summary>
- public class Channel : IDisposable
+ public class Channel
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
+ readonly object myLock = new object();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
+ readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
- bool disposed;
+
+ bool shutdownRequested;
/// <summary>
/// Creates a channel that connects to a specific host.
/// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
/// </summary>
- /// <param name="host">The name or IP address of the host.</param>
+ /// <param name="target">Target of the channel.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
- public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null)
+ public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
- Preconditions.CheckNotNull(host);
- this.environment = GrpcEnvironment.GetInstance();
+ this.target = Preconditions.CheckNotNull(target, "target");
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
@@ -73,11 +78,11 @@ namespace Grpc.Core
{
if (nativeCredentials != null)
{
- this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
}
else
{
- this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs);
+ this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
}
@@ -131,8 +136,8 @@ namespace Grpc.Core
return tcs.Task;
}
- /// <summary> Address of the remote endpoint in URI format.</summary>
- public string Target
+ /// <summary>Resolved address of the remote endpoint in URI format.</summary>
+ public string ResolvedTarget
{
get
{
@@ -140,6 +145,15 @@ namespace Grpc.Core
}
}
+ /// <summary>The original target used to create the channel.</summary>
+ public string Target
+ {
+ get
+ {
+ return this.target;
+ }
+ }
+
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
@@ -162,12 +176,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Destroys the underlying channel.
+ /// Waits until there are no more active calls for this channel and then cleans up
+ /// resources used by this channel.
/// </summary>
- public void Dispose()
+ public async Task ShutdownAsync()
{
- Dispose(true);
- GC.SuppressFinalize(this);
+ lock (myLock)
+ {
+ Preconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
+ }
+
+ handle.Dispose();
+
+ await Task.Run(() => GrpcEnvironment.Release());
}
internal ChannelSafeHandle Handle
@@ -186,13 +214,20 @@ namespace Grpc.Core
}
}
- protected virtual void Dispose(bool disposing)
+ internal void AddCallReference(object call)
{
- if (disposing && handle != null && !disposed)
- {
- disposed = true;
- handle.Dispose();
- }
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+
+ activeCallCounter.Decrement();
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)
diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs
index 1e0f90287a..ad54b46ad5 100644
--- a/src/csharp/Grpc.Core/ChannelOptions.cs
+++ b/src/csharp/Grpc.Core/ChannelOptions.cs
@@ -63,19 +63,19 @@ namespace Grpc.Core
public ChannelOption(string name, string stringValue)
{
this.type = OptionType.String;
- this.name = Preconditions.CheckNotNull(name);
- this.stringValue = Preconditions.CheckNotNull(stringValue);
+ this.name = Preconditions.CheckNotNull(name, "name");
+ this.stringValue = Preconditions.CheckNotNull(stringValue, "stringValue");
}
/// <summary>
/// Creates a channel option with an integer value.
/// </summary>
/// <param name="name">Name.</param>
- /// <param name="stringValue">String value.</param>
+ /// <param name="intValue">Integer value.</param>
public ChannelOption(string name, int intValue)
{
this.type = OptionType.Integer;
- this.name = Preconditions.CheckNotNull(name);
+ this.name = Preconditions.CheckNotNull(name, "name");
this.intValue = intValue;
}
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index 88494bb4ac..903449439b 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -32,31 +32,50 @@
#endregion
using System;
-using System.Collections.Generic;
-
-using Grpc.Core.Internal;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
namespace Grpc.Core
{
- public delegate void MetadataInterceptorDelegate(Metadata metadata);
+ /// <summary>
+ /// Interceptor for call headers.
+ /// </summary>
+ public delegate void HeaderInterceptor(IMethod method, string authUri, Metadata metadata);
/// <summary>
/// Base class for client-side stubs.
/// </summary>
public abstract class ClientBase
{
+ // Regex for removal of the optional DNS scheme, trailing port, and trailing backslash
+ static readonly Regex ChannelTargetPattern = new Regex(@"^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$");
+
readonly Channel channel;
+ readonly string authUriBase;
public ClientBase(Channel channel)
{
this.channel = channel;
+ this.authUriBase = GetAuthUriBase(channel.Target);
+ }
+
+ /// <summary>
+ /// Can be used to register a custom header (request metadata) interceptor.
+ /// The interceptor is invoked each time a new call on this client is started.
+ /// </summary>
+ public HeaderInterceptor HeaderInterceptor
+ {
+ get;
+ set;
}
/// <summary>
- /// Can be used to register a custom header (initial metadata) interceptor.
- /// The delegate each time before a new call on this client is started.
+ /// gRPC supports multiple "hosts" being served by a single server.
+ /// This property can be used to set the target host explicitly.
+ /// By default, this will be set to <c>null</c> with the meaning
+ /// "use default host".
/// </summary>
- public MetadataInterceptorDelegate HeaderInterceptor
+ public string Host
{
get;
set;
@@ -83,10 +102,28 @@ namespace Grpc.Core
var interceptor = HeaderInterceptor;
if (interceptor != null)
{
- interceptor(options.Headers);
- options.Headers.Freeze();
+ if (options.Headers == null)
+ {
+ options = options.WithHeaders(new Metadata());
+ }
+ var authUri = authUriBase != null ? authUriBase + method.ServiceName : null;
+ interceptor(method, authUri, options.Headers);
+ }
+ return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
+ }
+
+ /// <summary>
+ /// Creates Auth URI base from channel's target (the one passed at channel creation).
+ /// Fully-qualified service name is to be appended to this.
+ /// </summary>
+ internal static string GetAuthUriBase(string target)
+ {
+ var match = ChannelTargetPattern.Match(target);
+ if (!match.Success)
+ {
+ return null;
}
- return new CallInvocationDetails<TRequest, TResponse>(channel, method, options);
+ return "https://" + match.Groups[2].Value + "/";
}
}
}
diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/CompressionLevel.cs
index 9b1c24d0c1..399652b85e 100644
--- a/src/csharp/Grpc.Core/OperationFailedException.cs
+++ b/src/csharp/Grpc.Core/CompressionLevel.cs
@@ -36,12 +36,28 @@ using System;
namespace Grpc.Core
{
/// <summary>
- /// Thrown when gRPC operation fails.
+ /// Compression level based on grpc_compression_level from grpc/compression.h
/// </summary>
- public class OperationFailedException : Exception
+ public enum CompressionLevel
{
- public OperationFailedException(string message) : base(message)
- {
- }
+ /// <summary>
+ /// No compression.
+ /// </summary>
+ None = 0,
+
+ /// <summary>
+ /// Low compression.
+ /// </summary>
+ Low,
+
+ /// <summary>
+ /// Medium compression.
+ /// </summary>
+ Medium,
+
+ /// <summary>
+ /// High compression.
+ /// </summary>
+ High,
}
}
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
new file mode 100644
index 0000000000..a5bf1b5a70
--- /dev/null
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -0,0 +1,170 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Token for propagating context of server side handlers to child calls.
+ /// In situations when a backend is making calls to another backend,
+ /// it makes sense to propagate properties like deadline and cancellation
+ /// token of the server call to the child call.
+ /// C core provides some other contexts (like tracing context) that
+ /// are not accessible to C# layer, but this token still allows propagating them.
+ /// </summary>
+ public class ContextPropagationToken
+ {
+ /// <summary>
+ /// Default propagation mask used by C core.
+ /// </summary>
+ private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
+
+ /// <summary>
+ /// Default propagation mask used by C# - we want to propagate deadline
+ /// and cancellation token by our own means.
+ /// </summary>
+ internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
+ & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
+
+ readonly CallSafeHandle parentCall;
+ readonly DateTime deadline;
+ readonly CancellationToken cancellationToken;
+ readonly ContextPropagationOptions options;
+
+ internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
+ {
+ this.parentCall = Preconditions.CheckNotNull(parentCall);
+ this.deadline = deadline;
+ this.cancellationToken = cancellationToken;
+ this.options = options ?? ContextPropagationOptions.Default;
+ }
+
+ /// <summary>
+ /// Gets the native handle of the parent call.
+ /// </summary>
+ internal CallSafeHandle ParentCall
+ {
+ get
+ {
+ return this.parentCall;
+ }
+ }
+
+ /// <summary>
+ /// Gets the parent call's deadline.
+ /// </summary>
+ internal DateTime ParentDeadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
+ /// <summary>
+ /// Gets the parent call's cancellation token.
+ /// </summary>
+ internal CancellationToken ParentCancellationToken
+ {
+ get
+ {
+ return this.cancellationToken;
+ }
+ }
+
+ /// <summary>
+ /// Get the context propagation options.
+ /// </summary>
+ internal ContextPropagationOptions Options
+ {
+ get
+ {
+ return this.options;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Options for <see cref="ContextPropagationToken"/>.
+ /// </summary>
+ public class ContextPropagationOptions
+ {
+ /// <summary>
+ /// The context propagation options that will be used by default.
+ /// </summary>
+ public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
+
+ bool propagateDeadline;
+ bool propagateCancellation;
+
+ /// <summary>
+ /// Creates new context propagation options.
+ /// </summary>
+ /// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param>
+ /// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param>
+ public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true)
+ {
+ this.propagateDeadline = propagateDeadline;
+ this.propagateCancellation = propagateCancellation;
+ }
+
+ /// <value><c>true</c> if parent call's deadline should be propagated to the child call.</value>
+ public bool IsPropagateDeadline
+ {
+ get { return this.propagateDeadline; }
+ }
+
+ /// <value><c>true</c> if parent call's cancellation token should be propagated to the child call.</value>
+ public bool IsPropagateCancellation
+ {
+ get { return this.propagateCancellation; }
+ }
+ }
+
+ /// <summary>
+ /// Context propagation flags from grpc/grpc.h.
+ /// </summary>
+ [Flags]
+ internal enum ContextPropagationFlags
+ {
+ Deadline = 1,
+ CensusStatsContext = 2,
+ CensusTracingContext = 4,
+ Cancellation = 8
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 52defd1965..ad2af17bc7 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -49,6 +49,7 @@
<Compile Include="AsyncDuplexStreamingCall.cs" />
<Compile Include="AsyncServerStreamingCall.cs" />
<Compile Include="IClientStreamWriter.cs" />
+ <Compile Include="Internal\INativeCall.cs" />
<Compile Include="IServerStreamWriter.cs" />
<Compile Include="IAsyncStreamWriter.cs" />
<Compile Include="IAsyncStreamReader.cs" />
@@ -77,14 +78,12 @@
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
- <Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
<Compile Include="Credentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
<Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
- <Compile Include="OperationFailedException.cs" />
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Utils\Preconditions.cs" />
<Compile Include="Internal\ServerCredentialsSafeHandle.cs" />
@@ -115,6 +114,9 @@
<Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
<Compile Include="CallOptions.cs" />
+ <Compile Include="CompressionLevel.cs" />
+ <Compile Include="WriteOptions.cs" />
+ <Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 034a66be3c..e7c04185c2 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -53,8 +53,12 @@ namespace Grpc.Core
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_shutdown();
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_version_string(); // returns not-owned const char*
+
static object staticLock = new object();
static GrpcEnvironment instance;
+ static int refCount;
static ILogger logger = new ConsoleLogger();
@@ -64,13 +68,14 @@ namespace Grpc.Core
bool isClosed;
/// <summary>
- /// Returns an instance of initialized gRPC environment.
- /// Subsequent invocations return the same instance unless Shutdown has been called first.
+ /// Returns a reference-counted instance of initialized gRPC environment.
+ /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
- internal static GrpcEnvironment GetInstance()
+ internal static GrpcEnvironment AddRef()
{
lock (staticLock)
{
+ refCount++;
if (instance == null)
{
instance = new GrpcEnvironment();
@@ -80,14 +85,16 @@ namespace Grpc.Core
}
/// <summary>
- /// Shuts down the gRPC environment if it was initialized before.
- /// Blocks until the environment has been fully shutdown.
+ /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
+ /// (and blocks until the environment has been fully shutdown).
/// </summary>
- public static void Shutdown()
+ internal static void Release()
{
lock (staticLock)
{
- if (instance != null)
+ Preconditions.CheckState(refCount > 0);
+ refCount--;
+ if (refCount == 0)
{
instance.Close();
instance = null;
@@ -95,6 +102,14 @@ namespace Grpc.Core
}
}
+ internal static int GetRefCount()
+ {
+ lock (staticLock)
+ {
+ return refCount;
+ }
+ }
+
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@@ -112,7 +127,7 @@ namespace Grpc.Core
/// </summary>
public static void SetLogger(ILogger customLogger)
{
- Preconditions.CheckNotNull(customLogger);
+ Preconditions.CheckNotNull(customLogger, "customLogger");
logger = customLogger;
}
@@ -122,12 +137,10 @@ namespace Grpc.Core
private GrpcEnvironment()
{
NativeLogRedirector.Redirect();
- grpcsharp_init();
+ GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
- // TODO: use proper logging here
- Logger.Info("gRPC initialized.");
}
/// <summary>
@@ -164,6 +177,25 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets version of gRPC C core.
+ /// </summary>
+ internal static string GetCoreVersionString()
+ {
+ var ptr = grpcsharp_version_string(); // the pointer is not owned
+ return Marshal.PtrToStringAnsi(ptr);
+ }
+
+ internal static void GrpcNativeInit()
+ {
+ grpcsharp_init();
+ }
+
+ internal static void GrpcNativeShutdown()
+ {
+ grpcsharp_shutdown();
+ }
+
+ /// <summary>
/// Shuts down this environment.
/// </summary>
private void Close()
@@ -173,30 +205,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
- grpcsharp_shutdown();
+ GrpcNativeShutdown();
isClosed = true;
debugStats.CheckOK();
-
- Logger.Info("gRPC shutdown.");
- }
-
- /// <summary>
- /// Shuts down this environment asynchronously.
- /// </summary>
- private Task CloseAsync()
- {
- return Task.Run(() =>
- {
- try
- {
- Close();
- }
- catch (Exception e)
- {
- Logger.Error(e, "Error occured while shutting down GrpcEnvironment.");
- }
- });
}
}
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 371fbf27ce..c0a0674e50 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,7 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
+ public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
{
// TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 2000210252..4e2acb9c71 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -50,5 +50,13 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
+
+ /// <summary>
+ /// Write options that will be used for the next write.
+ /// If null, default options will be used.
+ /// Once set, this property maintains its value across subsequent
+ /// writes.
+ /// <value>The write options.</value>
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 414b5c4282..be5d611a53 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,20 +50,34 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- readonly CallInvocationDetails<TRequest, TResponse> callDetails;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
+ readonly INativeCall injectedNativeCall; // for testing
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // Indicates that steaming call has finished.
+ TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+
+ // Response headers set here once received.
+ TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
+
// Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus;
- bool readObserverCompleted; // True if readObserver has already been completed.
-
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
{
- this.callDetails = callDetails;
+ this.details = callDetails.WithOptions(callDetails.Options.Normalize());
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
+ }
+
+ /// <summary>
+ /// This constructor should only be used for testing.
+ /// </summary>
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
+ {
+ this.injectedNativeCall = injectedNativeCall;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@@ -89,17 +103,17 @@ namespace Grpc.Core.Internal
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
try
{
- HandleUnaryResponse(success, ctx);
+ HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
}
catch (Exception e)
{
@@ -108,15 +122,9 @@ namespace Grpc.Core.Internal
}
}
- try
- {
- // Once the blocking call returns, the result should be available synchronously.
- return unaryResponseTcs.Task.Result;
- }
- catch (AggregateException ae)
- {
- throw ExceptionHelper.UnwrapRpcException(ae);
- }
+ // Once the blocking call returns, the result should be available synchronously.
+ // Note that GetAwaiter().GetResult() doesn't wrap exceptions in AggregateException.
+ return unaryResponseTcs.Task.GetAwaiter().GetResult();
}
}
@@ -130,7 +138,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -138,9 +146,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -157,12 +165,12 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -181,17 +189,17 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
halfcloseRequested = true;
- halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -206,12 +214,13 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(environment.CompletionQueue);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -219,9 +228,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -253,6 +262,28 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
+ /// </summary>
+ public Task StreamingCallFinishedTask
+ {
+ get
+ {
+ return streamingCallFinishedTcs.Task;
+ }
+ }
+
+ /// <summary>
+ /// Get the task that completes once response headers are received.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return responseHeadersTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Gets the resulting status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
@@ -278,54 +309,45 @@ namespace Grpc.Core.Internal
}
}
- /// <summary>
- /// On client-side, we only fire readCompletionDelegate once all messages have been read
- /// and status has been received.
- /// </summary>
- protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
+ public CallInvocationDetails<TRequest, TResponse> Details
{
- if (completionDelegate != null && readingDone && finishedStatus.HasValue)
+ get
{
- bool shouldComplete;
- lock (myLock)
- {
- shouldComplete = !readObserverCompleted;
- readObserverCompleted = true;
- }
-
- if (shouldComplete)
- {
- var status = finishedStatus.Value.Status;
- if (status.StatusCode != StatusCode.OK)
- {
- FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
- }
- else
- {
- FireCompletion(completionDelegate, default(TResponse), null);
- }
- }
+ return this.details;
}
}
- protected override void OnReleaseResources()
+ protected override void OnAfterReleaseResources()
{
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.RemoveCallReference(this);
}
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
- callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ var call = CreateNativeCall(cq);
+ details.Channel.AddCallReference(this);
InitializeInternal(call);
RegisterCancellationCallback();
}
+ private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
+ {
+ if (injectedNativeCall != null)
+ {
+ return injectedNativeCall; // allows injecting a mock INativeCall in tests.
+ }
+
+ var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ return details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
+ }
+
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
- var token = callDetails.Options.CancellationToken;
+ var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
@@ -333,31 +355,40 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Handler for unary response completion.
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
+ /// <summary>
+ /// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
+ private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
- var fullStatus = ctx.GetReceivedStatusOnClient();
+ responseHeadersTcs.SetResult(responseHeaders);
+ }
+ /// <summary>
+ /// Handler for unary response completion.
+ /// </summary>
+ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+ {
lock (myLock)
{
finished = true;
- finishedStatus = fullStatus;
-
- halfclosed = true;
+ finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
}
- if (!success)
- {
- unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured.")));
- return;
- }
+ responseHeadersTcs.SetResult(responseHeaders);
- var status = fullStatus.Status;
+ var status = receivedStatus.Status;
- if (status.StatusCode != StatusCode.OK)
+ if (!success || status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
@@ -365,7 +396,7 @@ namespace Grpc.Core.Internal
// TODO: handle deserialization error
TResponse msg;
- TryDeserialize(ctx.GetReceivedMessage(), out msg);
+ TryDeserialize(receivedMessage, out msg);
unaryResponseTcs.SetResult(msg);
}
@@ -373,22 +404,25 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
- private void HandleFinished(bool success, BatchContextSafeHandle ctx)
+ private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{
- var fullStatus = ctx.GetReceivedStatusOnClient();
-
- AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
- finishedStatus = fullStatus;
-
- origReadCompletionDelegate = readCompletionDelegate;
+ finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
}
- ProcessLastRead(origReadCompletionDelegate);
+ var status = receivedStatus.Status;
+
+ if (!success || status.StatusCode != StatusCode.OK)
+ {
+ streamingCallFinishedTcs.SetException(new RpcException(status));
+ return;
+ }
+
+ streamingCallFinishedTcs.SetResult(null);
}
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baeb..4d20394644 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -54,27 +54,30 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
+ protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
- protected CallSafeHandle call;
+ protected INativeCall call;
protected bool disposed;
protected bool started;
- protected bool errorOccured;
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected bool readingDone;
- protected bool halfcloseRequested;
- protected bool halfclosed;
+ protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
+ protected bool halfcloseRequested; // True if send close have been initiated.
protected bool finished; // True if close has been received from the peer.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter; // Number of streaming send operations started so far.
+
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
+ this.environment = Preconditions.CheckNotNull(environment);
}
/// <summary>
@@ -111,7 +114,7 @@ namespace Grpc.Core.Internal
}
}
- protected void InitializeInternal(CallSafeHandle call)
+ protected void InitializeInternal(INativeCall call)
{
lock (myLock)
{
@@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
@@ -153,16 +159,6 @@ namespace Grpc.Core.Internal
}
}
- // TODO(jtattermusch): find more fitting name for this method.
- /// <summary>
- /// Default behavior just completes the read observer, but more sofisticated behavior might be required
- /// by subclasses.
- /// </summary>
- protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
- {
- FireCompletion(completionDelegate, default(TRead), null);
- }
-
/// <summary>
/// If there are no more pending actions and no new actions can be started, releases
/// the underlying native resources.
@@ -171,7 +167,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
+ bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -183,34 +179,33 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
- OnReleaseResources();
if (call != null)
{
call.Dispose();
}
disposed = true;
+ OnAfterReleaseResources();
}
- protected virtual void OnReleaseResources()
+ protected virtual void OnAfterReleaseResources()
{
}
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!errorOccured);
CheckNotCancelled();
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
+ Preconditions.CheckState(!finished, "Already finished.");
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected void CheckReadingAllowed()
+ protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
- Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!readingDone, "Stream has already been closed.");
Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
@@ -274,7 +269,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles send completion.
/// </summary>
- protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
+ protected void HandleSendFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@@ -287,7 +282,7 @@ namespace Grpc.Core.Internal
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new OperationFailedException("Send failed"));
+ FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
}
else
{
@@ -298,12 +293,11 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
- protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
+ protected void HandleHalfclosed(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- halfclosed = true;
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
@@ -312,7 +306,7 @@ namespace Grpc.Core.Internal
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new OperationFailedException("Halfclose failed"));
+ FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed"));
}
else
{
@@ -323,23 +317,17 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
- protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
+ protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
- var payload = ctx.GetReceivedMessage();
-
AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = readCompletionDelegate;
- if (payload != null)
- {
- readCompletionDelegate = null;
- }
- else
+ readCompletionDelegate = null;
+
+ if (receivedMessage == null)
{
- // This was the last read. Keeping the readCompletionDelegate
- // to be either fired by this handler or by client-side finished
- // handler.
+ // This was the last read.
readingDone = true;
}
@@ -348,17 +336,17 @@ namespace Grpc.Core.Internal
// TODO: handle the case when error occured...
- if (payload != null)
+ if (receivedMessage != null)
{
// TODO: handle deserialization error
TRead msg;
- TryDeserialize(payload, out msg);
+ TryDeserialize(receivedMessage, out msg);
FireCompletion(origCompletionDelegate, msg, null);
}
else
{
- ProcessLastRead(origCompletionDelegate);
+ FireCompletion(origCompletionDelegate, default(TRead), null);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee36..5c47251030 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -49,17 +49,18 @@ namespace Grpc.Core.Internal
{
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
- readonly GrpcEnvironment environment;
+ readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
{
- this.environment = Preconditions.CheckNotNull(environment);
+ this.server = Preconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
- environment.DebugStats.ActiveServerCalls.Increment();
+
+ server.AddCallReference(this);
InitializeInternal(call);
}
@@ -83,9 +84,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -98,6 +99,35 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(headers, "metadata");
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +141,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
@@ -139,18 +169,22 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnReleaseResources()
+ protected override void CheckReadingAllowed()
{
- environment.DebugStats.ActiveServerCalls.Decrement();
+ base.CheckReadingAllowed();
+ Preconditions.CheckArgument(!cancelRequested);
+ }
+
+ protected override void OnAfterReleaseResources()
+ {
+ server.RemoveCallReference(this);
}
/// <summary>
/// Handles the server side close completion.
/// </summary>
- private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
+ private void HandleFinishedServerside(bool success, bool cancelled)
{
- bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
-
lock (myLock)
{
finished = true;
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 6a2add54db..3a96414bea 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
}
// Gets data of server_rpc_new completion.
- public ServerRpcNew GetServerRpcNew()
+ public ServerRpcNew GetServerRpcNew(Server server)
{
var call = grpcsharp_batch_context_server_rpc_new_call(this);
@@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
- return new ServerRpcNew(call, method, host, deadline, metadata);
+ return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
@@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal struct ServerRpcNew
{
+ readonly Server server;
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
- public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
+ this.server = server;
this.call = call;
this.method = method;
this.host = host;
@@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata;
}
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
+
public CallSafeHandle Call
{
get
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171..0f187529e8 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -40,8 +40,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
- internal class CallSafeHandle : SafeHandleZeroIsInvalid
+ internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall
{
+ public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,17 +80,25 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -103,76 +113,90 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
- public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
- public void StartSendCloseFromClient(BatchCompletionDelegate callback)
+ public void StartSendCloseFromClient(SendCompletionHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
- public void StartReceiveMessage(BatchCompletionDelegate callback)
+ public void StartReceiveMessage(ReceivedMessageHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
- public void StartServerSide(BatchCompletionDelegate callback)
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ }
+
+ public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 7324ebdf57..8cef566c14 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,7 +47,7 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
@@ -68,17 +68,23 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs);
}
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
@@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463b..013f00ff6f 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 6c44521038..b4a7335c7c 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -72,7 +72,13 @@ namespace Grpc.Core.Internal
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task;
this.current = result;
- return result != null;
+
+ if (result == null)
+ {
+ await call.StreamingCallFinishedTask;
+ return false;
+ }
+ return true;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 8793450ff3..1bea1adf9e 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{
internal class DebugStats
{
- public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
-
- public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
-
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
@@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary>
public void CheckOK()
{
- var remainingClientCalls = ActiveClientCalls.Count;
- if (remainingClientCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
- }
- var remainingServerCalls = ActiveServerCalls.Count;
- if (remainingServerCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
- }
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index cb4c7c821e..4b7124ee74 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock)
{
cq.Shutdown();
-
- Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != GRPCCompletionType.Shutdown);
- Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
new file mode 100644
index 0000000000..cbef599139
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -0,0 +1,85 @@
+#region Copyright notice and license
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core.Internal
+{
+ internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+
+ // Received status for streaming response calls.
+ internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus);
+
+ internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
+
+ internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
+
+ internal delegate void SendCompletionHandler(bool success);
+
+ internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
+
+ /// <summary>
+ /// Abstraction of a native call object.
+ /// </summary>
+ internal interface INativeCall : IDisposable
+ {
+ void Cancel();
+
+ void CancelWithStatus(Grpc.Core.Status status);
+
+ string GetPeer();
+
+ void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags);
+
+ void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartReceiveMessage(ReceivedMessageHandler callback);
+
+ void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
+
+ void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
+
+ void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+
+ void StartSendCloseFromClient(SendCompletionHandler callback);
+
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+
+ void StartServerSide(ReceivedCloseOnServerHandler callback);
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 427c16fac6..83994f6762 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -70,7 +70,8 @@ namespace Grpc.Core.Internal
var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
{
- grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length));
+ var valueBytes = metadata[i].GetSerializedValueUnsafe();
+ grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
}
@@ -94,7 +95,7 @@ namespace Grpc.Core.Internal
string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index));
var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()];
Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length);
- metadata.Add(new Metadata.Entry(key, bytes));
+ metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes));
}
return metadata;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f..59f4c5727c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment);
+ (payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
- return new ServerCallContext(
- newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f..03e39efc02 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
- internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
+ internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index f9b44b1acf..5ee7ac14e8 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -74,6 +74,9 @@ namespace Grpc.Core.Internal
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_server_create(cq, args);
}
@@ -109,6 +112,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_server_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index e83d21f4a4..daf85d5f61 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -211,7 +211,7 @@ namespace Grpc.Core.Internal
return Timespec.InfPast;
}
- Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime");
+ Preconditions.CheckArgument(dateTime.Kind == DateTimeKind.Utc, "dateTime needs of kind DateTimeKind.Utc or be equal to DateTime.MaxValue or DateTime.MinValue.");
try
{
diff --git a/src/csharp/Grpc.Core/KeyCertificatePair.cs b/src/csharp/Grpc.Core/KeyCertificatePair.cs
index 5def15a656..6f691975e9 100644
--- a/src/csharp/Grpc.Core/KeyCertificatePair.cs
+++ b/src/csharp/Grpc.Core/KeyCertificatePair.cs
@@ -54,8 +54,8 @@ namespace Grpc.Core
/// <param name="privateKey">PEM encoded private key.</param>
public KeyCertificatePair(string certificateChain, string privateKey)
{
- this.certificateChain = Preconditions.CheckNotNull(certificateChain);
- this.privateKey = Preconditions.CheckNotNull(privateKey);
+ this.certificateChain = Preconditions.CheckNotNull(certificateChain, "certificateChain");
+ this.privateKey = Preconditions.CheckNotNull(privateKey, "privateKey");
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
index c67765c78d..35561d25d8 100644
--- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
+++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
@@ -42,16 +42,33 @@ namespace Grpc.Core.Logging
readonly Type forType;
readonly string forTypeString;
+ /// <summary>Creates a console logger not associated to any specific type.</summary>
public ConsoleLogger() : this(null)
{
}
+ /// <summary>Creates a console logger that logs messsage specific for given type.</summary>
private ConsoleLogger(Type forType)
{
this.forType = forType;
- this.forTypeString = forType != null ? forType.FullName + " " : "";
+ if (forType != null)
+ {
+ var namespaceStr = forType.Namespace ?? "";
+ if (namespaceStr.Length > 0)
+ {
+ namespaceStr += ".";
+ }
+ this.forTypeString = namespaceStr + forType.Name + " ";
+ }
+ else
+ {
+ this.forTypeString = "";
+ }
}
-
+
+ /// <summary>
+ /// Returns a logger associated with the specified type.
+ /// </summary>
public ILogger ForType<T>()
{
if (typeof(T) == forType)
@@ -61,31 +78,37 @@ namespace Grpc.Core.Logging
return new ConsoleLogger(typeof(T));
}
+ /// <summary>Logs a message with severity Debug.</summary>
public void Debug(string message, params object[] formatArgs)
{
Log("D", message, formatArgs);
}
+ /// <summary>Logs a message with severity Info.</summary>
public void Info(string message, params object[] formatArgs)
{
Log("I", message, formatArgs);
}
+ /// <summary>Logs a message with severity Warning.</summary>
public void Warning(string message, params object[] formatArgs)
{
Log("W", message, formatArgs);
}
+ /// <summary>Logs a message and an associated exception with severity Warning.</summary>
public void Warning(Exception exception, string message, params object[] formatArgs)
{
Log("W", message + " " + exception, formatArgs);
}
+ /// <summary>Logs a message with severity Error.</summary>
public void Error(string message, params object[] formatArgs)
{
Log("E", message, formatArgs);
}
+ /// <summary>Logs a message and an associated exception with severity Error.</summary>
public void Error(Exception exception, string message, params object[] formatArgs)
{
Log("E", message + " " + exception, formatArgs);
diff --git a/src/csharp/Grpc.Core/Logging/ILogger.cs b/src/csharp/Grpc.Core/Logging/ILogger.cs
index 0d58f133e3..61e0c91388 100644
--- a/src/csharp/Grpc.Core/Logging/ILogger.cs
+++ b/src/csharp/Grpc.Core/Logging/ILogger.cs
@@ -42,16 +42,22 @@ namespace Grpc.Core.Logging
/// <summary>Returns a logger associated with the specified type.</summary>
ILogger ForType<T>();
+ /// <summary>Logs a message with severity Debug.</summary>
void Debug(string message, params object[] formatArgs);
+ /// <summary>Logs a message with severity Info.</summary>
void Info(string message, params object[] formatArgs);
+ /// <summary>Logs a message with severity Warning.</summary>
void Warning(string message, params object[] formatArgs);
+ /// <summary>Logs a message and an associated exception with severity Warning.</summary>
void Warning(Exception exception, string message, params object[] formatArgs);
+ /// <summary>Logs a message with severity Error.</summary>
void Error(string message, params object[] formatArgs);
+ /// <summary>Logs a message and an associated exception with severity Error.</summary>
void Error(Exception exception, string message, params object[] formatArgs);
}
}
diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs
index 8b1a929074..f38cb0863f 100644
--- a/src/csharp/Grpc.Core/Marshaller.cs
+++ b/src/csharp/Grpc.Core/Marshaller.cs
@@ -37,19 +37,27 @@ using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
- /// For serializing and deserializing messages.
+ /// Encapsulates the logic for serializing and deserializing messages.
/// </summary>
public struct Marshaller<T>
{
readonly Func<T, byte[]> serializer;
readonly Func<byte[], T> deserializer;
+ /// <summary>
+ /// Initializes a new marshaller.
+ /// </summary>
+ /// <param name="serializer">Function that will be used to serialize messages.</param>
+ /// <param name="deserializer">Function that will be used to deserialize messages.</param>
public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{
- this.serializer = Preconditions.CheckNotNull(serializer);
- this.deserializer = Preconditions.CheckNotNull(deserializer);
+ this.serializer = Preconditions.CheckNotNull(serializer, "serializer");
+ this.deserializer = Preconditions.CheckNotNull(deserializer, "deserializer");
}
+ /// <summary>
+ /// Gets the serializer function.
+ /// </summary>
public Func<T, byte[]> Serializer
{
get
@@ -58,6 +66,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the deserializer function.
+ /// </summary>
public Func<byte[], T> Deserializer
{
get
@@ -72,11 +83,17 @@ namespace Grpc.Core
/// </summary>
public static class Marshallers
{
+ /// <summary>
+ /// Creates a marshaller from specified serializer and deserializer.
+ /// </summary>
public static Marshaller<T> Create<T>(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{
return new Marshaller<T>(serializer, deserializer);
}
+ /// <summary>
+ /// Returns a marshaller for <c>string</c> type. This is useful for testing.
+ /// </summary>
public static Marshaller<string> StringMarshaller
{
get
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 6fd0a7109d..a589b50caa 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -46,6 +46,11 @@ namespace Grpc.Core
public sealed class Metadata : IList<Metadata.Entry>
{
/// <summary>
+ /// All binary headers should have this suffix.
+ /// </summary>
+ public const string BinaryHeaderSuffix = "-bin";
+
+ /// <summary>
/// An read-only instance of metadata containing no entries.
/// </summary>
public static readonly Metadata Empty = new Metadata().Freeze();
@@ -114,6 +119,16 @@ namespace Grpc.Core
entries.Add(item);
}
+ public void Add(string key, string value)
+ {
+ Add(new Entry(key, value));
+ }
+
+ public void Add(string key, byte[] valueBytes)
+ {
+ Add(new Entry(key, valueBytes));
+ }
+
public void Clear()
{
CheckWriteable();
@@ -171,23 +186,49 @@ namespace Grpc.Core
private static readonly Encoding Encoding = Encoding.ASCII;
readonly string key;
- string value;
- byte[] valueBytes;
+ readonly string value;
+ readonly byte[] valueBytes;
+ private Entry(string key, string value, byte[] valueBytes)
+ {
+ this.key = key;
+ this.value = value;
+ this.valueBytes = valueBytes;
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value.
+ /// </summary>
+ /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param>
+ /// <param name="valueBytes">Value bytes.</param>
public Entry(string key, byte[] valueBytes)
{
- this.key = Preconditions.CheckNotNull(key);
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for binary valued metadata entry needs to have suffix indicating binary value.");
this.value = null;
- this.valueBytes = Preconditions.CheckNotNull(valueBytes);
+ Preconditions.CheckNotNull(valueBytes, "valueBytes");
+ this.valueBytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability
}
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value.
+ /// </summary>
+ /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param>
+ /// <param name="value">Value string. Only ASCII characters are allowed.</param>
public Entry(string key, string value)
{
- this.key = Preconditions.CheckNotNull(key);
- this.value = Preconditions.CheckNotNull(value);
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for ASCII valued metadata entry cannot have suffix indicating binary value.");
+ this.value = Preconditions.CheckNotNull(value, "value");
this.valueBytes = null;
}
+ /// <summary>
+ /// Gets the metadata entry key.
+ /// </summary>
public string Key
{
get
@@ -196,33 +237,86 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the binary value of this metadata entry.
+ /// </summary>
public byte[] ValueBytes
{
get
{
if (valueBytes == null)
{
- valueBytes = Encoding.GetBytes(value);
+ return Encoding.GetBytes(value);
}
- return valueBytes;
+
+ // defensive copy to guarantee immutability
+ var bytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length);
+ return bytes;
}
}
+ /// <summary>
+ /// Gets the string value of this metadata entry.
+ /// </summary>
public string Value
{
get
{
- if (value == null)
- {
- value = Encoding.GetString(valueBytes);
- }
- return value;
+ Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry");
+ return value ?? Encoding.GetString(valueBytes);
}
}
-
+
+ /// <summary>
+ /// Returns <c>true</c> if this entry is a binary-value entry.
+ /// </summary>
+ public bool IsBinary
+ {
+ get
+ {
+ return value == null;
+ }
+ }
+
+ /// <summary>
+ /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>.
+ /// </summary>
public override string ToString()
{
- return string.Format("[Entry: key={0}, value={1}]", Key, Value);
+ if (IsBinary)
+ {
+ return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes);
+ }
+
+ return string.Format("[Entry: key={0}, value={1}]", key, value);
+ }
+
+ /// <summary>
+ /// Gets the serialized value for this entry. For binary metadata entries, this leaks
+ /// the internal <c>valueBytes</c> byte array and caller must not change contents of it.
+ /// </summary>
+ internal byte[] GetSerializedValueUnsafe()
+ {
+ return valueBytes ?? Encoding.GetBytes(value);
+ }
+
+ /// <summary>
+ /// Creates a binary value or ascii value metadata entry from data received from the native layer.
+ /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying.
+ /// </summary>
+ internal static Entry CreateUnsafe(string key, byte[] valueBytes)
+ {
+ if (key.EndsWith(BinaryHeaderSuffix))
+ {
+ return new Entry(key, null, valueBytes);
+ }
+ return new Entry(key, Encoding.GetString(valueBytes), null);
+ }
+
+ private static string NormalizeKey(string key)
+ {
+ return Preconditions.CheckNotNull(key, "key").ToLower();
}
}
}
diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs
index cc047ac9f8..4c53285893 100644
--- a/src/csharp/Grpc.Core/Method.cs
+++ b/src/csharp/Grpc.Core/Method.cs
@@ -41,16 +41,50 @@ namespace Grpc.Core
/// </summary>
public enum MethodType
{
- Unary, // Unary request, unary response.
- ClientStreaming, // Streaming request, unary response.
- ServerStreaming, // Unary request, streaming response.
- DuplexStreaming // Streaming request, streaming response.
+ /// <summary>Single request sent from client, single response received from server.</summary>
+ Unary,
+
+ /// <summary>Stream of request sent from client, single response received from server.</summary>
+ ClientStreaming,
+
+ /// <summary>Single request sent from client, stream of responses received from server.</summary>
+ ServerStreaming,
+
+ /// <summary>Both server and client can stream arbitrary number of requests and responses simultaneously.</summary>
+ DuplexStreaming
}
/// <summary>
- /// A description of a service method.
+ /// A non-generic representation of a remote method.
/// </summary>
- public class Method<TRequest, TResponse>
+ public interface IMethod
+ {
+ /// <summary>
+ /// Gets the type of the method.
+ /// </summary>
+ MethodType Type { get; }
+
+ /// <summary>
+ /// Gets the name of the service to which this method belongs.
+ /// </summary>
+ string ServiceName { get; }
+
+ /// <summary>
+ /// Gets the unqualified name of the method.
+ /// </summary>
+ string Name { get; }
+
+ /// <summary>
+ /// Gets the fully qualified name of the method. On the server side, methods are dispatched
+ /// based on this name.
+ /// </summary>
+ string FullName { get; }
+ }
+
+ /// <summary>
+ /// A description of a remote method.
+ /// </summary>
+ public class Method<TRequest, TResponse> : IMethod
{
readonly MethodType type;
readonly string serviceName;
@@ -59,16 +93,27 @@ namespace Grpc.Core
readonly Marshaller<TResponse> responseMarshaller;
readonly string fullName;
+ /// <summary>
+ /// Initializes a new instance of the <c>Method</c> class.
+ /// </summary>
+ /// <param name="type">Type of method.</param>
+ /// <param name="serviceName">Name of service this method belongs to.</param>
+ /// <param name="name">Unqualified name of the method.</param>
+ /// <param name="requestMarshaller">Marshaller used for request messages.</param>
+ /// <param name="responseMarshaller">Marshaller used for response messages.</param>
public Method(MethodType type, string serviceName, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller)
{
this.type = type;
- this.serviceName = Preconditions.CheckNotNull(serviceName);
- this.name = Preconditions.CheckNotNull(name);
- this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller);
- this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller);
- this.fullName = GetFullName(serviceName);
+ this.serviceName = Preconditions.CheckNotNull(serviceName, "serviceName");
+ this.name = Preconditions.CheckNotNull(name, "name");
+ this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller, "requestMarshaller");
+ this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller, "responseMarshaller");
+ this.fullName = GetFullName(serviceName, name);
}
+ /// <summary>
+ /// Gets the type of the method.
+ /// </summary>
public MethodType Type
{
get
@@ -77,6 +122,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the name of the service to which this method belongs.
+ /// </summary>
public string ServiceName
{
get
@@ -85,6 +133,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the unqualified name of the method.
+ /// </summary>
public string Name
{
get
@@ -93,6 +144,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the marshaller used for request messages.
+ /// </summary>
public Marshaller<TRequest> RequestMarshaller
{
get
@@ -101,6 +155,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the marshaller used for response messages.
+ /// </summary>
public Marshaller<TResponse> ResponseMarshaller
{
get
@@ -108,7 +165,11 @@ namespace Grpc.Core
return this.responseMarshaller;
}
}
-
+
+ /// <summary>
+ /// Gets the fully qualified name of the method. On the server side, methods are dispatched
+ /// based on this name.
+ /// </summary>
public string FullName
{
get
@@ -120,9 +181,9 @@ namespace Grpc.Core
/// <summary>
/// Gets full name of the method including the service name.
/// </summary>
- internal string GetFullName(string serviceName)
+ internal static string GetFullName(string serviceName, string methodName)
{
- return "/" + Preconditions.CheckNotNull(serviceName) + "/" + this.Name;
+ return "/" + serviceName + "/" + methodName;
}
}
}
diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs
index c58578286b..cac417e626 100644
--- a/src/csharp/Grpc.Core/RpcException.cs
+++ b/src/csharp/Grpc.Core/RpcException.cs
@@ -36,22 +36,34 @@ using System;
namespace Grpc.Core
{
/// <summary>
- /// Thrown when remote procedure call fails.
+ /// Thrown when remote procedure call fails. Every <c>RpcException</c> is associated with a resulting <see cref="Status"/> of the call.
/// </summary>
public class RpcException : Exception
{
private readonly Status status;
+ /// <summary>
+ /// Creates a new <c>RpcException</c> associated with given status.
+ /// </summary>
+ /// <param name="status">Resulting status of a call.</param>
public RpcException(Status status) : base(status.ToString())
{
this.status = status;
}
+ /// <summary>
+ /// Creates a new <c>RpcException</c> associated with given status and message.
+ /// </summary>
+ /// <param name="status">Resulting status of a call.</param>
+ /// <param name="message">The exception message.</param>
public RpcException(Status status, string message) : base(message)
{
this.status = status;
}
+ /// <summary>
+ /// Resulting status of the call.
+ /// </summary>
public Status Status
{
get
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index eb5b043d1c..28f1686e20 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -50,6 +50,8 @@ namespace Grpc.Core
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
readonly ServiceDefinitionCollection serviceDefinitions;
readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
@@ -73,7 +75,7 @@ namespace Grpc.Core
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
- this.environment = GrpcEnvironment.GetInstance();
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
@@ -106,6 +108,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// To allow awaiting termination of the server.
+ /// </summary>
+ public Task ShutdownTask
+ {
+ get
+ {
+ return shutdownTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Starts the server.
/// </summary>
public void Start()
@@ -136,18 +149,9 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
- handle.Dispose();
- }
+ DisposeHandle();
- /// <summary>
- /// To allow awaiting termination of the server.
- /// </summary>
- public Task ShutdownTask
- {
- get
- {
- return shutdownTcs.Task;
- }
+ await Task.Run(() => GrpcEnvironment.Release());
}
/// <summary>
@@ -166,7 +170,22 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
- handle.Dispose();
+ DisposeHandle();
+ }
+
+ internal void AddCallReference(object call)
+ {
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+ activeCallCounter.Decrement();
}
/// <summary>
@@ -192,7 +211,7 @@ namespace Grpc.Core
{
lock (myLock)
{
- Preconditions.CheckNotNull(serverPort.Credentials);
+ Preconditions.CheckNotNull(serverPort.Credentials, "serverPort");
Preconditions.CheckState(!startRequested);
var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port);
int boundPort;
@@ -227,6 +246,16 @@ namespace Grpc.Core
}
}
+ private void DisposeHandle()
+ {
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
+ }
+ handle.Dispose();
+ }
+
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
@@ -254,7 +283,7 @@ namespace Grpc.Core
{
if (success)
{
- ServerRpcNew newRpc = ctx.GetServerRpcNew();
+ ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 032b1390db..75d81c64f3 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -36,15 +36,16 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
- public sealed class ServerCallContext
+ public class ServerCallContext
{
- // TODO(jtattermusch): expose method to send initial metadata back to client
-
+ private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@@ -54,15 +55,34 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private Func<Metadata, Task> writeHeadersFunc;
+ private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
+ internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
+ Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
+ this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeHeadersFunc = writeHeadersFunc;
+ this.writeOptionsHolder = writeOptionsHolder;
+ }
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ return writeHeadersFunc(responseHeaders);
+ }
+
+ /// <summary>
+ /// Creates a propagation token to be used to propagate call context to a child call.
+ /// </summary>
+ public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
+ {
+ return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
/// <summary>Name of method called in this RPC.</summary>
@@ -110,7 +130,7 @@ namespace Grpc.Core
}
}
- ///<summary>Cancellation token signals when call is cancelled.</summary>
+ /// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@@ -141,5 +161,31 @@ namespace Grpc.Core
status = value;
}
}
+
+ /// <summary>
+ /// Allows setting write options for the following write.
+ /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
+ /// Both properties are backed by the same underlying value.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptionsHolder.WriteOptions;
+ }
+
+ set
+ {
+ writeOptionsHolder.WriteOptions = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Allows sharing write options between ServerCallContext and other objects.
+ /// </summary>
+ public interface IHasWriteOptions
+ {
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/ServerCredentials.cs b/src/csharp/Grpc.Core/ServerCredentials.cs
index c11a1ede08..3c6703d30e 100644
--- a/src/csharp/Grpc.Core/ServerCredentials.cs
+++ b/src/csharp/Grpc.Core/ServerCredentials.cs
@@ -91,7 +91,7 @@ namespace Grpc.Core
{
this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly();
Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0,
- "At least one KeyCertificatePair needs to be provided");
+ "At least one KeyCertificatePair needs to be provided.");
if (forceClientAuth)
{
Preconditions.CheckNotNull(rootCertificates,
diff --git a/src/csharp/Grpc.Core/ServerMethods.cs b/src/csharp/Grpc.Core/ServerMethods.cs
index d457770203..1f119a80ff 100644
--- a/src/csharp/Grpc.Core/ServerMethods.cs
+++ b/src/csharp/Grpc.Core/ServerMethods.cs
@@ -31,12 +31,8 @@
#endregion
-using System;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
-
namespace Grpc.Core
{
/// <summary>
diff --git a/src/csharp/Grpc.Core/ServerPort.cs b/src/csharp/Grpc.Core/ServerPort.cs
index 55e4bd0062..598404d045 100644
--- a/src/csharp/Grpc.Core/ServerPort.cs
+++ b/src/csharp/Grpc.Core/ServerPort.cs
@@ -62,9 +62,9 @@ namespace Grpc.Core
/// <param name="credentials">credentials to use to secure this port.</param>
public ServerPort(string host, int port, ServerCredentials credentials)
{
- this.host = Preconditions.CheckNotNull(host);
+ this.host = Preconditions.CheckNotNull(host, "host");
this.port = port;
- this.credentials = Preconditions.CheckNotNull(credentials);
+ this.credentials = Preconditions.CheckNotNull(credentials, "credentials");
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
index a00d156e52..94b0a320c3 100644
--- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs
+++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
@@ -79,7 +79,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- callHandlers.Add(method.GetFullName(serviceName), ServerCalls.UnaryCall(method, handler));
+ callHandlers.Add(method.FullName, ServerCalls.UnaryCall(method, handler));
return this;
}
@@ -89,7 +89,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ClientStreamingCall(method, handler));
+ callHandlers.Add(method.FullName, ServerCalls.ClientStreamingCall(method, handler));
return this;
}
@@ -99,7 +99,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- callHandlers.Add(method.GetFullName(serviceName), ServerCalls.ServerStreamingCall(method, handler));
+ callHandlers.Add(method.FullName, ServerCalls.ServerStreamingCall(method, handler));
return this;
}
@@ -109,7 +109,7 @@ namespace Grpc.Core
where TRequest : class
where TResponse : class
{
- callHandlers.Add(method.GetFullName(serviceName), ServerCalls.DuplexStreamingCall(method, handler));
+ callHandlers.Add(method.FullName, ServerCalls.DuplexStreamingCall(method, handler));
return this;
}
diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs
index 754f6cb3ca..6bd8dc820b 100644
--- a/src/csharp/Grpc.Core/Status.cs
+++ b/src/csharp/Grpc.Core/Status.cs
@@ -29,13 +29,12 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
-using System;
-using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
- /// Represents RPC result.
+ /// Represents RPC result, which consists of <see cref="StatusCode"/> and an optional detail string.
/// </summary>
public struct Status
{
@@ -52,6 +51,11 @@ namespace Grpc.Core
readonly StatusCode statusCode;
readonly string detail;
+ /// <summary>
+ /// Creates a new instance of <c>Status</c>.
+ /// </summary>
+ /// <param name="statusCode">Status code.</param>
+ /// <param name="detail">Detail.</param>
public Status(StatusCode statusCode, string detail)
{
this.statusCode = statusCode;
@@ -80,6 +84,9 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Status"/>.
+ /// </summary>
public override string ToString()
{
return string.Format("Status(StatusCode={0}, Detail=\"{1}\")", statusCode, detail);
diff --git a/src/csharp/Grpc.Core/StatusCode.cs b/src/csharp/Grpc.Core/StatusCode.cs
index a9696fa469..90606955af 100644
--- a/src/csharp/Grpc.Core/StatusCode.cs
+++ b/src/csharp/Grpc.Core/StatusCode.cs
@@ -31,8 +31,6 @@
#endregion
-using System;
-
namespace Grpc.Core
{
/// <summary>
@@ -41,101 +39,101 @@ namespace Grpc.Core
/// </summary>
public enum StatusCode
{
- /* Not an error; returned on success */
+ /// <summary>Not an error; returned on success.</summary>
OK = 0,
- /* The operation was cancelled (typically by the caller). */
+
+ /// <summary>The operation was cancelled (typically by the caller).</summary>
Cancelled = 1,
- /* Unknown error. An example of where this error may be returned is
- if a Status value received from another address space belongs to
- an error-space that is not known in this address space. Also
- errors raised by APIs that do not return enough error information
- may be converted to this error. */
+
+ /// <summary>
+ /// Unknown error. An example of where this error may be returned is
+ /// if a Status value received from another address space belongs to
+ /// an error-space that is not known in this address space. Also
+ /// errors raised by APIs that do not return enough error information
+ /// may be converted to this error.
+ /// </summary>
Unknown = 2,
- /* Client specified an invalid argument. Note that this differs
- from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
- that are problematic regardless of the state of the system
- (e.g., a malformed file name). */
+
+ /// <summary>
+ /// Client specified an invalid argument. Note that this differs
+ /// from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
+ /// that are problematic regardless of the state of the system
+ /// (e.g., a malformed file name).
+ /// </summary>
InvalidArgument = 3,
- /* Deadline expired before operation could complete. For operations
- that change the state of the system, this error may be returned
- even if the operation has completed successfully. For example, a
- successful response from a server could have been delayed long
- enough for the deadline to expire. */
+
+ /// <summary>
+ /// Deadline expired before operation could complete. For operations
+ /// that change the state of the system, this error may be returned
+ /// even if the operation has completed successfully. For example, a
+ /// successful response from a server could have been delayed long
+ /// enough for the deadline to expire.
+ /// </summary>
DeadlineExceeded = 4,
- /* Some requested entity (e.g., file or directory) was not found. */
+
+ /// <summary>Some requested entity (e.g., file or directory) was not found.</summary>
NotFound = 5,
- /* Some entity that we attempted to create (e.g., file or directory)
- already exists. */
+
+ /// <summary>Some entity that we attempted to create (e.g., file or directory) already exists.</summary>
AlreadyExists = 6,
- /* The caller does not have permission to execute the specified
- operation. PERMISSION_DENIED must not be used for rejections
- caused by exhausting some resource (use RESOURCE_EXHAUSTED
- instead for those errors). PERMISSION_DENIED must not be
- used if the caller can not be identified (use UNAUTHENTICATED
- instead for those errors). */
+
+ /// <summary>
+ /// The caller does not have permission to execute the specified
+ /// operation. PERMISSION_DENIED must not be used for rejections
+ /// caused by exhausting some resource (use RESOURCE_EXHAUSTED
+ /// instead for those errors). PERMISSION_DENIED must not be
+ /// used if the caller can not be identified (use UNAUTHENTICATED
+ /// instead for those errors).
+ /// </summary>
PermissionDenied = 7,
- /* The request does not have valid authentication credentials for the
- operation. */
+
+ /// <summary>The request does not have valid authentication credentials for the operation.</summary>
Unauthenticated = 16,
- /* Some resource has been exhausted, perhaps a per-user quota, or
- perhaps the entire file system is out of space. */
+
+ /// <summary>
+ /// Some resource has been exhausted, perhaps a per-user quota, or
+ /// perhaps the entire file system is out of space.
+ /// </summary>
ResourceExhausted = 8,
- /* Operation was rejected because the system is not in a state
- required for the operation's execution. For example, directory
- to be deleted may be non-empty, an rmdir operation is applied to
- a non-directory, etc.
-
- A litmus test that may help a service implementor in deciding
- between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE:
- (a) Use UNAVAILABLE if the client can retry just the failing call.
- (b) Use ABORTED if the client should retry at a higher-level
- (e.g., restarting a read-modify-write sequence).
- (c) Use FAILED_PRECONDITION if the client should not retry until
- the system state has been explicitly fixed. E.g., if an "rmdir"
- fails because the directory is non-empty, FAILED_PRECONDITION
- should be returned since the client should not retry unless
- they have first fixed up the directory by deleting files from it.
- (d) Use FAILED_PRECONDITION if the client performs conditional
- REST Get/Update/Delete on a resource and the resource on the
- server does not match the condition. E.g., conflicting
- read-modify-write on the same resource. */
+
+ /// <summary>
+ /// Operation was rejected because the system is not in a state
+ /// required for the operation's execution. For example, directory
+ /// to be deleted may be non-empty, an rmdir operation is applied to
+ /// a non-directory, etc.
+ /// </summary>
FailedPrecondition = 9,
- /* The operation was aborted, typically due to a concurrency issue
- like sequencer check failures, transaction aborts, etc.
- See litmus test above for deciding between FAILED_PRECONDITION,
- ABORTED, and UNAVAILABLE. */
+ /// <summary>
+ /// The operation was aborted, typically due to a concurrency issue
+ /// like sequencer check failures, transaction aborts, etc.
+ /// </summary>
Aborted = 10,
- /* Operation was attempted past the valid range. E.g., seeking or
- reading past end of file.
-
- Unlike INVALID_ARGUMENT, this error indicates a problem that may
- be fixed if the system state changes. For example, a 32-bit file
- system will generate INVALID_ARGUMENT if asked to read at an
- offset that is not in the range [0,2^32-1], but it will generate
- OUT_OF_RANGE if asked to read from an offset past the current
- file size.
-
- There is a fair bit of overlap between FAILED_PRECONDITION and
- OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific
- error) when it applies so that callers who are iterating through
- a space can easily look for an OUT_OF_RANGE error to detect when
- they are done. */
+
+ /// <summary>
+ /// Operation was attempted past the valid range. E.g., seeking or
+ /// reading past end of file.
+ /// </summary>
OutOfRange = 11,
- /* Operation is not implemented or not supported/enabled in this service. */
+
+ /// <summary>Operation is not implemented or not supported/enabled in this service.</summary>
Unimplemented = 12,
- /* Internal errors. Means some invariants expected by underlying
- system has been broken. If you see one of these errors,
- something is very broken. */
+
+ /// <summary>
+ /// Internal errors. Means some invariants expected by underlying
+ /// system has been broken. If you see one of these errors,
+ /// something is very broken.
+ /// </summary>
Internal = 13,
- /* The service is currently unavailable. This is a most likely a
- transient condition and may be corrected by retrying with
- a backoff.
- See litmus test above for deciding between FAILED_PRECONDITION,
- ABORTED, and UNAVAILABLE. */
+ /// <summary>
+ /// The service is currently unavailable. This is a most likely a
+ /// transient condition and may be corrected by retrying with
+ /// a backoff.
+ /// </summary>
Unavailable = 14,
- /* Unrecoverable data loss or corruption. */
+
+ /// <summary>Unrecoverable data loss or corruption.</summary>
DataLoss = 15
}
}
diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
index 8a748b45a8..cdf1e51026 100644
--- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
+++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading.Tasks;
namespace Grpc.Core.Utils
@@ -46,7 +45,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Reads the entire stream and executes an async action for each element.
/// </summary>
- public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
+ public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
while (await streamReader.MoveNext())
@@ -58,7 +57,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Reads the entire stream and creates a list containing all the elements read.
/// </summary>
- public static async Task<List<T>> ToList<T>(this IAsyncStreamReader<T> streamReader)
+ public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader)
where T : class
{
var result = new List<T>();
@@ -73,7 +72,7 @@ namespace Grpc.Core.Utils
/// Writes all elements from given enumerable to the stream.
/// Completes the stream afterwards unless close = false.
/// </summary>
- public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
+ public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
@@ -89,7 +88,7 @@ namespace Grpc.Core.Utils
/// <summary>
/// Writes all elements from given enumerable to the stream.
/// </summary>
- public static async Task WriteAll<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
+ public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
where T : class
{
foreach (var element in elements)
diff --git a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
index 82653c3a1f..eb3a5b16e3 100644
--- a/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
+++ b/src/csharp/Grpc.Core/Utils/BenchmarkUtil.cs
@@ -39,6 +39,9 @@ using System.Threading.Tasks;
namespace Grpc.Core.Utils
{
+ /// <summary>
+ /// Utility methods to run microbenchmarks.
+ /// </summary>
public static class BenchmarkUtil
{
/// <summary>
diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs
index aeb5d210a7..374262f87a 100644
--- a/src/csharp/Grpc.Core/Utils/Preconditions.cs
+++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs
@@ -32,17 +32,16 @@
#endregion
using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Threading.Tasks;
namespace Grpc.Core.Utils
{
+ /// <summary>
+ /// Utility methods to simplify checking preconditions in the code.
+ /// </summary>
public static class Preconditions
{
/// <summary>
- /// Throws ArgumentException if condition is false.
+ /// Throws <see cref="ArgumentException"/> if condition is false.
/// </summary>
public static void CheckArgument(bool condition)
{
@@ -53,7 +52,7 @@ namespace Grpc.Core.Utils
}
/// <summary>
- /// Throws ArgumentException with given message if condition is false.
+ /// Throws <see cref="ArgumentException"/> with given message if condition is false.
/// </summary>
public static void CheckArgument(bool condition, string errorMessage)
{
@@ -64,31 +63,31 @@ namespace Grpc.Core.Utils
}
/// <summary>
- /// Throws NullReferenceException if reference is null.
+ /// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
public static T CheckNotNull<T>(T reference)
{
if (reference == null)
{
- throw new NullReferenceException();
+ throw new ArgumentNullException();
}
return reference;
}
/// <summary>
- /// Throws NullReferenceException with given message if reference is null.
+ /// Throws <see cref="ArgumentNullException"/> if reference is null.
/// </summary>
- public static T CheckNotNull<T>(T reference, string errorMessage)
+ public static T CheckNotNull<T>(T reference, string paramName)
{
if (reference == null)
{
- throw new NullReferenceException(errorMessage);
+ throw new ArgumentNullException(paramName);
}
return reference;
}
/// <summary>
- /// Throws InvalidOperationException if condition is false.
+ /// Throws <see cref="InvalidOperationException"/> if condition is false.
/// </summary>
public static void CheckState(bool condition)
{
@@ -99,7 +98,7 @@ namespace Grpc.Core.Utils
}
/// <summary>
- /// Throws InvalidOperationException with given message if condition is false.
+ /// Throws <see cref="InvalidOperationException"/> with given message if condition is false.
/// </summary>
public static void CheckState(bool condition, string errorMessage)
{
diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs
index b5cb652945..d02b301cac 100644
--- a/src/csharp/Grpc.Core/Version.cs
+++ b/src/csharp/Grpc.Core/Version.cs
@@ -1,5 +1,37 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
using System.Reflection;
-using System.Runtime.CompilerServices;
// The current version of gRPC C#.
-[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")]
+[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")]
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 656a3d47bb..b6dbd3b49c 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -1,13 +1,46 @@
-using System.Reflection;
-using System.Runtime.CompilerServices;
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
namespace Grpc.Core
{
+ /// <summary>
+ /// Provides info about current version of gRPC.
+ /// </summary>
public static class VersionInfo
{
/// <summary>
/// Current version of gRPC
/// </summary>
- public const string CurrentVersion = "0.6.0";
+ public const string CurrentVersion = "0.6.1";
}
}
diff --git a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs b/src/csharp/Grpc.Core/WriteOptions.cs
index c4d6bee058..7ef3189d76 100644
--- a/src/csharp/Grpc.Core/Utils/ExceptionHelper.cs
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -33,25 +33,50 @@
using System;
-namespace Grpc.Core.Utils
+namespace Grpc.Core
{
- public static class ExceptionHelper
+ /// <summary>
+ /// Flags for write operations.
+ /// </summary>
+ [Flags]
+ public enum WriteFlags
{
/// <summary>
- /// If inner exceptions contain RpcException, rethrows it.
- /// Otherwise, rethrows the original aggregate exception.
- /// Always throws, the exception return type is here only to make the.
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ /// gRPC is free to buffer the message until the next non-buffered
+ /// write, or until write stream completion, but it need not buffer completely or at all.
/// </summary>
- public static Exception UnwrapRpcException(AggregateException ae)
+ BufferHint = 0x1,
+
+ /// <summary>
+ /// Force compression to be disabled for a particular write.
+ /// </summary>
+ NoCompress = 0x2
+ }
+
+ /// <summary>
+ /// Options for write operations.
+ /// </summary>
+ public class WriteOptions
+ {
+ /// <summary>
+ /// Default write options.
+ /// </summary>
+ public static readonly WriteOptions Default = new WriteOptions();
+
+ private WriteFlags flags;
+
+ public WriteOptions(WriteFlags flags = default(WriteFlags))
+ {
+ this.flags = flags;
+ }
+
+ public WriteFlags Flags
{
- foreach (var e in ae.InnerExceptions)
+ get
{
- if (e is RpcException)
- {
- throw e;
- }
+ return this.flags;
}
- throw ae;
}
}
}