diff options
author | Mehrdad Afshari <mmx@google.com> | 2018-02-11 20:10:29 -0800 |
---|---|---|
committer | Mehrdad Afshari <mmx@google.com> | 2018-02-21 18:30:19 -0800 |
commit | b8e362455458a3697fa7194c9c084584467b5b80 (patch) | |
tree | 75eebd6a144629eff80d3481da81c6da30a3b6ea | |
parent | 6cde06129f5e100dd94b0318c97e863d5e02c4b1 (diff) |
Add C# client-side interceptor machinery
-rw-r--r-- | src/csharp/Grpc.Core/ClientBase.cs | 62 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs | 137 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs | 54 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs | 119 |
4 files changed, 252 insertions, 120 deletions
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 2d41b29fa0..d64ce7dd94 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -16,6 +16,8 @@ #endregion +using System; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -147,6 +149,64 @@ namespace Grpc.Core /// </summary> protected internal class ClientBaseConfiguration { + private class ClientHeaderInterceptor : Interceptor + { + readonly Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor; + + /// <summary> + /// Creates a new instance of ClientHeaderInterceptor given the specified header interceptor function. + /// </summary> + public ClientHeaderInterceptor(Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + /// <summary> + /// Intercepts a blocking invocation of a simple remote call. + /// </summary> + public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a simple remote call. + /// </summary> + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a streaming remote call. + /// </summary> + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a client streaming call. + /// </summary> + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a duplex streaming call. + /// </summary> + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + } + readonly CallInvoker undecoratedCallInvoker; readonly string host; @@ -158,7 +218,7 @@ namespace Grpc.Core internal CallInvoker CreateDecoratedCallInvoker() { - return new InterceptingCallInvoker(undecoratedCallInvoker, hostInterceptor: (h) => host); + return undecoratedCallInvoker.Intercept(new ClientHeaderInterceptor((method, host, options) => Tuple.Create(this.host, options))); } internal ClientBaseConfiguration WithHost(string host) diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs new file mode 100644 index 0000000000..26e9f8802d --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -0,0 +1,137 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Linq; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Extends the CallInvoker class to provide the interceptor facility on the client side. + /// This is an EXPERIMENTAL API. + /// </summary> + public static class CallInvokerExtensions + { + /// <summary> + /// Decorates an underlying <see cref="Grpc.Core.CallInvoker" /> to + /// intercept calls through a given interceptor. + /// </summary> + private class InterceptingCallInvoker : CallInvoker + { + readonly CallInvoker invoker; + readonly Interceptor interceptor; + + /// <summary> + /// Creates a new instance of <see cref="Grpc.Core.Interceptors.CallInvokerExtensions.InterceptingCallInvoker" /> + /// with the given underlying invoker and interceptor instances. + /// </summary> + public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) + { + this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker"); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + /// <summary> + /// Intercepts a simple blocking call with the registered interceptor. + /// </summary> + public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.BlockingUnaryCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts a simple asynchronous call with the registered interceptor. + /// </summary> + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncUnaryCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts an asynchronous server streaming call with the registered interceptor. + /// </summary> + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncServerStreamingCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts an asynchronous client streaming call with the registered interceptor. + /// </summary> + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) + { + return interceptor.AsyncClientStreamingCall( + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + + /// <summary> + /// Intercepts an asynchronous duplex streaming call with the registered interceptor. + /// </summary> + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) + { + return interceptor.AsyncDuplexStreamingCall( + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptor. + /// </summary> + /// <param name="invoker">The underlying invoker to intercept.</param> + /// <param name="interceptor">The interceptor to intercept calls to the invoker with.</param> + public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor) + { + return new InterceptingCallInvoker(invoker, interceptor); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptors. + /// </summary> + /// <param name="invoker">The channel to intercept.</param> + /// <param name="interceptors"> + /// An array of interceptors to intercept the calls to the invoker with. + /// Control is passed to the interceptors in the order specified. + /// </param> + public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) + { + GrpcPreconditions.CheckNotNull(invoker, "invoker"); + GrpcPreconditions.CheckNotNull(interceptors, "interceptors"); + + foreach (var interceptor in interceptors.Reverse()) + { + invoker = Intercept(invoker, interceptor); + } + + return invoker; + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs new file mode 100644 index 0000000000..1a54b93dae --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -0,0 +1,54 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Provides extension methods to make it easy to register interceptors on Channel objects. + /// This is an EXPERIMENTAL API. + /// </summary> + public static class ChannelExtensions + { + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the channel with the given interceptor. + /// </summary> + /// <param name="channel">The channel to intercept.</param> + /// <param name="interceptor">The interceptor to intercept the channel with.</param> + public static CallInvoker Intercept(this Channel channel, Interceptor interceptor) + { + return new DefaultCallInvoker(channel).Intercept(interceptor); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the channel with the given interceptors. + /// </summary> + /// <param name="channel">The channel to intercept.</param> + /// <param name="interceptors"> + /// An array of interceptors to intercept the channel with. + /// Control is passed to the interceptors in the order specified. + /// </param> + public static CallInvoker Intercept(this Channel channel, params Interceptor[] interceptors) + { + return new DefaultCallInvoker(channel).Intercept(interceptors); + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs deleted file mode 100644 index eb4c7d97a7..0000000000 --- a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs +++ /dev/null @@ -1,119 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015-2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Threading.Tasks; -using Grpc.Core; -using Grpc.Core.Utils; - -namespace Grpc.Core.Internal -{ - /// <summary> - /// Decorates an underlying <c>CallInvoker</c> to intercept call invocations. - /// </summary> - internal class InterceptingCallInvoker : CallInvoker - { - readonly CallInvoker callInvoker; - readonly Func<string, string> hostInterceptor; - readonly Func<CallOptions, CallOptions> callOptionsInterceptor; - - /// <summary> - /// Initializes a new instance of the <see cref="Grpc.Core.Internal.InterceptingCallInvoker"/> class. - /// </summary> - public InterceptingCallInvoker(CallInvoker callInvoker, - Func<string, string> hostInterceptor = null, - Func<CallOptions, CallOptions> callOptionsInterceptor = null) - { - this.callInvoker = GrpcPreconditions.CheckNotNull(callInvoker); - this.hostInterceptor = hostInterceptor; - this.callOptionsInterceptor = callOptionsInterceptor; - } - - /// <summary> - /// Intercepts a unary call. - /// </summary> - public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.BlockingUnaryCall(method, host, options, request); - } - - /// <summary> - /// Invokes a simple remote call asynchronously. - /// </summary> - public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncUnaryCall(method, host, options, request); - } - - /// <summary> - /// Invokes a server streaming call asynchronously. - /// In server streaming scenario, client sends on request and server responds with a stream of responses. - /// </summary> - public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncServerStreamingCall(method, host, options, request); - } - - /// <summary> - /// Invokes a client streaming call asynchronously. - /// In client streaming scenario, client sends a stream of requests and server responds with a single response. - /// </summary> - public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncClientStreamingCall(method, host, options); - } - - /// <summary> - /// Invokes a duplex streaming call asynchronously. - /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses. - /// The response stream is completely independent and both side can be sending messages at the same time. - /// </summary> - public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncDuplexStreamingCall(method, host, options); - } - - private string InterceptHost(string host) - { - if (hostInterceptor == null) - { - return host; - } - return hostInterceptor(host); - } - - private CallOptions InterceptCallOptions(CallOptions options) - { - if (callOptionsInterceptor == null) - { - return options; - } - return callOptionsInterceptor(options); - } - } -} |