diff options
Diffstat (limited to 'src')
19 files changed, 1395 insertions, 162 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 174a15b447..9a8f25b630 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -446,7 +446,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->lb_policy->UpdateLocked(*chand->resolver_result); } else { // Instantiate new LB policy. - lb_policy_created = true; grpc_core::LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = chand->combiner; lb_policy_args.client_channel_factory = chand->client_channel_factory; @@ -458,6 +457,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } else { + lb_policy_created = true; reresolution_request_args* args = static_cast<reresolution_request_args*>( gpr_zalloc(sizeof(*args))); diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.cc b/src/core/ext/transport/chttp2/transport/bin_decoder.cc index 91980e6974..831a36961e 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.cc +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.cc @@ -75,6 +75,32 @@ static bool input_is_valid(uint8_t* input_ptr, size_t length) { #define COMPOSE_OUTPUT_BYTE_2(input_ptr) \ (uint8_t)((decode_table[input_ptr[2]] << 6) | decode_table[input_ptr[3]]) +// By RFC 4648, if the length of the encoded string without padding is 4n+r, +// the length of decoded string is: 1) 3n if r = 0, 2) 3n + 1 if r = 2, 3, or +// 3) invalid if r = 1. +size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice& slice) { + size_t len = GRPC_SLICE_LENGTH(slice); + const uint8_t* bytes = GRPC_SLICE_START_PTR(slice); + while (len > 0 && bytes[len - 1] == '=') { + len--; + } + if (GRPC_SLICE_LENGTH(slice) - len > 2) { + gpr_log(GPR_ERROR, + "Base64 decoding failed. Input has more than 2 paddings."); + return 0; + } + size_t tuples = len / 4; + size_t tail_case = len % 4; + if (tail_case == 1) { + gpr_log(GPR_ERROR, + "Base64 decoding failed. Input has a length of %zu (without" + " padding), which is invalid.\n", + len); + return 0; + } + return tuples * 3 + tail_xtra[tail_case]; +} + bool grpc_base64_decode_partial(struct grpc_base64_decode_context* ctx) { size_t input_tail; diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h index 9cb75ccd81..a0d74fb20d 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -48,4 +48,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input); grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, size_t output_length); +/* Infer the length of decoded data from encoded data. */ +size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice& slice); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 8904c122a7..c367f9c465 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -24,6 +24,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/ext/transport/chttp2/transport/bin_decoder.h" +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/gpr/host_port.h" @@ -393,6 +395,29 @@ static void execute_from_storage(stream_obj* s) { gpr_mu_unlock(&s->mu); } +static void convert_cronet_array_to_metadata( + const bidirectional_stream_header_array* header_array, + grpc_chttp2_incoming_metadata_buffer* mds) { + for (size_t i = 0; i < header_array->count; i++) { + CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s", + header_array->headers[i].key, header_array->headers[i].value); + grpc_slice key = grpc_slice_intern( + grpc_slice_from_static_string(header_array->headers[i].key)); + grpc_slice value; + if (grpc_is_binary_header(key)) { + value = grpc_slice_from_static_string(header_array->headers[i].value); + value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( + value, grpc_chttp2_base64_infer_length_after_decode(value))); + } else { + value = grpc_slice_intern( + grpc_slice_from_static_string(header_array->headers[i].value)); + } + GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata", + grpc_chttp2_incoming_metadata_buffer_add( + mds, grpc_mdelem_from_slices(key, value))); + } +} + /* Cronet callback */ @@ -517,16 +542,7 @@ static void on_response_headers_received( sizeof(s->state.rs.initial_metadata)); grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, s->arena); - for (size_t i = 0; i < headers->count; i++) { - GRPC_LOG_IF_ERROR("on_response_headers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].value))))); - } + convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata); s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || s->state.state_callback_received[OP_FAILED])) { @@ -621,18 +637,11 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = false; grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, s->arena); - for (size_t i = 0; i < trailers->count; i++) { - CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, - trailers->headers[i].value); - GRPC_LOG_IF_ERROR("on_response_trailers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].value))))); + convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata); + if (trailers->count > 0) { s->state.rs.trailing_metadata_valid = true; + } + for (size_t i = 0; i < trailers->count; i++) { if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; @@ -721,7 +730,14 @@ static void convert_metadata_to_cronet_headers( grpc_mdelem mdelem = curr->md; curr = curr->next; char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem)); - char* value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); + char* value; + if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) { + grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem)); + value = grpc_slice_to_c_string(wire_value); + grpc_slice_unref(wire_value); + } else { + value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); + } if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) || grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) { /* Cronet populates these fields on its own */ diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs new file mode 100644 index 0000000000..02f6f6ffc6 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -0,0 +1,228 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using Grpc.Core.Tests; +using NUnit.Framework; + +namespace Grpc.Core.Interceptors.Tests +{ + public class ClientInterceptorTest + { + const string Host = "127.0.0.1"; + + [Test] + public void AddRequestHeaderInClientInterceptor() + { + const string HeaderKey = "x-client-interceptor"; + const string HeaderValue = "hello-world"; + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == HeaderKey)).Value; + Assert.AreEqual(interceptorHeader, HeaderValue); + return Task.FromResult("PASS"); + }); + var server = helper.GetServer(); + server.Start(); + var callInvoker = helper.GetChannel().Intercept(metadata => + { + metadata = metadata ?? new Metadata(); + metadata.Add(new Metadata.Entry(HeaderKey, HeaderValue)); + return metadata; + }); + Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method<string, string>(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); + } + + [Test] + public void CheckInterceptorOrderInClientInterceptors() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + return Task.FromResult("PASS"); + }); + var server = helper.GetServer(); + server.Start(); + var stringBuilder = new StringBuilder(); + var callInvoker = helper.GetChannel().Intercept(metadata => { + stringBuilder.Append("interceptor1"); + return metadata; + }).Intercept(new CallbackInterceptor(() => stringBuilder.Append("array1")), + new CallbackInterceptor(() => stringBuilder.Append("array2")), + new CallbackInterceptor(() => stringBuilder.Append("array3"))) + .Intercept(metadata => + { + stringBuilder.Append("interceptor2"); + return metadata; + }).Intercept(metadata => + { + stringBuilder.Append("interceptor3"); + return metadata; + }); + Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method<string, string>(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); + Assert.AreEqual("interceptor3interceptor2array1array2array3interceptor1", stringBuilder.ToString()); + } + + [Test] + public void CheckNullInterceptorRegistrationFails() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + return Task.FromResult("PASS"); + }); + Assert.Throws<ArgumentNullException>(() => helper.GetChannel().Intercept(default(Interceptor))); + Assert.Throws<ArgumentNullException>(() => helper.GetChannel().Intercept(new[]{default(Interceptor)})); + Assert.Throws<ArgumentNullException>(() => helper.GetChannel().Intercept(new[]{new CallbackInterceptor(()=>{}), null})); + Assert.Throws<ArgumentNullException>(() => helper.GetChannel().Intercept(default(Interceptor[]))); + } + + [Test] + public async Task CountNumberOfRequestsInClientInterceptors() + { + var helper = new MockServiceHelper(Host); + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var stringBuilder = new StringBuilder(); + await requestStream.ForEachAsync(request => + { + stringBuilder.Append(request); + return TaskUtils.CompletedTask; + }); + await Task.Delay(100); + return stringBuilder.ToString(); + }); + + var callInvoker = helper.GetChannel().Intercept(new ClientStreamingCountingInterceptor()); + + var server = helper.GetServer(); + server.Start(); + var call = callInvoker.AsyncClientStreamingCall(new Method<string, string>(MethodType.ClientStreaming, MockServiceHelper.ServiceName, "ClientStreaming", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions()); + await call.RequestStream.WriteAllAsync(new string[] { "A", "B", "C" }); + Assert.AreEqual("3", await call.ResponseAsync); + + Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); + Assert.IsNotNull(call.GetTrailers()); + } + + private class CallbackInterceptor : Interceptor + { + readonly Action callback; + + public CallbackInterceptor(Action callback) + { + this.callback = GrpcPreconditions.CheckNotNull(callback, nameof(callback)); + } + + public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + { + callback(); + return continuation(context); + } + + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) + { + callback(); + return continuation(context); + } + } + + private class ClientStreamingCountingInterceptor : Interceptor + { + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + { + var response = continuation(context); + int counter = 0; + var requestStream = new WrappedClientStreamWriter<TRequest>(response.RequestStream, + message => { counter++; return message; }, null); + var responseAsync = response.ResponseAsync.ContinueWith( + unaryResponse => (TResponse)(object)counter.ToString() // Cast to object first is needed to satisfy the type-checker + ); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + } + + private class WrappedClientStreamWriter<T> : IClientStreamWriter<T> + { + readonly IClientStreamWriter<T> writer; + readonly Func<T, T> onMessage; + readonly Action onResponseStreamEnd; + public WrappedClientStreamWriter(IClientStreamWriter<T> writer, Func<T, T> onMessage, Action onResponseStreamEnd) + { + this.writer = writer; + this.onMessage = onMessage; + this.onResponseStreamEnd = onResponseStreamEnd; + } + public Task CompleteAsync() + { + if (onResponseStreamEnd != null) + { + return writer.CompleteAsync().ContinueWith(x => onResponseStreamEnd()); + } + return writer.CompleteAsync(); + } + public Task WriteAsync(T message) + { + if (onMessage != null) + { + message = onMessage(message); + } + return writer.WriteAsync(message); + } + public WriteOptions WriteOptions + { + get + { + return writer.WriteOptions; + } + set + { + writer.WriteOptions = value; + } + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs new file mode 100644 index 0000000000..e76f21d098 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -0,0 +1,126 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Core.Internal; +using Grpc.Core.Tests; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Interceptors.Tests +{ + public class ServerInterceptorTest + { + const string Host = "127.0.0.1"; + + [Test] + public void AddRequestHeaderInServerInterceptor() + { + var helper = new MockServiceHelper(Host); + const string MetadataKey = "x-interceptor"; + const string MetadataValue = "hello world"; + var interceptor = new ServerCallContextInterceptor(ctx => ctx.RequestHeaders.Add(new Metadata.Entry(MetadataKey, MetadataValue))); + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == MetadataKey)).Value; + Assert.AreEqual(interceptorHeader, MetadataValue); + return Task.FromResult("PASS"); + }); + helper.ServiceDefinition = helper.ServiceDefinition.Intercept(interceptor); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + } + + [Test] + public void VerifyInterceptorOrdering() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => + { + return Task.FromResult("PASS"); + }); + var stringBuilder = new StringBuilder(); + helper.ServiceDefinition = helper.ServiceDefinition + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("A"))) + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("B1")), + new ServerCallContextInterceptor(ctx => stringBuilder.Append("B2")), + new ServerCallContextInterceptor(ctx => stringBuilder.Append("B3"))) + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("C"))); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + Assert.AreEqual("CB1B2B3A", stringBuilder.ToString()); + } + + [Test] + public void CheckNullInterceptorRegistrationFails() + { + var helper = new MockServiceHelper(Host); + var sd = helper.ServiceDefinition; + Assert.Throws<ArgumentNullException>(() => sd.Intercept(default(Interceptor))); + Assert.Throws<ArgumentNullException>(() => sd.Intercept(new[]{default(Interceptor)})); + Assert.Throws<ArgumentNullException>(() => sd.Intercept(new[]{new ServerCallContextInterceptor(ctx=>{}), null})); + Assert.Throws<ArgumentNullException>(() => sd.Intercept(default(Interceptor[]))); + } + + private class ServerCallContextInterceptor : Interceptor + { + readonly Action<ServerCallContext> interceptor; + + public ServerCallContextInterceptor(Action<ServerCallContext> interceptor) + { + GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + this.interceptor = interceptor; + } + + public override Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) + { + interceptor(context); + return continuation(request, context); + } + + public override Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, ServerCallContext context, ClientStreamingServerMethod<TRequest, TResponse> continuation) + { + interceptor(context); + return continuation(requestStream, context); + } + + public override Task ServerStreamingServerHandler<TRequest, TResponse>(TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, ServerStreamingServerMethod<TRequest, TResponse> continuation) + { + interceptor(context); + return continuation(request, responseStream, context); + } + + public override Task DuplexStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, DuplexStreamingServerMethod<TRequest, TResponse> continuation) + { + interceptor(context); + return continuation(requestStream, responseStream, context); + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs index 7f4677d57f..a925f865ff 100644 --- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs +++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs @@ -37,7 +37,6 @@ namespace Grpc.Core.Tests public const string ServiceName = "tests.Test"; readonly string host; - readonly ServerServiceDefinition serviceDefinition; readonly IEnumerable<ChannelOption> channelOptions; readonly Method<string, string> unaryMethod; @@ -87,7 +86,7 @@ namespace Grpc.Core.Tests marshaller, marshaller); - serviceDefinition = ServerServiceDefinition.CreateBuilder() + ServiceDefinition = ServerServiceDefinition.CreateBuilder() .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context)) .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) @@ -131,7 +130,7 @@ namespace Grpc.Core.Tests // Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755 server = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) }) { - Services = { serviceDefinition }, + Services = { ServiceDefinition }, Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } }; } @@ -178,13 +177,7 @@ namespace Grpc.Core.Tests } } - public ServerServiceDefinition ServiceDefinition - { - get - { - return this.serviceDefinition; - } - } + public ServerServiceDefinition ServiceDefinition { get; set; } public UnaryServerMethod<string, string> UnaryHandler { diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 2d41b29fa0..fac34071be 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -16,6 +16,8 @@ #endregion +using System; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -147,6 +149,52 @@ namespace Grpc.Core /// </summary> protected internal class ClientBaseConfiguration { + private class ClientBaseConfigurationInterceptor : Interceptor + { + readonly Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor; + + /// <summary> + /// Creates a new instance of ClientBaseConfigurationInterceptor given the specified header and host interceptor function. + /// </summary> + public ClientBaseConfigurationInterceptor(Func<IMethod, string, CallOptions, Tuple<string, CallOptions>> interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + } + + private ClientInterceptorContext<TRequest, TResponse> GetNewContext<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class + { + var newHostAndCallOptions = interceptor(context.Method, context.Host, context.Options); + return new ClientInterceptorContext<TRequest, TResponse>(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2); + } + + public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(GetNewContext(context)); + } + + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(GetNewContext(context)); + } + } + readonly CallInvoker undecoratedCallInvoker; readonly string host; @@ -158,12 +206,12 @@ namespace Grpc.Core internal CallInvoker CreateDecoratedCallInvoker() { - return new InterceptingCallInvoker(undecoratedCallInvoker, hostInterceptor: (h) => host); + return undecoratedCallInvoker.Intercept(new ClientBaseConfigurationInterceptor((method, host, options) => Tuple.Create(this.host, options))); } internal ClientBaseConfiguration WithHost(string host) { - GrpcPreconditions.CheckNotNull(host, "host"); + GrpcPreconditions.CheckNotNull(host, nameof(host)); return new ClientBaseConfiguration(this.undecoratedCallInvoker, host); } } diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs new file mode 100644 index 0000000000..421b5d379e --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -0,0 +1,144 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Linq; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Extends the CallInvoker class to provide the interceptor facility on the client side. + /// This is an EXPERIMENTAL API. + /// </summary> + public static class CallInvokerExtensions + { + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptor. + /// </summary> + /// <param name="invoker">The underlying invoker to intercept.</param> + /// <param name="interceptor">The interceptor to intercept calls to the invoker with.</param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted CallInvoker, effectively + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor) + { + return new InterceptingCallInvoker(invoker, interceptor); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptors. + /// </summary> + /// <param name="invoker">The channel to intercept.</param> + /// <param name="interceptors"> + /// An array of interceptors to intercept the calls to the invoker with. + /// Control is passed to the interceptors in the order specified. + /// </param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted CallInvoker, effectively + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) + { + GrpcPreconditions.CheckNotNull(invoker, nameof(invoker)); + GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors)); + + foreach (var interceptor in interceptors.Reverse()) + { + invoker = Intercept(invoker, interceptor); + } + + return invoker; + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptor. + /// </summary> + /// <param name="invoker">The underlying invoker to intercept.</param> + /// <param name="interceptor"> + /// An interceptor delegate that takes the request metadata to be sent with an outgoing call + /// and returns a <see cref="Grpc.Core.Metadata" /> instance that will replace the existing + /// invocation metadata. + /// </param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this CallInvoker invoker, Func<Metadata, Metadata> interceptor) + { + return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); + } + + private class MetadataInterceptor : Interceptor + { + readonly Func<Metadata, Metadata> interceptor; + + /// <summary> + /// Creates a new instance of MetadataInterceptor given the specified interceptor function. + /// </summary> + public MetadataInterceptor(Func<Metadata, Metadata> interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + } + + private ClientInterceptorContext<TRequest, TResponse> GetNewContext<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class + { + var metadata = context.Options.Headers ?? new Metadata(); + return new ClientInterceptorContext<TRequest, TResponse>(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))); + } + + public override TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(GetNewContext(context)); + } + + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) + { + return continuation(GetNewContext(context)); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs new file mode 100644 index 0000000000..00b2fa8bec --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -0,0 +1,88 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Provides extension methods to make it easy to register interceptors on Channel objects. + /// This is an EXPERIMENTAL API. + /// </summary> + public static class ChannelExtensions + { + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the channel with the given interceptor. + /// </summary> + /// <param name="channel">The channel to intercept.</param> + /// <param name="interceptor">The interceptor to intercept the channel with.</param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted channel, effectively + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this Channel channel, Interceptor interceptor) + { + return new DefaultCallInvoker(channel).Intercept(interceptor); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the channel with the given interceptors. + /// </summary> + /// <param name="channel">The channel to intercept.</param> + /// <param name="interceptors"> + /// An array of interceptors to intercept the channel with. + /// Control is passed to the interceptors in the order specified. + /// </param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted channel, effectively + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this Channel channel, params Interceptor[] interceptors) + { + return new DefaultCallInvoker(channel).Intercept(interceptors); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.CallInvoker" /> instance that intercepts + /// the invoker with the given interceptor. + /// </summary> + /// <param name="channel">The channel to intercept.</param> + /// <param name="interceptor"> + /// An interceptor delegate that takes the request metadata to be sent with an outgoing call + /// and returns a <see cref="Grpc.Core.Metadata" /> instance that will replace the existing + /// invocation metadata. + /// </param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static CallInvoker Intercept(this Channel channel, Func<Metadata, Metadata> interceptor) + { + return new DefaultCallInvoker(channel).Intercept(interceptor); + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs new file mode 100644 index 0000000000..de06a77077 --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs @@ -0,0 +1,65 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Reflection; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Carries along the context associated with intercepted invocations on the client side. + /// This is an EXPERIMENTAL API. + /// </summary> + public struct ClientInterceptorContext<TRequest, TResponse> + where TRequest : class + where TResponse : class + { + /// <summary> + /// Creates a new instance of <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}" /> + /// with the specified method, host, and call options. + /// </summary> + /// <param name="method">A <see cref="Grpc.Core.Method{TRequest, TResponse}"/> object representing the method to be invoked.</param> + /// <param name="host">The host to dispatch the current call to.</param> + /// <param name="options">A <see cref="Grpc.Core.CallOptions"/> instance containing the call options of the current call.</param> + public ClientInterceptorContext(Method<TRequest, TResponse> method, string host, CallOptions options) + { + Method = method; + Host = host; + Options = options; + } + + /// <summary> + /// Gets the <see cref="Grpc.Core.Method{TRequest, TResponse}"/> instance + /// representing the method to be invoked. + /// </summary> + public Method<TRequest, TResponse> Method { get; } + + /// <summary> + /// Gets the host that the currect invocation will be dispatched to. + /// </summary> + public string Host { get; } + + /// <summary> + /// Gets the <see cref="Grpc.Core.CallOptions"/> structure representing the + /// call options associated with the current invocation. + /// </summary> + public CallOptions Options { get; } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs new file mode 100644 index 0000000000..84d2a0b958 --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Decorates an underlying <see cref="Grpc.Core.CallInvoker" /> to + /// intercept calls through a given interceptor. + /// </summary> + internal class InterceptingCallInvoker : CallInvoker + { + readonly CallInvoker invoker; + readonly Interceptor interceptor; + + /// <summary> + /// Creates a new instance of <see cref="Grpc.Core.Interceptors.InterceptingCallInvoker" /> + /// with the given underlying invoker and interceptor instances. + /// </summary> + public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) + { + this.invoker = GrpcPreconditions.CheckNotNull(invoker, nameof(invoker)); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + } + + /// <summary> + /// Intercepts a simple blocking call with the registered interceptor. + /// </summary> + public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.BlockingUnaryCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts a simple asynchronous call with the registered interceptor. + /// </summary> + public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncUnaryCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts an asynchronous server streaming call with the registered interceptor. + /// </summary> + public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncServerStreamingCall( + request, + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// <summary> + /// Intercepts an asynchronous client streaming call with the registered interceptor. + /// </summary> + public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) + { + return interceptor.AsyncClientStreamingCall( + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + + /// <summary> + /// Intercepts an asynchronous duplex streaming call with the registered interceptor. + /// </summary> + public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) + { + return interceptor.AsyncDuplexStreamingCall( + new ClientInterceptorContext<TRequest, TResponse>(method, host, options), + ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs new file mode 100644 index 0000000000..56a30c34af --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs @@ -0,0 +1,406 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Reflection; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Serves as the base class for gRPC interceptors. + /// This is an EXPERIMENTAL API. + /// </summary> + public abstract class Interceptor + { + /// <summary> + /// Represents a continuation for intercepting simple blocking invocations. + /// A delegate of this type is passed to the BlockingUnaryCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// context and request values as it sees fit. + /// </summary> + /// <typeparam name="TRequest">Request message type for this invocation.</typeparam> + /// <typeparam name="TResponse">Response message type for this invocation.</typeparam> + /// <param name="request">The request value to continue the invocation with.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// instance to pass to the next step in the invocation process. + /// </param> + /// <returns> + /// The response value of the invocation to return to the caller. + /// The interceptor can choose to return the return value of the + /// continuation delegate or an arbitrary value as it sees fit. + /// </returns> + public delegate TResponse BlockingUnaryCallContinuation<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class; + + /// <summary> + /// Represents a continuation for intercepting simple asynchronous invocations. + /// A delegate of this type is passed to the AsyncUnaryCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. + /// </summary> + /// <typeparam name="TRequest">Request message type for this invocation.</typeparam> + /// <typeparam name="TResponse">Response message type for this invocation.</typeparam> + /// <param name="request">The request value to continue the invocation with.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// instance to pass to the next step in the invocation process. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncUnaryCall{TResponse}" /> + /// representing an asynchronous invocation of a unary RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// </returns> + public delegate AsyncUnaryCall<TResponse> AsyncUnaryCallContinuation<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class; + + /// <summary> + /// Represents a continuation for intercepting asynchronous server-streaming invocations. + /// A delegate of this type is passed to the AsyncServerStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. + /// </summary> + /// <typeparam name="TRequest">Request message type for this invocation.</typeparam> + /// <typeparam name="TResponse">Response message type for this invocation.</typeparam> + /// <param name="request">The request value to continue the invocation with.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// instance to pass to the next step in the invocation process. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncServerStreamingCall{TResponse}" /> + /// representing an asynchronous invocation of a server-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// </returns> + public delegate AsyncServerStreamingCall<TResponse> AsyncServerStreamingCallContinuation<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class; + + /// <summary> + /// Represents a continuation for intercepting asynchronous client-streaming invocations. + /// A delegate of this type is passed to the AsyncClientStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. + /// </summary> + /// <typeparam name="TRequest">Request message type for this invocation.</typeparam> + /// <typeparam name="TResponse">Response message type for this invocation.</typeparam> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// instance to pass to the next step in the invocation process. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncClientStreamingCall{TRequest, TResponse}" /> + /// representing an asynchronous invocation of a client-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// </returns> + public delegate AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCallContinuation<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class; + + /// <summary> + /// Represents a continuation for intercepting asynchronous duplex invocations. + /// A delegate of this type is passed to the AsyncDuplexStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. + /// </summary> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// instance to pass to the next step in the invocation process. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncDuplexStreamingCall{TRequest, TResponse}" /> + /// representing an asynchronous invocation of a duplex-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// </returns> + public delegate AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCallContinuation<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context) + where TRequest : class + where TResponse : class; + + /// <summary> + /// Intercepts a blocking invocation of a simple remote call. + /// </summary> + /// <param name="request">The request message of the invocation.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// associated with the current invocation. + /// </param> + /// <param name="continuation"> + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. + /// </param> + /// <returns> + /// The response message of the current invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or an arbitrary + /// value as it sees fit. + /// </returns> + public virtual TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a simple remote call. + /// </summary> + /// <param name="request">The request message of the invocation.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// associated with the current invocation. + /// </param> + /// <param name="continuation"> + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncUnaryCall{TResponse}" /> + /// representing an asynchronous unary invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// </returns> + public virtual AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a streaming remote call. + /// </summary> + /// <param name="request">The request message of the invocation.</param> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// associated with the current invocation. + /// </param> + /// <param name="continuation"> + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncServerStreamingCall{TResponse}" /> + /// representing an asynchronous server-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// </returns> + public virtual AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncServerStreamingCallContinuation<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a client streaming call. + /// </summary> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// associated with the current invocation. + /// </param> + /// <param name="continuation"> + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// context argument, or substitute as it sees fit. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncClientStreamingCall{TRequest, TResponse}" /> + /// representing an asynchronous client-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// </returns> + public virtual AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncClientStreamingCallContinuation<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(context); + } + + /// <summary> + /// Intercepts an asynchronous invocation of a duplex streaming call. + /// </summary> + /// <param name="context"> + /// The <see cref="Grpc.Core.Interceptors.ClientInterceptorContext{TRequest, TResponse}"/> + /// associated with the current invocation. + /// </param> + /// <param name="continuation"> + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// context argument, or substitute as it sees fit. + /// </param> + /// <returns> + /// An instance of <see cref="Grpc.Core.AsyncDuplexStreamingCall{TRequest, TResponse}" /> + /// representing an asynchronous duplex-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// </returns> + public virtual AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(ClientInterceptorContext<TRequest, TResponse> context, AsyncDuplexStreamingCallContinuation<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(context); + } + + /// <summary> + /// Server-side handler for intercepting and incoming unary call. + /// </summary> + /// <typeparam name="TRequest">Request message type for this method.</typeparam> + /// <typeparam name="TResponse">Response message type for this method.</typeparam> + /// <param name="request">The request value of the incoming invocation.</param> + /// <param name="context"> + /// An instance of <see cref="Grpc.Core.ServerCallContext" /> representing + /// the context of the invocation. + /// </param> + /// <param name="continuation"> + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and return the response value of + /// the RPC. The interceptor can choose to call it zero or more times + /// at its discretion. + /// </param> + /// <returns> + /// A future representing the response value of the RPC. The interceptor + /// can simply return the return value from the continuation intact, + /// or an arbitrary response value as it sees fit. + /// </returns> + public virtual Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// <summary> + /// Server-side handler for intercepting client streaming call. + /// </summary> + /// <typeparam name="TRequest">Request message type for this method.</typeparam> + /// <typeparam name="TResponse">Response message type for this method.</typeparam> + /// <param name="requestStream">The request stream of the incoming invocation.</param> + /// <param name="context"> + /// An instance of <see cref="Grpc.Core.ServerCallContext" /> representing + /// the context of the invocation. + /// </param> + /// <param name="continuation"> + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and return the response value of + /// the RPC. The interceptor can choose to call it zero or more times + /// at its discretion. + /// </param> + /// <returns> + /// A future representing the response value of the RPC. The interceptor + /// can simply return the return value from the continuation intact, + /// or an arbitrary response value as it sees fit. The interceptor has + /// the ability to wrap or substitute the request stream when calling + /// the continuation. + /// </returns> + public virtual Task<TResponse> ClientStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, ServerCallContext context, ClientStreamingServerMethod<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(requestStream, context); + } + + /// <summary> + /// Server-side handler for intercepting server streaming call. + /// </summary> + /// <typeparam name="TRequest">Request message type for this method.</typeparam> + /// <typeparam name="TResponse">Response message type for this method.</typeparam> + /// <param name="request">The request value of the incoming invocation.</param> + /// <param name="responseStream">The response stream of the incoming invocation.</param> + /// <param name="context"> + /// An instance of <see cref="Grpc.Core.ServerCallContext" /> representing + /// the context of the invocation. + /// </param> + /// <param name="continuation"> + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and the interceptor can choose to + /// call it zero or more times at its discretion. The interceptor has + /// the ability to wrap or substitute the request value and the response stream + /// when calling the continuation. + /// </param> + public virtual Task ServerStreamingServerHandler<TRequest, TResponse>(TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, ServerStreamingServerMethod<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, responseStream, context); + } + + /// <summary> + /// Server-side handler for intercepting bidirectional streaming calls. + /// </summary> + /// <typeparam name="TRequest">Request message type for this method.</typeparam> + /// <typeparam name="TResponse">Response message type for this method.</typeparam> + /// <param name="requestStream">The request stream of the incoming invocation.</param> + /// <param name="responseStream">The response stream of the incoming invocation.</param> + /// <param name="context"> + /// An instance of <see cref="Grpc.Core.ServerCallContext" /> representing + /// the context of the invocation. + /// </param> + /// <param name="continuation"> + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and the interceptor can choose to + /// call it zero or more times at its discretion. The interceptor has + /// the ability to wrap or substitute the request and response streams + /// when calling the continuation. + /// </param> + public virtual Task DuplexStreamingServerHandler<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream, ServerCallContext context, DuplexStreamingServerMethod<TRequest, TResponse> continuation) + where TRequest : class + where TResponse : class + { + return continuation(requestStream, responseStream, context); + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs new file mode 100644 index 0000000000..b9b53247ce --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs @@ -0,0 +1,82 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Linq; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// <summary> + /// Extends the ServerServiceDefinition class to add methods used to register interceptors on the server side. + /// This is an EXPERIMENTAL API. + /// </summary> + public static class ServerServiceDefinitionExtensions + { + /// <summary> + /// Returns a <see cref="Grpc.Core.ServerServiceDefinition" /> instance that + /// intercepts incoming calls to the underlying service handler through the given interceptor. + /// This is an EXPERIMENTAL API. + /// </summary> + /// <param name="serverServiceDefinition">The <see cref="Grpc.Core.ServerServiceDefinition" /> instance to register interceptors on.</param> + /// <param name="interceptor">The interceptor to intercept the incoming invocations with.</param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "serverServiceDefinition.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted service definition, effectively + /// building a chain like "serverServiceDefinition.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, Interceptor interceptor) + { + GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition)); + GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + return new ServerServiceDefinition(serverServiceDefinition.CallHandlers.ToDictionary(x => x.Key, x => x.Value.Intercept(interceptor))); + } + + /// <summary> + /// Returns a <see cref="Grpc.Core.ServerServiceDefinition" /> instance that + /// intercepts incoming calls to the underlying service handler through the given interceptors. + /// This is an EXPERIMENTAL API. + /// </summary> + /// <param name="serverServiceDefinition">The <see cref="Grpc.Core.ServerServiceDefinition" /> instance to register interceptors on.</param> + /// <param name="interceptors"> + /// An array of interceptors to intercept the incoming invocations with. + /// Control is passed to the interceptors in the order specified. + /// </param> + /// <remarks> + /// Multiple interceptors can be added on top of each other by calling + /// "serverServiceDefinition.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted service definition, effectively + /// building a chain like "serverServiceDefinition.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// </remarks> + public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, params Interceptor[] interceptors) + { + GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition)); + GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors)); + + foreach (var interceptor in interceptors.Reverse()) + { + serverServiceDefinition = Intercept(serverServiceDefinition, interceptor); + } + + return serverServiceDefinition; + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs deleted file mode 100644 index eb4c7d97a7..0000000000 --- a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs +++ /dev/null @@ -1,119 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015-2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Threading.Tasks; -using Grpc.Core; -using Grpc.Core.Utils; - -namespace Grpc.Core.Internal -{ - /// <summary> - /// Decorates an underlying <c>CallInvoker</c> to intercept call invocations. - /// </summary> - internal class InterceptingCallInvoker : CallInvoker - { - readonly CallInvoker callInvoker; - readonly Func<string, string> hostInterceptor; - readonly Func<CallOptions, CallOptions> callOptionsInterceptor; - - /// <summary> - /// Initializes a new instance of the <see cref="Grpc.Core.Internal.InterceptingCallInvoker"/> class. - /// </summary> - public InterceptingCallInvoker(CallInvoker callInvoker, - Func<string, string> hostInterceptor = null, - Func<CallOptions, CallOptions> callOptionsInterceptor = null) - { - this.callInvoker = GrpcPreconditions.CheckNotNull(callInvoker); - this.hostInterceptor = hostInterceptor; - this.callOptionsInterceptor = callOptionsInterceptor; - } - - /// <summary> - /// Intercepts a unary call. - /// </summary> - public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.BlockingUnaryCall(method, host, options, request); - } - - /// <summary> - /// Invokes a simple remote call asynchronously. - /// </summary> - public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncUnaryCall(method, host, options, request); - } - - /// <summary> - /// Invokes a server streaming call asynchronously. - /// In server streaming scenario, client sends on request and server responds with a stream of responses. - /// </summary> - public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncServerStreamingCall(method, host, options, request); - } - - /// <summary> - /// Invokes a client streaming call asynchronously. - /// In client streaming scenario, client sends a stream of requests and server responds with a single response. - /// </summary> - public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncClientStreamingCall(method, host, options); - } - - /// <summary> - /// Invokes a duplex streaming call asynchronously. - /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses. - /// The response stream is completely independent and both side can be sending messages at the same time. - /// </summary> - public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncDuplexStreamingCall(method, host, options); - } - - private string InterceptHost(string host) - { - if (hostInterceptor == null) - { - return host; - } - return hostInterceptor(host); - } - - private CallOptions InterceptCallOptions(CallOptions options) - { - if (callOptionsInterceptor == null) - { - return options; - } - return callOptionsInterceptor(options); - } - } -} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 98995a0862..81522cf8fe 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -21,6 +21,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -30,6 +31,7 @@ namespace Grpc.Core.Internal internal interface IServerCallHandler { Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); + IServerCallHandler Intercept(Interceptor interceptor); } internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -74,7 +76,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -89,6 +91,11 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return new UnaryServerCallHandler<TRequest, TResponse>(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler)); + } } internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -131,7 +138,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -147,6 +154,11 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return new ServerStreamingServerCallHandler<TRequest, TResponse>(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler)); + } } internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -189,7 +201,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -205,6 +217,11 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return new ClientStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler)); + } } internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -245,7 +262,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -260,6 +277,11 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return new DuplexStreamingServerCallHandler<TRequest, TResponse>(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler)); + } } internal class UnimplementedMethodCallHandler : IServerCallHandler @@ -288,6 +310,11 @@ namespace Grpc.Core.Internal { return callHandlerImpl.HandleCall(newRpc, cq); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return this; // Do not intercept unimplemented methods. + } } internal static class HandlerUtils diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index 59868c1f40..07c6aa1796 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -19,7 +19,10 @@ using System; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Linq; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -32,7 +35,7 @@ namespace Grpc.Core { readonly ReadOnlyDictionary<string, IServerCallHandler> callHandlers; - private ServerServiceDefinition(Dictionary<string, IServerCallHandler> callHandlers) + internal ServerServiceDefinition(Dictionary<string, IServerCallHandler> callHandlers) { this.callHandlers = new ReadOnlyDictionary<string, IServerCallHandler>(callHandlers); } diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 469328af1a..60f67ff3c9 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -1,5 +1,7 @@ { "Grpc.Core.Tests": [ + "Grpc.Core.Interceptors.Tests.ClientInterceptorTest", + "Grpc.Core.Interceptors.Tests.ServerInterceptorTest", "Grpc.Core.Internal.Tests.AsyncCallServerTest", "Grpc.Core.Internal.Tests.AsyncCallTest", "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", @@ -59,4 +61,4 @@ "Grpc.Reflection.Tests.ReflectionClientServerTest", "Grpc.Reflection.Tests.SymbolRegistryTest" ] -}
\ No newline at end of file +} diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm index d91b5cf99e..33ccdb5844 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm @@ -224,8 +224,7 @@ static char *roots_filename; } - (void)testBinaryMetadata { - // NOT SUPPORTED - //[self testIndividualCase:(char *)"binary_metadata"]; + [self testIndividualCase:(char *)"binary_metadata"]; } - (void)testCallCreds { |