diff options
Diffstat (limited to 'src/csharp/Grpc.Core')
35 files changed, 1240 insertions, 263 deletions
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/CallInvocationDetails.cs index 94c5e26082..eb23a3a209 100644 --- a/src/csharp/Grpc.Core/Call.cs +++ b/src/csharp/Grpc.Core/CallInvocationDetails.cs @@ -38,30 +38,30 @@ using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// Abstraction of a call to be invoked on a client. + /// Details about a client-side call to be invoked. /// </summary> - public class Call<TRequest, TResponse> + public class CallInvocationDetails<TRequest, TResponse> { - readonly string name; + readonly Channel channel; + readonly string method; + readonly string host; readonly Marshaller<TRequest> requestMarshaller; readonly Marshaller<TResponse> responseMarshaller; - readonly Channel channel; - readonly Metadata headers; - readonly DateTime deadline; + readonly CallOptions options; - public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers) - : this(serviceName, method, channel, headers, DateTime.MaxValue) + public CallInvocationDetails(Channel channel, Method<TRequest, TResponse> method, CallOptions options) : + this(channel, method.FullName, null, method.RequestMarshaller, method.ResponseMarshaller, options) { } - public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers, DateTime deadline) + public CallInvocationDetails(Channel channel, string method, string host, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller, CallOptions options) { - this.name = method.GetFullName(serviceName); - this.requestMarshaller = method.RequestMarshaller; - this.responseMarshaller = method.ResponseMarshaller; this.channel = Preconditions.CheckNotNull(channel); - this.headers = Preconditions.CheckNotNull(headers); - this.deadline = deadline; + this.method = Preconditions.CheckNotNull(method); + this.host = host; + this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller); + this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller); + this.options = Preconditions.CheckNotNull(options); } public Channel Channel @@ -72,49 +72,43 @@ namespace Grpc.Core } } - /// <summary> - /// Full methods name including the service name. - /// </summary> - public string Name + public string Method { get { - return name; + return this.method; } } - /// <summary> - /// Headers to send at the beginning of the call. - /// </summary> - public Metadata Headers + public string Host { get { - return headers; + return this.host; } } - public DateTime Deadline + public Marshaller<TRequest> RequestMarshaller { get { - return this.deadline; + return this.requestMarshaller; } } - public Marshaller<TRequest> RequestMarshaller + public Marshaller<TResponse> ResponseMarshaller { get { - return requestMarshaller; + return this.responseMarshaller; } } - - public Marshaller<TResponse> ResponseMarshaller + + public CallOptions Options { get { - return responseMarshaller; + return options; } } } diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs new file mode 100644 index 0000000000..0d82b5a28e --- /dev/null +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -0,0 +1,118 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Options for calls made by client. + /// </summary> + public class CallOptions + { + readonly Metadata headers; + readonly DateTime deadline; + readonly CancellationToken cancellationToken; + readonly WriteOptions writeOptions; + readonly ContextPropagationToken propagationToken; + + /// <summary> + /// Creates a new instance of <c>CallOptions</c>. + /// </summary> + /// <param name="headers">Headers to be sent with the call.</param> + /// <param name="deadline">Deadline for the call to finish. null means no deadline.</param> + /// <param name="cancellationToken">Can be used to request cancellation of the call.</param> + /// <param name="writeOptions">Write options that will be used for this call.</param> + /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param> + public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null, + WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null) + { + // TODO(jtattermusch): consider only creating metadata object once it's really needed. + this.headers = headers ?? new Metadata(); + this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue); + this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None); + this.writeOptions = writeOptions; + this.propagationToken = propagationToken; + } + + /// <summary> + /// Headers to send at the beginning of the call. + /// </summary> + public Metadata Headers + { + get { return headers; } + } + + /// <summary> + /// Call deadline. + /// </summary> + public DateTime Deadline + { + get { return deadline; } + } + + /// <summary> + /// Token that can be used for cancelling the call. + /// </summary> + public CancellationToken CancellationToken + { + get { return cancellationToken; } + } + + /// <summary> + /// Write options that will be used for this call. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + } + + /// <summary> + /// Token for propagating parent call context. + /// </summary> + public ContextPropagationToken PropagationToken + { + get + { + return this.propagationToken; + } + } + } +} diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 054fc27491..00a8cabf82 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -43,70 +43,52 @@ namespace Grpc.Core /// </summary> public static class Calls { - public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static TResponse BlockingUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - // TODO(jtattermusch): this gives a race that cancellation can be requested before the call even starts. - RegisterCancellationCallback(asyncCall, token); - return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers, call.Deadline); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); + return asyncCall.UnaryCall(req); } - public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); + var asyncResult = asyncCall.UnaryCallAsync(req); return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call, TRequest req) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - asyncCall.StartServerStreamingCall(req, call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); + asyncCall.StartServerStreamingCall(req); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) + public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); + var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) + public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvocationDetails<TRequest, TResponse> call) where TRequest : class where TResponse : class { - var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer); - asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name, Timespec.FromDateTime(call.Deadline)); - asyncCall.StartDuplexStreamingCall(call.Headers, call.Deadline); - RegisterCancellationCallback(asyncCall, token); + var asyncCall = new AsyncCall<TRequest, TResponse>(call); + asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } - - private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) - { - if (token.CanBeCanceled) - { - token.Register(() => asyncCall.Cancel()); - } - } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 18e6f2fda5..9273ea4582 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -37,6 +37,8 @@ using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -45,21 +47,23 @@ namespace Grpc.Core /// </summary> public class Channel : IDisposable { + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); + readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly List<ChannelOption> options; - readonly string target; bool disposed; /// <summary> /// Creates a channel that connects to a specific host. - /// Port will default to 80 for an unsecure channel and to 443 a secure channel. + /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. /// </summary> - /// <param name="host">The DNS name of IP address of the host.</param> + /// <param name="host">The name or IP address of the host.</param> /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> public Channel(string host, Credentials credentials, IEnumerable<ChannelOption> options = null) { + Preconditions.CheckNotNull(host); this.environment = GrpcEnvironment.GetInstance(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); @@ -76,14 +80,13 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.CreateInsecure(host, nativeChannelArgs); } } - this.target = GetOverridenTarget(host, this.options); } /// <summary> /// Creates a channel that connects to a specific host and port. /// </summary> - /// <param name="host">DNS name or IP address</param> - /// <param name="port">the port</param> + /// <param name="host">The name or IP address of the host.</param> + /// <param name="port">The port.</param> /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> public Channel(string host, int port, Credentials credentials, IEnumerable<ChannelOption> options = null) : @@ -91,41 +94,87 @@ namespace Grpc.Core { } - public void Dispose() + /// <summary> + /// Gets current connectivity state of this channel. + /// </summary> + public ChannelState State { - Dispose(true); - GC.SuppressFinalize(this); + get + { + return handle.CheckConnectivityState(false); + } } - internal string Target + /// <summary> + /// Returned tasks completes once channel state has become different from + /// given lastObservedState. + /// If deadline is reached or and error occurs, returned task is cancelled. + /// </summary> + public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) { - get + Preconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure, + "FatalFailure is a terminal state. No further state changes can occur."); + var tcs = new TaskCompletionSource<object>(); + var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; + var handler = new BatchCompletionDelegate((success, ctx) => { - return target; - } + if (success) + { + tcs.SetResult(null); + } + else + { + tcs.SetCanceled(); + } + }); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler); + return tcs.Task; } - internal ChannelSafeHandle Handle + /// <summary> Address of the remote endpoint in URI format.</summary> + public string Target { get { - return this.handle; + return handle.GetTarget(); } } - internal CompletionQueueSafeHandle CompletionQueue + /// <summary> + /// Allows explicitly requesting channel to connect without starting an RPC. + /// Returned task completes once state Ready was seen. If the deadline is reached, + /// or channel enters the FatalFailure state, the task is cancelled. + /// There is no need to call this explicitly unless your use case requires that. + /// Starting an RPC on a new channel will request connection implicitly. + /// </summary> + public async Task ConnectAsync(DateTime? deadline = null) { - get + var currentState = handle.CheckConnectivityState(true); + while (currentState != ChannelState.Ready) { - return this.environment.CompletionQueue; + if (currentState == ChannelState.FatalFailure) + { + throw new OperationCanceledException("Channel has reached FatalFailure state."); + } + await WaitForStateChangedAsync(currentState, deadline); + currentState = handle.CheckConnectivityState(false); } } - internal CompletionRegistry CompletionRegistry + /// <summary> + /// Destroys the underlying channel. + /// </summary> + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + internal ChannelSafeHandle Handle { get { - return this.environment.CompletionRegistry; + return this.handle; } } @@ -159,26 +208,5 @@ namespace Grpc.Core // TODO(jtattermusch): it would be useful to also provide .NET/mono version. return string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion); } - - /// <summary> - /// Look for SslTargetNameOverride option and return its value instead of originalTarget - /// if found. - /// </summary> - private static string GetOverridenTarget(string originalTarget, IEnumerable<ChannelOption> options) - { - if (options == null) - { - return originalTarget; - } - foreach (var option in options) - { - if (option.Type == ChannelOption.OptionType.String - && option.Name == ChannelOptions.SslTargetNameOverride) - { - return option.StringValue; - } - } - return originalTarget; - } } } diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs index 9fe03d2805..1e0f90287a 100644 --- a/src/csharp/Grpc.Core/ChannelOptions.cs +++ b/src/csharp/Grpc.Core/ChannelOptions.cs @@ -30,7 +30,6 @@ #endregion using System; using System.Collections.Generic; -using System.Collections.Immutable; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; @@ -135,6 +134,9 @@ namespace Grpc.Core /// <summary>Initial sequence number for http2 transports</summary> public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number"; + /// <summary>Default authority for calls.</summary> + public const string DefaultAuthority = "grpc.default_authority"; + /// <summary>Primary user agent: goes at the start of the user-agent metadata</summary> public const string PrimaryUserAgentString = "grpc.primary_user_agent"; diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs new file mode 100644 index 0000000000..d293b98f75 --- /dev/null +++ b/src/csharp/Grpc.Core/ChannelState.cs @@ -0,0 +1,69 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Connectivity state of a channel. + /// Based on grpc_connectivity_state from grpc/grpc.h + /// </summary> + public enum ChannelState + { + /// <summary> + /// Channel is idle + /// </summary> + Idle, + + /// <summary> + /// Channel is connecting + /// </summary> + Connecting, + + /// <summary> + /// Channel is ready for work + /// </summary> + Ready, + + /// <summary> + /// Channel has seen a failure but expects to recover + /// </summary> + TransientFailure, + + /// <summary> + /// Channel has seen a failure that it cannot recover from + /// </summary> + FatalFailure + } +} diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index fd3473128a..88494bb4ac 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -76,19 +76,17 @@ namespace Grpc.Core /// <summary> /// Creates a new call to given method. /// </summary> - protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method, Metadata metadata, DateTime? deadline) + protected CallInvocationDetails<TRequest, TResponse> CreateCall<TRequest, TResponse>(Method<TRequest, TResponse> method, CallOptions options) where TRequest : class where TResponse : class { var interceptor = HeaderInterceptor; if (interceptor != null) { - metadata = metadata ?? new Metadata(); - interceptor(metadata); - metadata.Freeze(); + interceptor(options.Headers); + options.Headers.Freeze(); } - return new Call<TRequest, TResponse>(serviceName, method, channel, - metadata ?? Metadata.Empty, deadline ?? DateTime.MaxValue); + return new CallInvocationDetails<TRequest, TResponse>(channel, method, options); } } } diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs new file mode 100644 index 0000000000..399652b85e --- /dev/null +++ b/src/csharp/Grpc.Core/CompressionLevel.cs @@ -0,0 +1,63 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Compression level based on grpc_compression_level from grpc/compression.h + /// </summary> + public enum CompressionLevel + { + /// <summary> + /// No compression. + /// </summary> + None = 0, + + /// <summary> + /// Low compression. + /// </summary> + Low, + + /// <summary> + /// Medium compression. + /// </summary> + Medium, + + /// <summary> + /// High compression. + /// </summary> + High, + } +} diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs new file mode 100644 index 0000000000..b6ea5115a4 --- /dev/null +++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs @@ -0,0 +1,139 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Token for propagating context of server side handlers to child calls. + /// In situations when a backend is making calls to another backend, + /// it makes sense to propagate properties like deadline and cancellation + /// token of the server call to the child call. + /// C core provides some other contexts (like tracing context) that + /// are not accessible to C# layer, but this token still allows propagating them. + /// </summary> + public class ContextPropagationToken + { + /// <summary> + /// Default propagation mask used by C core. + /// </summary> + const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff; + + /// <summary> + /// Default propagation mask used by C# - we want to propagate deadline + /// and cancellation token by our own means. + /// </summary> + internal const ContextPropagationFlags DefaultMask = DefaultCoreMask + & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation; + + readonly CallSafeHandle parentCall; + readonly DateTime deadline; + readonly CancellationToken cancellationToken; + readonly ContextPropagationOptions options; + + internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options) + { + this.parentCall = Preconditions.CheckNotNull(parentCall); + this.deadline = deadline; + this.cancellationToken = cancellationToken; + this.options = options ?? ContextPropagationOptions.Default; + } + + internal CallSafeHandle ParentCall + { + get + { + return this.parentCall; + } + } + + internal DateTime Deadline + { + get + { + return this.deadline; + } + } + + internal CancellationToken CancellationToken + { + get + { + return this.cancellationToken; + } + } + + internal ContextPropagationOptions Options + { + get + { + return this.options; + } + } + + internal bool IsPropagateDeadline + { + get { return false; } + } + + internal bool IsPropagateCancellation + { + get { return false; } + } + } + + /// <summary> + /// Options for <see cref="ContextPropagationToken"/>. + /// </summary> + public class ContextPropagationOptions + { + public static readonly ContextPropagationOptions Default = new ContextPropagationOptions(); + } + + /// <summary> + /// Context propagation flags from grpc/grpc.h. + /// </summary> + [Flags] + internal enum ContextPropagationFlags + { + Deadline = 1, + CensusStatsContext = 2, + CensusTracingContext = 4, + Cancellation = 8 + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 940a6b8ac0..e535c47f55 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -44,9 +44,6 @@ <Reference Include="System.Interactive.Async"> <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> </Reference> - <Reference Include="System.Collections.Immutable"> - <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> - </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> @@ -55,11 +52,11 @@ <Compile Include="IServerStreamWriter.cs" /> <Compile Include="IAsyncStreamWriter.cs" /> <Compile Include="IAsyncStreamReader.cs" /> + <Compile Include="ServerPort.cs" /> <Compile Include="Version.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="RpcException.cs" /> <Compile Include="Calls.cs" /> - <Compile Include="Call.cs" /> <Compile Include="AsyncClientStreamingCall.cs" /> <Compile Include="GrpcEnvironment.cs" /> <Compile Include="Status.cs" /> @@ -115,6 +112,12 @@ <Compile Include="Logging\ILogger.cs" /> <Compile Include="Logging\ConsoleLogger.cs" /> <Compile Include="Internal\NativeLogRedirector.cs" /> + <Compile Include="ChannelState.cs" /> + <Compile Include="CallInvocationDetails.cs" /> + <Compile Include="CallOptions.cs" /> + <Compile Include="CompressionLevel.cs" /> + <Compile Include="WriteOptions.cs" /> + <Compile Include="ContextPropagationToken.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> @@ -145,7 +148,5 @@ </Target> <Import Project="..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets" Condition="Exists('..\packages\grpc.dependencies.openssl.redist.1.0.2.2\build\portable-net45\grpc.dependencies.openssl.redist.targets')" /> <Import Project="..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets" Condition="Exists('..\packages\grpc.dependencies.zlib.redist.1.2.8.9\build\portable-net45\grpc.dependencies.zlib.redist.targets')" /> - <ItemGroup> - <Folder Include="Logging\" /> - </ItemGroup> + <ItemGroup /> </Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec index 086776f69d..fe49efc7ec 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.nuspec +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -15,7 +15,6 @@ <copyright>Copyright 2015, Google Inc.</copyright> <tags>gRPC RPC Protocol HTTP/2</tags> <dependencies> - <dependency id="System.Collections.Immutable" version="1.1.36" /> <dependency id="Ix-Async" version="1.2.3" /> <dependency id="grpc.native.csharp_ext" version="$GrpcNativeCsharpExtVersion$" /> </dependencies> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 034a66be3c..1bb83c9962 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -53,6 +53,9 @@ namespace Grpc.Core [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_shutdown(); + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_version_string(); // returns not-owned const char* + static object staticLock = new object(); static GrpcEnvironment instance; @@ -164,6 +167,15 @@ namespace Grpc.Core } /// <summary> + /// Gets version of gRPC C core. + /// </summary> + internal static string GetCoreVersionString() + { + var ptr = grpcsharp_version_string(); // the pointer is not owned + return Marshal.PtrToStringAnsi(ptr); + } + + /// <summary> /// Shuts down this environment. /// </summary> private void Close() diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 371fbf27ce..c0a0674e50 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> + public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> { // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 2000210252..4e2acb9c71 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -50,5 +50,13 @@ namespace Grpc.Core /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> Task WriteAsync(T message); + + /// <summary> + /// Write options that will be used for the next write. + /// If null, default options will be used. + /// Once set, this property maintains its value across subsequent + /// writes. + /// <value>The write options.</value> + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index bfcb9366a1..0db9d2a515 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -50,7 +50,7 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); - Channel channel; + readonly CallInvocationDetails<TRequest, TResponse> details; // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -60,26 +60,19 @@ namespace Grpc.Core.Internal bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { - } - - public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline) - { - this.channel = channel; - var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline); - channel.Environment.DebugStats.ActiveClientCalls.Increment(); - InitializeInternal(call); + this.details = callDetails; + this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but // it is reusing fair amount of code in this class, so we are leaving it here. - // TODO: for other calls, you need to call Initialize, this methods calls initialize - // on its own, so there's a usage inconsistency. /// <summary> /// Blocking unary request - unary response call. /// </summary> - public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline) + public TResponse UnaryCall(TRequest msg) { using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { @@ -89,17 +82,19 @@ namespace Grpc.Core.Internal lock (myLock) { - Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline)); + Preconditions.CheckState(!started); started = true; + Initialize(cq); + halfcloseRequested = true; readingDone = true; } - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(payload, ctx, metadataArray); + call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall()); var ev = cq.Pluck(ctx.Handle); bool success = (ev.success != 0); @@ -129,22 +124,24 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - unary response call. /// </summary> - public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline) + public Task<TResponse> UnaryCallAsync(TRequest msg) { lock (myLock) { - Preconditions.CheckNotNull(call); - + Preconditions.CheckState(!started); started = true; + + Initialize(details.Channel.Environment.CompletionQueue); + halfcloseRequested = true; readingDone = true; byte[] payload = UnsafeSerialize(msg); unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(payload, HandleUnaryResponse, metadataArray); + call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall()); } return unaryResponseTcs.Task; } @@ -154,17 +151,19 @@ namespace Grpc.Core.Internal /// Starts a streamed request - unary response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline) + public Task<TResponse> ClientStreamingCallAsync() { lock (myLock) { - Preconditions.CheckNotNull(call); - + Preconditions.CheckState(!started); started = true; + + Initialize(details.Channel.Environment.CompletionQueue); + readingDone = true; unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartClientStreaming(HandleUnaryResponse, metadataArray); } @@ -176,21 +175,23 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - streamed response call. /// </summary> - public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline) + public void StartServerStreamingCall(TRequest msg) { lock (myLock) { - Preconditions.CheckNotNull(call); - + Preconditions.CheckState(!started); started = true; + + Initialize(details.Channel.Environment.CompletionQueue); + halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartServerStreaming(payload, HandleFinished, metadataArray); + call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } } } @@ -199,15 +200,16 @@ namespace Grpc.Core.Internal /// Starts a streaming request - streaming response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public void StartDuplexStreamingCall(Metadata headers, DateTime deadline) + public void StartDuplexStreamingCall() { lock (myLock) { - Preconditions.CheckNotNull(call); - + Preconditions.CheckState(!started); started = true; - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + Initialize(details.Channel.Environment.CompletionQueue); + + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } @@ -218,9 +220,9 @@ namespace Grpc.Core.Internal /// Sends a streaming request. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -277,6 +279,14 @@ namespace Grpc.Core.Internal } } + public CallInvocationDetails<TRequest, TResponse> Details + { + get + { + return this.details; + } + } + /// <summary> /// On client-side, we only fire readCompletionDelegate once all messages have been read /// and status has been received. @@ -309,7 +319,39 @@ namespace Grpc.Core.Internal protected override void OnReleaseResources() { - channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + } + + private void Initialize(CompletionQueueSafeHandle cq) + { + var propagationToken = details.Options.PropagationToken; + var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance; + + var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, + parentCall, ContextPropagationToken.DefaultMask, cq, + details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline)); + details.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); + InitializeInternal(call); + RegisterCancellationCallback(); + } + + // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. + private void RegisterCancellationCallback() + { + var token = details.Options.CancellationToken; + if (token.CanBeCanceled) + { + token.Register(() => this.Cancel()); + } + } + + /// <summary> + /// Gets WriteFlags set in callDetails.Options.WriteOptions + /// </summary> + private WriteFlags GetWriteFlagsForCall() + { + var writeOptions = details.Options.WriteOptions; + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 38f2a5baeb..9fa0baca87 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -71,6 +71,9 @@ namespace Grpc.Core.Internal protected bool halfclosed; protected bool finished; // True if close has been received from the peer. + protected bool initialMetadataSent; + protected long streamingWritesCounter; + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = Preconditions.CheckNotNull(serializer); @@ -123,7 +126,7 @@ namespace Grpc.Core.Internal /// Initiates sending a message. Only one send operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate) + protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { byte[] payload = UnsafeSerialize(msg); @@ -132,8 +135,11 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendMessage(payload, HandleSendFinished); + call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + streamingWritesCounter++; } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 513902ee36..3710a65d6b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -83,9 +83,9 @@ namespace Grpc.Core.Internal /// Sends a streaming response. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -98,6 +98,35 @@ namespace Grpc.Core.Internal } /// <summary> + /// Initiates sending a initial metadata. + /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation + /// to make things simpler. + /// completionDelegate is invoked upon completion. + /// </summary> + public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(headers, "metadata"); + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); + Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); + CheckSendingAllowed(); + + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + { + call.StartSendInitialMetadata(HandleSendFinished, metadataArray); + } + + this.initialMetadataSent = true; + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> /// Sends call result status, also indicating server is done with streaming responses. /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. @@ -111,7 +140,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray); + call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; readingDone = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 714749b171..3cb01e29bd 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -42,6 +42,8 @@ namespace Grpc.Core.Internal /// </summary> internal class CallSafeHandle : SafeHandleZeroIsInvalid { + public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); + const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionRegistry completionRegistry; @@ -53,7 +55,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -62,7 +64,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); + MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, @@ -70,7 +72,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -78,7 +80,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -89,6 +91,10 @@ namespace Grpc.Core.Internal BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] @@ -103,17 +109,17 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } - public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray) + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -124,11 +130,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk(); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) @@ -138,11 +144,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback) + public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk(); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } public void StartSendCloseFromClient(BatchCompletionDelegate callback) @@ -152,11 +158,11 @@ namespace Grpc.Core.Internal grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) @@ -173,6 +179,13 @@ namespace Grpc.Core.Internal grpcsharp_call_start_serverside(this, ctx).CheckOk(); } + public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + } + public void Cancel() { grpcsharp_call_cancel(this).CheckOk(); diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 20815efbd3..7f03bf4ea5 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -47,7 +47,17 @@ namespace Grpc.Core.Internal static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs); [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] + static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_channel_watch_connectivity_state(ChannelSafeHandle channel, ChannelState lastObservedState, + Timespec deadline, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern CStringSafeHandle grpcsharp_channel_get_target(ChannelSafeHandle call); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_channel_destroy(IntPtr channel); @@ -66,13 +76,34 @@ namespace Grpc.Core.Internal return grpcsharp_secure_channel_create(credentials, target, channelArgs); } - public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - var result = grpcsharp_channel_create_call(this, cq, method, host, deadline); + var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); result.SetCompletionRegistry(registry); return result; } + public ChannelState CheckConnectivityState(bool tryToConnect) + { + return grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0); + } + + public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, + CompletionRegistry completionRegistry, BatchCompletionDelegate callback) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); + } + + public string GetTarget() + { + using (var cstring = grpcsharp_channel_get_target(this)) + { + return cstring.GetValue(); + } + } + protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 58f493463b..013f00ff6f 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -40,16 +40,18 @@ namespace Grpc.Core.Internal internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> { readonly AsyncCall<TRequest, TResponse> call; + WriteOptions writeOptions; public ClientRequestStream(AsyncCall<TRequest, TResponse> call) { this.call = call; + this.writeOptions = call.Details.Options.WriteOptions; } public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -59,5 +61,24 @@ namespace Grpc.Core.Internal call.StartSendCloseFromClient(taskSource.CompletionDelegate); return taskSource.Task; } + + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 19f0e3c57f..688f9f6fec 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,7 +75,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -187,7 +187,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { var result = await handler(requestStream, context); @@ -247,7 +247,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { await handler(requestStream, responseStream, context); @@ -304,13 +304,14 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken) + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) + where TRequest : class + where TResponse : class { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); - return new ServerCallContext( - newRpc.Method, newRpc.Host, peer, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken); + return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline, + newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs index 59238a452c..37a4f5256b 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs @@ -42,7 +42,7 @@ namespace Grpc.Core.Internal internal class ServerCredentialsSafeHandle : SafeHandleZeroIsInvalid { [DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)] - static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs); + static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth); [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_server_credentials_release(IntPtr credentials); @@ -51,12 +51,13 @@ namespace Grpc.Core.Internal { } - public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray) + public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, bool forceClientAuth) { Preconditions.CheckArgument(keyCertPairCertChainArray.Length == keyCertPairPrivateKeyArray.Length); return grpcsharp_ssl_server_credentials_create(pemRootCerts, keyCertPairCertChainArray, keyCertPairPrivateKeyArray, - new UIntPtr((ulong)keyCertPairCertChainArray.Length)); + new UIntPtr((ulong)keyCertPairCertChainArray.Length), + forceClientAuth); } protected override bool ReleaseHandle() diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 756dcee87f..03e39efc02 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -38,11 +38,12 @@ namespace Grpc.Core.Internal /// <summary> /// Writes responses asynchronously to an underlying AsyncCallServer object. /// </summary> - internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse> + internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions where TRequest : class where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + WriteOptions writeOptions; public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call) { @@ -52,7 +53,7 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -62,5 +63,31 @@ namespace Grpc.Core.Internal call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + var taskSource = new AsyncCompletionTaskSource<object>(); + call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); + return taskSource.Task; + } + + public WriteOptions WriteOptions + { + get + { + return writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/KeyCertificatePair.cs b/src/csharp/Grpc.Core/KeyCertificatePair.cs index 7cea18618e..5def15a656 100644 --- a/src/csharp/Grpc.Core/KeyCertificatePair.cs +++ b/src/csharp/Grpc.Core/KeyCertificatePair.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Collections.Immutable; using Grpc.Core.Internal; using Grpc.Core.Utils; diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 2f308cbb11..a58dbdbc93 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -32,7 +32,6 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Collections.Immutable; using System.Collections.Specialized; using System.Runtime.InteropServices; using System.Text; @@ -115,6 +114,16 @@ namespace Grpc.Core entries.Add(item); } + public void Add(string key, string value) + { + Add(new Entry(key, value)); + } + + public void Add(string key, byte[] valueBytes) + { + Add(new Entry(key, valueBytes)); + } + public void Clear() { CheckWriteable(); diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs index 77d36191c3..cc047ac9f8 100644 --- a/src/csharp/Grpc.Core/Method.cs +++ b/src/csharp/Grpc.Core/Method.cs @@ -53,16 +53,20 @@ namespace Grpc.Core public class Method<TRequest, TResponse> { readonly MethodType type; + readonly string serviceName; readonly string name; readonly Marshaller<TRequest> requestMarshaller; readonly Marshaller<TResponse> responseMarshaller; + readonly string fullName; - public Method(MethodType type, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller) + public Method(MethodType type, string serviceName, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller) { this.type = type; - this.name = name; - this.requestMarshaller = requestMarshaller; - this.responseMarshaller = responseMarshaller; + this.serviceName = Preconditions.CheckNotNull(serviceName); + this.name = Preconditions.CheckNotNull(name); + this.requestMarshaller = Preconditions.CheckNotNull(requestMarshaller); + this.responseMarshaller = Preconditions.CheckNotNull(responseMarshaller); + this.fullName = GetFullName(serviceName); } public MethodType Type @@ -72,6 +76,14 @@ namespace Grpc.Core return this.type; } } + + public string ServiceName + { + get + { + return this.serviceName; + } + } public string Name { @@ -97,6 +109,14 @@ namespace Grpc.Core } } + public string FullName + { + get + { + return this.fullName; + } + } + /// <summary> /// Gets full name of the method including the service name. /// </summary> diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 3217547cc4..eb5b043d1c 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -32,7 +32,7 @@ #endregion using System; -using System.Collections.Concurrent; +using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Runtime.InteropServices; @@ -48,18 +48,17 @@ namespace Grpc.Core /// </summary> public class Server { - /// <summary> - /// Pass this value as port to have the server choose an unused listening port for you. - /// </summary> - public const int PickUnusedPort = 0; - static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); + readonly ServiceDefinitionCollection serviceDefinitions; + readonly ServerPortCollection ports; readonly GrpcEnvironment environment; readonly List<ChannelOption> options; readonly ServerSafeHandle handle; readonly object myLock = new object(); + readonly List<ServerServiceDefinition> serviceDefinitionsList = new List<ServerServiceDefinition>(); + readonly List<ServerPort> serverPortList = new List<ServerPort>(); readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); @@ -72,6 +71,8 @@ namespace Grpc.Core /// <param name="options">Channel options.</param> public Server(IEnumerable<ChannelOption> options = null) { + this.serviceDefinitions = new ServiceDefinitionCollection(this); + this.ports = new ServerPortCollection(this); this.environment = GrpcEnvironment.GetInstance(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) @@ -81,47 +82,26 @@ namespace Grpc.Core } /// <summary> - /// Adds a service definition to the server. This is how you register - /// handlers for a service with the server. - /// Only call this before Start(). + /// Services that will be exported by the server once started. Register a service with this + /// server by adding its definition to this collection. /// </summary> - public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) + public ServiceDefinitionCollection Services { - lock (myLock) + get { - Preconditions.CheckState(!startRequested); - foreach (var entry in serviceDefinition.CallHandlers) - { - callHandlers.Add(entry.Key, entry.Value); - } + return serviceDefinitions; } } /// <summary> - /// Add a port on which server should listen. - /// Only call this before Start(). + /// Ports on which the server will listen once started. Register a port with this + /// server by adding its definition to this collection. /// </summary> - /// <returns>The port on which server will be listening.</returns> - /// <param name="host">the host</param> - /// <param name="port">the port. If zero, an unused port is chosen automatically.</param> - public int AddPort(string host, int port, ServerCredentials credentials) + public ServerPortCollection Ports { - lock (myLock) + get { - Preconditions.CheckNotNull(credentials); - Preconditions.CheckState(!startRequested); - var address = string.Format("{0}:{1}", host, port); - using (var nativeCredentials = credentials.ToNativeCredentials()) - { - if (nativeCredentials != null) - { - return handle.AddSecurePort(address, nativeCredentials); - } - else - { - return handle.AddInsecurePort(address); - } - } + return ports; } } @@ -190,6 +170,50 @@ namespace Grpc.Core } /// <summary> + /// Adds a service definition. + /// </summary> + private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) + { + lock (myLock) + { + Preconditions.CheckState(!startRequested); + foreach (var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } + serviceDefinitionsList.Add(serviceDefinition); + } + } + + /// <summary> + /// Adds a listening port. + /// </summary> + private int AddPortInternal(ServerPort serverPort) + { + lock (myLock) + { + Preconditions.CheckNotNull(serverPort.Credentials); + Preconditions.CheckState(!startRequested); + var address = string.Format("{0}:{1}", serverPort.Host, serverPort.Port); + int boundPort; + using (var nativeCredentials = serverPort.Credentials.ToNativeCredentials()) + { + if (nativeCredentials != null) + { + boundPort = handle.AddSecurePort(address, nativeCredentials); + } + else + { + boundPort = handle.AddInsecurePort(address); + } + } + var newServerPort = new ServerPort(serverPort, boundPort); + this.serverPortList.Add(newServerPort); + return boundPort; + } + } + + /// <summary> /// Allows one new RPC call to be received by server. /// </summary> private void AllowOneRpc() @@ -249,5 +273,82 @@ namespace Grpc.Core { shutdownTcs.SetResult(null); } + + /// <summary> + /// Collection of service definitions. + /// </summary> + public class ServiceDefinitionCollection : IEnumerable<ServerServiceDefinition> + { + readonly Server server; + + internal ServiceDefinitionCollection(Server server) + { + this.server = server; + } + + /// <summary> + /// Adds a service definition to the server. This is how you register + /// handlers for a service with the server. Only call this before Start(). + /// </summary> + public void Add(ServerServiceDefinition serviceDefinition) + { + server.AddServiceDefinitionInternal(serviceDefinition); + } + + public IEnumerator<ServerServiceDefinition> GetEnumerator() + { + return server.serviceDefinitionsList.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return server.serviceDefinitionsList.GetEnumerator(); + } + } + + /// <summary> + /// Collection of server ports. + /// </summary> + public class ServerPortCollection : IEnumerable<ServerPort> + { + readonly Server server; + + internal ServerPortCollection(Server server) + { + this.server = server; + } + + /// <summary> + /// Adds a new port on which server should listen. + /// Only call this before Start(). + /// <returns>The port on which server will be listening.</returns> + /// </summary> + public int Add(ServerPort serverPort) + { + return server.AddPortInternal(serverPort); + } + + /// <summary> + /// Adds a new port on which server should listen. + /// <returns>The port on which server will be listening.</returns> + /// </summary> + /// <param name="host">the host</param> + /// <param name="port">the port. If zero, an unused port is chosen automatically.</param> + /// <param name="credentials">credentials to use to secure this port.</param> + public int Add(string host, int port, ServerCredentials credentials) + { + return Add(new ServerPort(host, port, credentials)); + } + + public IEnumerator<ServerPort> GetEnumerator() + { + return server.serverPortList.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return server.serverPortList.GetEnumerator(); + } + } } } diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 0c48adaea5..75d81c64f3 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -36,15 +36,16 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Internal; + namespace Grpc.Core { /// <summary> /// Context for a server-side call. /// </summary> - public sealed class ServerCallContext + public class ServerCallContext { - // TODO(jtattermusch): expose method to send initial metadata back to client - + private readonly CallSafeHandle callHandle; private readonly string method; private readonly string host; private readonly string peer; @@ -54,18 +55,37 @@ namespace Grpc.Core private readonly Metadata responseTrailers = new Metadata(); private Status status = Status.DefaultSuccess; + private Func<Metadata, Task> writeHeadersFunc; + private IHasWriteOptions writeOptionsHolder; - public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken) + internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder) { + this.callHandle = callHandle; this.method = method; this.host = host; this.peer = peer; this.deadline = deadline; this.requestHeaders = requestHeaders; this.cancellationToken = cancellationToken; + this.writeHeadersFunc = writeHeadersFunc; + this.writeOptionsHolder = writeOptionsHolder; + } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } + + /// <summary> + /// Creates a propagation token to be used to propagate call context to a child call. + /// </summary> + public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) + { + return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); } - /// <summary> Name of method called in this RPC. </summary> + /// <summary>Name of method called in this RPC.</summary> public string Method { get @@ -74,7 +94,7 @@ namespace Grpc.Core } } - /// <summary> Name of host called in this RPC. </summary> + /// <summary>Name of host called in this RPC.</summary> public string Host { get @@ -83,7 +103,7 @@ namespace Grpc.Core } } - /// <summary> Address of the remote endpoint in URI format. </summary> + /// <summary>Address of the remote endpoint in URI format.</summary> public string Peer { get @@ -92,7 +112,7 @@ namespace Grpc.Core } } - /// <summary> Deadline for this RPC. </summary> + /// <summary>Deadline for this RPC.</summary> public DateTime Deadline { get @@ -101,7 +121,7 @@ namespace Grpc.Core } } - /// <summary> Initial metadata sent by client. </summary> + /// <summary>Initial metadata sent by client.</summary> public Metadata RequestHeaders { get @@ -110,8 +130,7 @@ namespace Grpc.Core } } - // TODO(jtattermusch): support signalling cancellation. - /// <summary> Cancellation token signals when call is cancelled. </summary> + /// <summary>Cancellation token signals when call is cancelled.</summary> public CancellationToken CancellationToken { get @@ -120,7 +139,7 @@ namespace Grpc.Core } } - /// <summary> Trailers to send back to client after RPC finishes.</summary> + /// <summary>Trailers to send back to client after RPC finishes.</summary> public Metadata ResponseTrailers { get @@ -142,5 +161,31 @@ namespace Grpc.Core status = value; } } + + /// <summary> + /// Allows setting write options for the following write. + /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience. + /// Both properties are backed by the same underlying value. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return writeOptionsHolder.WriteOptions; + } + + set + { + writeOptionsHolder.WriteOptions = value; + } + } + } + + /// <summary> + /// Allows sharing write options between ServerCallContext and other objects. + /// </summary> + public interface IHasWriteOptions + { + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/ServerCredentials.cs b/src/csharp/Grpc.Core/ServerCredentials.cs index 32ed4b78a1..c11a1ede08 100644 --- a/src/csharp/Grpc.Core/ServerCredentials.cs +++ b/src/csharp/Grpc.Core/ServerCredentials.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Collections.Immutable; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -80,18 +79,26 @@ namespace Grpc.Core { readonly IList<KeyCertificatePair> keyCertificatePairs; readonly string rootCertificates; + readonly bool forceClientAuth; /// <summary> /// Creates server-side SSL credentials. /// </summary> - /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param> /// <param name="keyCertificatePairs">Key-certificates to use.</param> - public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates) + /// <param name="rootCertificates">PEM encoded client root certificates used to authenticate client.</param> + /// <param name="forceClientAuth">If true, client will be rejected unless it proves its unthenticity using against rootCertificates.</param> + public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs, string rootCertificates, bool forceClientAuth) { this.keyCertificatePairs = new List<KeyCertificatePair>(keyCertificatePairs).AsReadOnly(); Preconditions.CheckArgument(this.keyCertificatePairs.Count > 0, "At least one KeyCertificatePair needs to be provided"); + if (forceClientAuth) + { + Preconditions.CheckNotNull(rootCertificates, + "Cannot force client authentication unless you provide rootCertificates."); + } this.rootCertificates = rootCertificates; + this.forceClientAuth = forceClientAuth; } /// <summary> @@ -100,7 +107,7 @@ namespace Grpc.Core /// using client root certificates. /// </summary> /// <param name="keyCertificatePairs">Key-certificates to use.</param> - public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null) + public SslServerCredentials(IEnumerable<KeyCertificatePair> keyCertificatePairs) : this(keyCertificatePairs, null, false) { } @@ -126,6 +133,17 @@ namespace Grpc.Core } } + /// <summary> + /// If true, the authenticity of client check will be enforced. + /// </summary> + public bool ForceClientAuthentication + { + get + { + return this.forceClientAuth; + } + } + internal override ServerCredentialsSafeHandle ToNativeCredentials() { int count = keyCertificatePairs.Count; @@ -136,7 +154,7 @@ namespace Grpc.Core certChains[i] = keyCertificatePairs[i].CertificateChain; keys[i] = keyCertificatePairs[i].PrivateKey; } - return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys); + return ServerCredentialsSafeHandle.CreateSslCredentials(rootCertificates, certChains, keys, forceClientAuth); } } } diff --git a/src/csharp/Grpc.Core/ServerPort.cs b/src/csharp/Grpc.Core/ServerPort.cs new file mode 100644 index 0000000000..55e4bd0062 --- /dev/null +++ b/src/csharp/Grpc.Core/ServerPort.cs @@ -0,0 +1,120 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// A port exposed by a server. + /// </summary> + public class ServerPort + { + /// <summary> + /// Pass this value as port to have the server choose an unused listening port for you. + /// Ports added to a server will contain the bound port in their <see cref="BoundPort"/> property. + /// </summary> + public const int PickUnused = 0; + + readonly string host; + readonly int port; + readonly ServerCredentials credentials; + readonly int boundPort; + + /// <summary> + /// Creates a new port on which server should listen. + /// </summary> + /// <returns>The port on which server will be listening.</returns> + /// <param name="host">the host</param> + /// <param name="port">the port. If zero, an unused port is chosen automatically.</param> + /// <param name="credentials">credentials to use to secure this port.</param> + public ServerPort(string host, int port, ServerCredentials credentials) + { + this.host = Preconditions.CheckNotNull(host); + this.port = port; + this.credentials = Preconditions.CheckNotNull(credentials); + } + + /// <summary> + /// Creates a port from an existing <c>ServerPort</c> instance and boundPort value. + /// </summary> + internal ServerPort(ServerPort serverPort, int boundPort) + { + this.host = serverPort.host; + this.port = serverPort.port; + this.credentials = serverPort.credentials; + this.boundPort = boundPort; + } + + /// <value>The host.</value> + public string Host + { + get + { + return host; + } + } + + /// <value>The port.</value> + public int Port + { + get + { + return port; + } + } + + /// <value>The server credentials.</value> + public ServerCredentials Credentials + { + get + { + return credentials; + } + } + + /// <value> + /// The port actually bound by the server. This is useful if you let server + /// pick port automatically. <see cref="PickUnused"/> + /// </value> + public int BoundPort + { + get + { + return boundPort; + } + } + } +} diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index b180186c12..a00d156e52 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -33,7 +33,7 @@ using System; using System.Collections.Generic; -using System.Collections.Immutable; +using System.Collections.ObjectModel; using Grpc.Core.Internal; namespace Grpc.Core @@ -43,14 +43,14 @@ namespace Grpc.Core /// </summary> public class ServerServiceDefinition { - readonly ImmutableDictionary<string, IServerCallHandler> callHandlers; + readonly ReadOnlyDictionary<string, IServerCallHandler> callHandlers; - private ServerServiceDefinition(ImmutableDictionary<string, IServerCallHandler> callHandlers) + private ServerServiceDefinition(Dictionary<string, IServerCallHandler> callHandlers) { - this.callHandlers = callHandlers; + this.callHandlers = new ReadOnlyDictionary<string, IServerCallHandler>(callHandlers); } - internal ImmutableDictionary<string, IServerCallHandler> CallHandlers + internal IDictionary<string, IServerCallHandler> CallHandlers { get { @@ -115,7 +115,7 @@ namespace Grpc.Core public ServerServiceDefinition Build() { - return new ServerServiceDefinition(callHandlers.ToImmutableDictionary()); + return new ServerServiceDefinition(callHandlers); } } } diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs index b5cb652945..d2a029fbb4 100644 --- a/src/csharp/Grpc.Core/Version.cs +++ b/src/csharp/Grpc.Core/Version.cs @@ -2,4 +2,4 @@ using System.Reflection; using System.Runtime.CompilerServices; // The current version of gRPC C#. -[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")] +[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")] diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 656a3d47bb..939372e237 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -8,6 +8,6 @@ namespace Grpc.Core /// <summary> /// Current version of gRPC /// </summary> - public const string CurrentVersion = "0.6.0"; + public const string CurrentVersion = "0.6.1"; } } diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs new file mode 100644 index 0000000000..7ef3189d76 --- /dev/null +++ b/src/csharp/Grpc.Core/WriteOptions.cs @@ -0,0 +1,82 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Flags for write operations. + /// </summary> + [Flags] + public enum WriteFlags + { + /// <summary> + /// Hint that the write may be buffered and need not go out on the wire immediately. + /// gRPC is free to buffer the message until the next non-buffered + /// write, or until write stream completion, but it need not buffer completely or at all. + /// </summary> + BufferHint = 0x1, + + /// <summary> + /// Force compression to be disabled for a particular write. + /// </summary> + NoCompress = 0x2 + } + + /// <summary> + /// Options for write operations. + /// </summary> + public class WriteOptions + { + /// <summary> + /// Default write options. + /// </summary> + public static readonly WriteOptions Default = new WriteOptions(); + + private WriteFlags flags; + + public WriteOptions(WriteFlags flags = default(WriteFlags)) + { + this.flags = flags; + } + + public WriteFlags Flags + { + get + { + return this.flags; + } + } + } +} diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config index 6cdcdf2656..9b12b9b096 100644 --- a/src/csharp/Grpc.Core/packages.config +++ b/src/csharp/Grpc.Core/packages.config @@ -3,5 +3,4 @@ <package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" /> <package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" /> <package id="Ix-Async" version="1.2.3" targetFramework="net45" /> - <package id="System.Collections.Immutable" version="1.1.36" targetFramework="net45" /> </packages>
\ No newline at end of file |