diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Channel.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Channel.cs | 68 |
1 files changed, 54 insertions, 14 deletions
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 93a6e6a3d9..4f29c35b32 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -31,7 +31,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,6 +55,7 @@ namespace Grpc.Core readonly string target; readonly GrpcEnvironment environment; + readonly CompletionQueueSafeHandle completionQueue; readonly ChannelSafeHandle handle; readonly Dictionary<string, ChannelOption> options; @@ -67,14 +67,26 @@ namespace Grpc.Core /// </summary> /// <param name="target">Target of the channel.</param> /// <param name="credentials">Credentials to secure the channel.</param> + public Channel(string target, ChannelCredentials credentials) : + this(target, credentials, null) + { + } + + /// <summary> + /// Creates a channel that connects to a specific host. + /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. + /// </summary> + /// <param name="target">Target of the channel.</param> + /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> - public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) + public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options) { this.target = GrpcPreconditions.CheckNotNull(target, "target"); this.options = CreateOptionsDictionary(options); EnsureUserAgentChannelOption(this.options); this.environment = GrpcEnvironment.AddRef(); + this.completionQueue = this.environment.PickCompletionQueue(); using (var nativeCredentials = credentials.ToNativeCredentials()) using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values)) { @@ -87,6 +99,18 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs); } } + GrpcEnvironment.RegisterChannel(this); + } + + /// <summary> + /// Creates a channel that connects to a specific host and port. + /// </summary> + /// <param name="host">The name or IP address of the host.</param> + /// <param name="port">The port.</param> + /// <param name="credentials">Credentials to secure the channel.</param> + public Channel(string host, int port, ChannelCredentials credentials) : + this(host, port, credentials, null) + { } /// <summary> @@ -96,14 +120,14 @@ namespace Grpc.Core /// <param name="port">The port.</param> /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> - public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) : + public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) : this(string.Format("{0}:{1}", host, port), credentials, options) { } /// <summary> /// Gets current connectivity state of this channel. - /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned. + /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned. /// </summary> public ChannelState State { @@ -120,8 +144,8 @@ namespace Grpc.Core /// </summary> public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) { - GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure, - "FatalFailure is a terminal state. No further state changes can occur."); + GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown, + "Shutdown is a terminal state. No further state changes can occur."); var tcs = new TaskCompletionSource<object>(); var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; var handler = new BatchCompletionDelegate((success, ctx) => @@ -135,7 +159,7 @@ namespace Grpc.Core tcs.SetCanceled(); } }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler); return tcs.Task; } @@ -171,7 +195,7 @@ namespace Grpc.Core /// <summary> /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, - /// or channel enters the FatalFailure state, the task is cancelled. + /// or channel enters the Shutdown state, the task is cancelled. /// There is no need to call this explicitly unless your use case requires that. /// Starting an RPC on a new channel will request connection implicitly. /// </summary> @@ -181,9 +205,9 @@ namespace Grpc.Core var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { - if (currentState == ChannelState.FatalFailure) + if (currentState == ChannelState.Shutdown) { - throw new OperationCanceledException("Channel has reached FatalFailure state."); + throw new OperationCanceledException("Channel has reached Shutdown state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); currentState = GetConnectivityState(false); @@ -191,9 +215,16 @@ namespace Grpc.Core } /// <summary> - /// Waits until there are no more active calls for this channel and then cleans up - /// resources used by this channel. + /// Shuts down the channel cleanly. It is strongly recommended to shutdown + /// all previously created channels before exiting from the process. /// </summary> + /// <remarks> + /// This method doesn't wait for all calls on this channel to finish (nor does + /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure + /// all the calls on this channel have finished (successfully or with an error) + /// before shutting down the channel to ensure channel shutdown won't impact + /// the outcome of those remote calls. + /// </remarks> public async Task ShutdownAsync() { lock (myLock) @@ -201,6 +232,7 @@ namespace Grpc.Core GrpcPreconditions.CheckState(!shutdownRequested); shutdownRequested = true; } + GrpcEnvironment.UnregisterChannel(this); shutdownTokenSource.Cancel(); @@ -212,7 +244,7 @@ namespace Grpc.Core handle.Dispose(); - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); } internal ChannelSafeHandle Handle @@ -231,6 +263,14 @@ namespace Grpc.Core } } + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return this.completionQueue; + } + } + internal void AddCallReference(object call) { activeCallCounter.Increment(); @@ -255,7 +295,7 @@ namespace Grpc.Core } catch (ObjectDisposedException) { - return ChannelState.FatalFailure; + return ChannelState.Shutdown; } } |