aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mehrdad Afshari <mmx@google.com>2018-02-11 20:10:29 -0800
committerGravatar Mehrdad Afshari <mmx@google.com>2018-02-21 18:30:19 -0800
commitb8e362455458a3697fa7194c9c084584467b5b80 (patch)
tree75eebd6a144629eff80d3481da81c6da30a3b6ea
parent6cde06129f5e100dd94b0318c97e863d5e02c4b1 (diff)
Add C# client-side interceptor machinery
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs62
-rw-r--r--src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs137
-rw-r--r--src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs54
-rw-r--r--src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs119
4 files changed, 252 insertions, 120 deletions
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index 2d41b29fa0..d64ce7dd94 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -16,6 +16,8 @@
#endregion
+using System;
+using Grpc.Core.Interceptors;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -147,6 +149,64 @@ namespace Grpc.Core
/// </summary>
protected internal class ClientBaseConfiguration
{
+ private class ClientHeaderInterceptor : Interceptor
+ {
+ readonly Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor;
+
+ /// <summary>
+ /// Creates a new instance of ClientHeaderInterceptor given the specified header interceptor function.
+ /// </summary>
+ public ClientHeaderInterceptor(Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor)
+ {
+ this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor");
+ }
+
+ /// <summary>
+ /// Intercepts a blocking invocation of a simple remote call.
+ /// </summary>
+ public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
+ {
+ var newHeaders = interceptor(context.Method, context.Host, context.Options);
+ return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous invocation of a simple remote call.
+ /// </summary>
+ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
+ {
+ var newHeaders = interceptor(context.Method, context.Host, context.Options);
+ return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous invocation of a streaming remote call.
+ /// </summary>
+ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ var newHeaders = interceptor(context.Method, context.Host, context.Options);
+ return continuation(request, new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous invocation of a client streaming call.
+ /// </summary>
+ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ var newHeaders = interceptor(context.Method, context.Host, context.Options);
+ return continuation(new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous invocation of a duplex streaming call.
+ /// </summary>
+ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation)
+ {
+ var newHeaders = interceptor(context.Method, context.Host, context.Options);
+ return continuation(new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHeaders.Item1, newHeaders.Item2));
+ }
+ }
+
readonly CallInvoker undecoratedCallInvoker;
readonly string host;
@@ -158,7 +218,7 @@ namespace Grpc.Core
internal CallInvoker CreateDecoratedCallInvoker()
{
- return new InterceptingCallInvoker(undecoratedCallInvoker, hostInterceptor: (h) => host);
+ return undecoratedCallInvoker.Intercept(new ClientHeaderInterceptor((method, host, options) => Tuple.Create(this.host, options)));
}
internal ClientBaseConfiguration WithHost(string host)
diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs
new file mode 100644
index 0000000000..26e9f8802d
--- /dev/null
+++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs
@@ -0,0 +1,137 @@
+#region Copyright notice and license
+
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Linq;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Interceptors
+{
+ /// <summary>
+ /// Extends the CallInvoker class to provide the interceptor facility on the client side.
+ /// This is an EXPERIMENTAL API.
+ /// </summary>
+ public static class CallInvokerExtensions
+ {
+ /// <summary>
+ /// Decorates an underlying <see cref="Grpc.Core.CallInvoker" /> to
+ /// intercept calls through a given interceptor.
+ /// </summary>
+ private class InterceptingCallInvoker : CallInvoker
+ {
+ readonly CallInvoker invoker;
+ readonly Interceptor interceptor;
+
+ /// <summary>
+ /// Creates a new instance of <see cref="Grpc.Core.Interceptors.CallInvokerExtensions.InterceptingCallInvoker" />
+ /// with the given underlying invoker and interceptor instances.
+ /// </summary>
+ public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor)
+ {
+ this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker");
+ this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor");
+ }
+
+ /// <summary>
+ /// Intercepts a simple blocking call with the registered interceptor.
+ /// </summary>
+ public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
+ {
+ return interceptor.BlockingUnaryCall(
+ request,
+ new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
+ (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
+ }
+
+ /// <summary>
+ /// Intercepts a simple asynchronous call with the registered interceptor.
+ /// </summary>
+ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
+ {
+ return interceptor.AsyncUnaryCall(
+ request,
+ new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
+ (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous server streaming call with the registered interceptor.
+ /// </summary>
+ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
+ {
+ return interceptor.AsyncServerStreamingCall(
+ request,
+ new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
+ (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous client streaming call with the registered interceptor.
+ /// </summary>
+ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
+ {
+ return interceptor.AsyncClientStreamingCall(
+ new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
+ ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options));
+ }
+
+ /// <summary>
+ /// Intercepts an asynchronous duplex streaming call with the registered interceptor.
+ /// </summary>
+ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
+ {
+ return interceptor.AsyncDuplexStreamingCall(
+ new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
+ ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options));
+ }
+ }
+
+ /// <summary>
+ /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts
+ /// the invoker with the given interceptor.
+ /// </summary>
+ /// <param name="invoker">The underlying invoker to intercept.</param>
+ /// <param name="interceptor">The interceptor to intercept calls to the invoker with.</param>
+ public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor)
+ {
+ return new InterceptingCallInvoker(invoker, interceptor);
+ }
+
+ /// <summary>
+ /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts
+ /// the invoker with the given interceptors.
+ /// </summary>
+ /// <param name="invoker">The channel to intercept.</param>
+ /// <param name="interceptors">
+ /// An array of interceptors to intercept the calls to the invoker with.
+ /// Control is passed to the interceptors in the order specified.
+ /// </param>
+ public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors)
+ {
+ GrpcPreconditions.CheckNotNull(invoker, "invoker");
+ GrpcPreconditions.CheckNotNull(interceptors, "interceptors");
+
+ foreach (var interceptor in interceptors.Reverse())
+ {
+ invoker = Intercept(invoker, interceptor);
+ }
+
+ return invoker;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs
new file mode 100644
index 0000000000..1a54b93dae
--- /dev/null
+++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs
@@ -0,0 +1,54 @@
+#region Copyright notice and license
+
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core.Interceptors
+{
+ /// <summary>
+ /// Provides extension methods to make it easy to register interceptors on Channel objects.
+ /// This is an EXPERIMENTAL API.
+ /// </summary>
+ public static class ChannelExtensions
+ {
+ /// <summary>
+ /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts
+ /// the channel with the given interceptor.
+ /// </summary>
+ /// <param name="channel">The channel to intercept.</param>
+ /// <param name="interceptor">The interceptor to intercept the channel with.</param>
+ public static CallInvoker Intercept(this Channel channel, Interceptor interceptor)
+ {
+ return new DefaultCallInvoker(channel).Intercept(interceptor);
+ }
+
+ /// <summary>
+ /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts
+ /// the channel with the given interceptors.
+ /// </summary>
+ /// <param name="channel">The channel to intercept.</param>
+ /// <param name="interceptors">
+ /// An array of interceptors to intercept the channel with.
+ /// Control is passed to the interceptors in the order specified.
+ /// </param>
+ public static CallInvoker Intercept(this Channel channel, params Interceptor[] interceptors)
+ {
+ return new DefaultCallInvoker(channel).Intercept(interceptors);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs
deleted file mode 100644
index eb4c7d97a7..0000000000
--- a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs
+++ /dev/null
@@ -1,119 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015-2016 gRPC authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#endregion
-
-using System;
-using System.Threading.Tasks;
-using Grpc.Core;
-using Grpc.Core.Utils;
-
-namespace Grpc.Core.Internal
-{
- /// <summary>
- /// Decorates an underlying <c>CallInvoker</c> to intercept call invocations.
- /// </summary>
- internal class InterceptingCallInvoker : CallInvoker
- {
- readonly CallInvoker callInvoker;
- readonly Func<string, string> hostInterceptor;
- readonly Func<CallOptions, CallOptions> callOptionsInterceptor;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Grpc.Core.Internal.InterceptingCallInvoker"/> class.
- /// </summary>
- public InterceptingCallInvoker(CallInvoker callInvoker,
- Func<string, string> hostInterceptor = null,
- Func<CallOptions, CallOptions> callOptionsInterceptor = null)
- {
- this.callInvoker = GrpcPreconditions.CheckNotNull(callInvoker);
- this.hostInterceptor = hostInterceptor;
- this.callOptionsInterceptor = callOptionsInterceptor;
- }
-
- /// <summary>
- /// Intercepts a unary call.
- /// </summary>
- public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
- {
- host = InterceptHost(host);
- options = InterceptCallOptions(options);
- return callInvoker.BlockingUnaryCall(method, host, options, request);
- }
-
- /// <summary>
- /// Invokes a simple remote call asynchronously.
- /// </summary>
- public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
- {
- host = InterceptHost(host);
- options = InterceptCallOptions(options);
- return callInvoker.AsyncUnaryCall(method, host, options, request);
- }
-
- /// <summary>
- /// Invokes a server streaming call asynchronously.
- /// In server streaming scenario, client sends on request and server responds with a stream of responses.
- /// </summary>
- public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
- {
- host = InterceptHost(host);
- options = InterceptCallOptions(options);
- return callInvoker.AsyncServerStreamingCall(method, host, options, request);
- }
-
- /// <summary>
- /// Invokes a client streaming call asynchronously.
- /// In client streaming scenario, client sends a stream of requests and server responds with a single response.
- /// </summary>
- public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
- {
- host = InterceptHost(host);
- options = InterceptCallOptions(options);
- return callInvoker.AsyncClientStreamingCall(method, host, options);
- }
-
- /// <summary>
- /// Invokes a duplex streaming call asynchronously.
- /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses.
- /// The response stream is completely independent and both side can be sending messages at the same time.
- /// </summary>
- public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
- {
- host = InterceptHost(host);
- options = InterceptCallOptions(options);
- return callInvoker.AsyncDuplexStreamingCall(method, host, options);
- }
-
- private string InterceptHost(string host)
- {
- if (hostInterceptor == null)
- {
- return host;
- }
- return hostInterceptor(host);
- }
-
- private CallOptions InterceptCallOptions(CallOptions options)
- {
- if (callOptionsInterceptor == null)
- {
- return options;
- }
- return callOptionsInterceptor(options);
- }
- }
-}