From efc5ae8008d8778d98a98616df5e5367e44babb6 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 14 Feb 2018 17:17:58 -0800 Subject: Add binary metadata support for cronet transport --- .../transport/cronet/transport/cronet_transport.cc | 65 ++++++++++++++++++---- .../CoreCronetEnd2EndTests.mm | 3 +- 2 files changed, 55 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 8904c122a7..283417bcbe 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -24,6 +24,8 @@ #include #include +#include "src/core/ext/transport/chttp2/transport/bin_decoder.h" +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/gpr/host_port.h" @@ -214,6 +216,26 @@ void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); } static enum e_op_result execute_stream_op(struct op_and_state* oas); +static const size_t tail_xtra[4] = {0, 0, 1, 2}; + +static size_t infer_length_after_decode(const grpc_slice& slice) { + size_t len = GRPC_SLICE_LENGTH(slice); + const uint8_t* bytes = GRPC_SLICE_START_PTR(slice); + while (len > 0 && bytes[len - 1] == '=') { + len--; + } + size_t tuples = len / 4; + size_t tail_case = len % 4; + if (tail_case == 1) { + gpr_log(GPR_ERROR, + "Base64 decoding failed. Input has a length of %zu (withou " + "padding), which is invalid.\n", + len); + tail_case = 0; + } + return tuples * 3 + tail_xtra[tail_case]; +} + /* Utility function to translate enum into string for printing */ @@ -518,14 +540,21 @@ static void on_response_headers_received( grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, s->arena); for (size_t i = 0; i < headers->count; i++) { + grpc_slice key = grpc_slice_intern( + grpc_slice_from_static_string(headers->headers[i].key)); + grpc_slice value; + if (grpc_is_binary_header(key)) { + value = grpc_slice_from_static_string(headers->headers[i].value); + value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( + value, infer_length_after_decode(value))); + } else { + value = grpc_slice_intern( + grpc_slice_from_static_string(headers->headers[i].value)); + } GRPC_LOG_IF_ERROR("on_response_headers_received", grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].value))))); + grpc_mdelem_from_slices(key, value))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -624,14 +653,21 @@ static void on_response_trailers_received( for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); + grpc_slice key = grpc_slice_intern( + grpc_slice_from_static_string(trailers->headers[i].key)); + grpc_slice value; + if (grpc_is_binary_header(key)) { + value = grpc_slice_from_static_string(trailers->headers[i].value); + value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( + value, infer_length_after_decode(value))); + } else { + value = grpc_slice_intern( + grpc_slice_from_static_string(trailers->headers[i].value)); + } GRPC_LOG_IF_ERROR("on_response_trailers_received", grpc_chttp2_incoming_metadata_buffer_add( &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].value))))); + grpc_mdelem_from_slices(key, value))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { @@ -721,7 +757,14 @@ static void convert_metadata_to_cronet_headers( grpc_mdelem mdelem = curr->md; curr = curr->next; char* key = grpc_slice_to_c_string(GRPC_MDKEY(mdelem)); - char* value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); + char* value; + if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) { + grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem)); + value = grpc_slice_to_c_string(wire_value); + grpc_slice_unref(wire_value); + } else { + value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); + } if (grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_SCHEME) || grpc_slice_eq(GRPC_MDKEY(mdelem), GRPC_MDSTR_AUTHORITY)) { /* Cronet populates these fields on its own */ diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm index d91b5cf99e..33ccdb5844 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.mm @@ -224,8 +224,7 @@ static char *roots_filename; } - (void)testBinaryMetadata { - // NOT SUPPORTED - //[self testIndividualCase:(char *)"binary_metadata"]; + [self testIndividualCase:(char *)"binary_metadata"]; } - (void)testCallCreds { -- cgit v1.2.3 From ec42f328349efdbd88a267f74bde8e31dd6c7d68 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 15 Feb 2018 14:27:14 -0800 Subject: Fix typo --- src/core/ext/transport/cronet/transport/cronet_transport.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 283417bcbe..9dce5c99ca 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -228,8 +228,8 @@ static size_t infer_length_after_decode(const grpc_slice& slice) { size_t tail_case = len % 4; if (tail_case == 1) { gpr_log(GPR_ERROR, - "Base64 decoding failed. Input has a length of %zu (withou " - "padding), which is invalid.\n", + "Base64 decoding failed. Input has a length of %zu (without" + " padding), which is invalid.\n", len); tail_case = 0; } -- cgit v1.2.3 From 3ef4af2eae875abc0305ddfc553946d4f4c686b1 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 21 Feb 2018 07:53:26 -0800 Subject: Fix crash when failing to create an LB policy. --- src/core/ext/filters/client_channel/client_channel.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 174a15b447..9a8f25b630 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -446,7 +446,6 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { chand->lb_policy->UpdateLocked(*chand->resolver_result); } else { // Instantiate new LB policy. - lb_policy_created = true; grpc_core::LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.combiner = chand->combiner; lb_policy_args.client_channel_factory = chand->client_channel_factory; @@ -458,6 +457,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } else { + lb_policy_created = true; reresolution_request_args* args = static_cast( gpr_zalloc(sizeof(*args))); -- cgit v1.2.3 From 09bd5c0b1b9d5748b8a272e8495ecd0c9fd1602c Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 21 Feb 2018 15:18:07 -0800 Subject: convert cronet headers to metadata in one function --- .../transport/cronet/transport/cronet_transport.cc | 63 ++++++++++------------ 1 file changed, 28 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 9dce5c99ca..6c73c80eda 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -415,6 +415,29 @@ static void execute_from_storage(stream_obj* s) { gpr_mu_unlock(&s->mu); } +static void convert_cronet_array_to_metadata( + const bidirectional_stream_header_array* header_array, + grpc_chttp2_incoming_metadata_buffer* mds) { + for (size_t i = 0; i < header_array->count; i++) { + CRONET_LOG(GPR_DEBUG, "header key=%s, value=%s", + header_array->headers[i].key, header_array->headers[i].value); + grpc_slice key = grpc_slice_intern( + grpc_slice_from_static_string(header_array->headers[i].key)); + grpc_slice value; + if (grpc_is_binary_header(key)) { + value = grpc_slice_from_static_string(header_array->headers[i].value); + value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( + value, infer_length_after_decode(value))); + } else { + value = grpc_slice_intern( + grpc_slice_from_static_string(header_array->headers[i].value)); + } + GRPC_LOG_IF_ERROR("convert_cronet_array_to_metadata", + grpc_chttp2_incoming_metadata_buffer_add( + mds, grpc_mdelem_from_slices(key, value))); + } +} + /* Cronet callback */ @@ -539,23 +562,7 @@ static void on_response_headers_received( sizeof(s->state.rs.initial_metadata)); grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, s->arena); - for (size_t i = 0; i < headers->count; i++) { - grpc_slice key = grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].key)); - grpc_slice value; - if (grpc_is_binary_header(key)) { - value = grpc_slice_from_static_string(headers->headers[i].value); - value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( - value, infer_length_after_decode(value))); - } else { - value = grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)); - } - GRPC_LOG_IF_ERROR("on_response_headers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices(key, value))); - } + convert_cronet_array_to_metadata(headers, &s->state.rs.initial_metadata); s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || s->state.state_callback_received[OP_FAILED])) { @@ -650,25 +657,11 @@ static void on_response_trailers_received( s->state.rs.trailing_metadata_valid = false; grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, s->arena); - for (size_t i = 0; i < trailers->count; i++) { - CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, - trailers->headers[i].value); - grpc_slice key = grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].key)); - grpc_slice value; - if (grpc_is_binary_header(key)) { - value = grpc_slice_from_static_string(trailers->headers[i].value); - value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( - value, infer_length_after_decode(value))); - } else { - value = grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)); - } - GRPC_LOG_IF_ERROR("on_response_trailers_received", - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices(key, value))); + convert_cronet_array_to_metadata(trailers, &s->state.rs.trailing_metadata); + if (trailers->count > 0) { s->state.rs.trailing_metadata_valid = true; + } + for (size_t i = 0; i < trailers->count; i++) { if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; -- cgit v1.2.3 From 9ef1f359ca1dec335300067e6d4e3ce4da24b584 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 21 Feb 2018 15:26:54 -0800 Subject: Put infer_length_after_decode in bin_decoder --- .../ext/transport/chttp2/transport/bin_decoder.cc | 18 ++++++++++++++++++ .../ext/transport/chttp2/transport/bin_decoder.h | 3 +++ .../transport/cronet/transport/cronet_transport.cc | 22 +--------------------- 3 files changed, 22 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.cc b/src/core/ext/transport/chttp2/transport/bin_decoder.cc index 91980e6974..53b8622259 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.cc +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.cc @@ -75,6 +75,24 @@ static bool input_is_valid(uint8_t* input_ptr, size_t length) { #define COMPOSE_OUTPUT_BYTE_2(input_ptr) \ (uint8_t)((decode_table[input_ptr[2]] << 6) | decode_table[input_ptr[3]]) +size_t grpc_base64_infer_length_after_decode(const grpc_slice& slice) { + size_t len = GRPC_SLICE_LENGTH(slice); + const uint8_t* bytes = GRPC_SLICE_START_PTR(slice); + while (len > 0 && bytes[len - 1] == '=') { + len--; + } + size_t tuples = len / 4; + size_t tail_case = len % 4; + if (tail_case == 1) { + gpr_log(GPR_ERROR, + "Base64 decoding failed. Input has a length of %zu (without" + " padding), which is invalid.\n", + len); + tail_case = 0; + } + return tuples * 3 + tail_xtra[tail_case]; +} + bool grpc_base64_decode_partial(struct grpc_base64_decode_context* ctx) { size_t input_tail; diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h index 9cb75ccd81..b5acb3fa91 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -48,4 +48,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_slice input); grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, size_t output_length); +/* Infer the length of decoded data from encoded data. */ +size_t grpc_base64_infer_length_after_decode(const grpc_slice& slice); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 6c73c80eda..627082427d 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -216,26 +216,6 @@ void grpc_cronet_stream_unref(stream_obj* s) { grpc_stream_unref(s->refcount); } static enum e_op_result execute_stream_op(struct op_and_state* oas); -static const size_t tail_xtra[4] = {0, 0, 1, 2}; - -static size_t infer_length_after_decode(const grpc_slice& slice) { - size_t len = GRPC_SLICE_LENGTH(slice); - const uint8_t* bytes = GRPC_SLICE_START_PTR(slice); - while (len > 0 && bytes[len - 1] == '=') { - len--; - } - size_t tuples = len / 4; - size_t tail_case = len % 4; - if (tail_case == 1) { - gpr_log(GPR_ERROR, - "Base64 decoding failed. Input has a length of %zu (without" - " padding), which is invalid.\n", - len); - tail_case = 0; - } - return tuples * 3 + tail_xtra[tail_case]; -} - /* Utility function to translate enum into string for printing */ @@ -427,7 +407,7 @@ static void convert_cronet_array_to_metadata( if (grpc_is_binary_header(key)) { value = grpc_slice_from_static_string(header_array->headers[i].value); value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( - value, infer_length_after_decode(value))); + value, grpc_base64_infer_length_after_decode(value))); } else { value = grpc_slice_intern( grpc_slice_from_static_string(header_array->headers[i].value)); -- cgit v1.2.3 From 6c9a87c174189a767eca3bc387f36f4d5880cde3 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 21 Feb 2018 16:06:25 -0800 Subject: Polish infer_length_after_decode and add test cases --- .../ext/transport/chttp2/transport/bin_decoder.cc | 12 +++++++-- .../ext/transport/chttp2/transport/bin_decoder.h | 2 +- test/core/transport/chttp2/bin_decoder_test.cc | 30 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.cc b/src/core/ext/transport/chttp2/transport/bin_decoder.cc index 53b8622259..831a36961e 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.cc +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.cc @@ -75,12 +75,20 @@ static bool input_is_valid(uint8_t* input_ptr, size_t length) { #define COMPOSE_OUTPUT_BYTE_2(input_ptr) \ (uint8_t)((decode_table[input_ptr[2]] << 6) | decode_table[input_ptr[3]]) -size_t grpc_base64_infer_length_after_decode(const grpc_slice& slice) { +// By RFC 4648, if the length of the encoded string without padding is 4n+r, +// the length of decoded string is: 1) 3n if r = 0, 2) 3n + 1 if r = 2, 3, or +// 3) invalid if r = 1. +size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice& slice) { size_t len = GRPC_SLICE_LENGTH(slice); const uint8_t* bytes = GRPC_SLICE_START_PTR(slice); while (len > 0 && bytes[len - 1] == '=') { len--; } + if (GRPC_SLICE_LENGTH(slice) - len > 2) { + gpr_log(GPR_ERROR, + "Base64 decoding failed. Input has more than 2 paddings."); + return 0; + } size_t tuples = len / 4; size_t tail_case = len % 4; if (tail_case == 1) { @@ -88,7 +96,7 @@ size_t grpc_base64_infer_length_after_decode(const grpc_slice& slice) { "Base64 decoding failed. Input has a length of %zu (without" " padding), which is invalid.\n", len); - tail_case = 0; + return 0; } return tuples * 3 + tail_xtra[tail_case]; } diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.h b/src/core/ext/transport/chttp2/transport/bin_decoder.h index b5acb3fa91..a0d74fb20d 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.h @@ -49,6 +49,6 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_slice input, size_t output_length); /* Infer the length of decoded data from encoded data. */ -size_t grpc_base64_infer_length_after_decode(const grpc_slice& slice); +size_t grpc_chttp2_base64_infer_length_after_decode(const grpc_slice& slice); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_DECODER_H */ diff --git a/test/core/transport/chttp2/bin_decoder_test.cc b/test/core/transport/chttp2/bin_decoder_test.cc index 283eebbacf..751dd90c8c 100644 --- a/test/core/transport/chttp2/bin_decoder_test.cc +++ b/test/core/transport/chttp2/bin_decoder_test.cc @@ -67,6 +67,16 @@ static grpc_slice base64_decode_with_length(const char* s, return out; } +static size_t base64_infer_length(const char* s) { + grpc_slice ss = grpc_slice_from_copied_string(s); + size_t out = grpc_chttp2_base64_infer_length_after_decode(ss); + grpc_slice_unref_internal(ss); + return out; +} + +#define EXPECT_DECODED_LENGTH(s, expected) \ + GPR_ASSERT((expected) == base64_infer_length((s))); + #define EXPECT_SLICE_EQ(expected, slice) \ expect_slice_eq( \ grpc_slice_from_copied_buffer(expected, sizeof(expected) - 1), slice, \ @@ -131,6 +141,26 @@ int main(int argc, char** argv) { // Test illegal charactors in grpc_chttp2_base64_decode_with_length EXPECT_SLICE_EQ("", base64_decode_with_length("Zm:v", 3)); EXPECT_SLICE_EQ("", base64_decode_with_length("Zm=v", 3)); + + EXPECT_DECODED_LENGTH("", 0); + EXPECT_DECODED_LENGTH("ab", 1); + EXPECT_DECODED_LENGTH("abc", 2); + EXPECT_DECODED_LENGTH("abcd", 3); + EXPECT_DECODED_LENGTH("abcdef", 4); + EXPECT_DECODED_LENGTH("abcdefg", 5); + EXPECT_DECODED_LENGTH("abcdefgh", 6); + + EXPECT_DECODED_LENGTH("ab==", 1); + EXPECT_DECODED_LENGTH("abc=", 2); + EXPECT_DECODED_LENGTH("abcd", 3); + EXPECT_DECODED_LENGTH("abcdef==", 4); + EXPECT_DECODED_LENGTH("abcdefg=", 5); + EXPECT_DECODED_LENGTH("abcdefgh", 6); + + EXPECT_DECODED_LENGTH("a", 0); + EXPECT_DECODED_LENGTH("a===", 0); + EXPECT_DECODED_LENGTH("abcde", 0); + EXPECT_DECODED_LENGTH("abcde===", 0); } grpc_shutdown(); return all_ok ? 0 : 1; -- cgit v1.2.3 From 48396479c04d3ba40ad46be768bdf79840a65cbc Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 21 Feb 2018 17:52:39 -0800 Subject: Build typo fix --- src/core/ext/transport/cronet/transport/cronet_transport.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 627082427d..c367f9c465 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -407,7 +407,7 @@ static void convert_cronet_array_to_metadata( if (grpc_is_binary_header(key)) { value = grpc_slice_from_static_string(header_array->headers[i].value); value = grpc_slice_intern(grpc_chttp2_base64_decode_with_length( - value, grpc_base64_infer_length_after_decode(value))); + value, grpc_chttp2_base64_infer_length_after_decode(value))); } else { value = grpc_slice_intern( grpc_slice_from_static_string(header_array->headers[i].value)); -- cgit v1.2.3 From 6cde06129f5e100dd94b0318c97e863d5e02c4b1 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 20:07:31 -0800 Subject: Add C# Interceptor base class --- src/csharp/Grpc.Core/Interceptors/Interceptor.cs | 281 +++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 src/csharp/Grpc.Core/Interceptors/Interceptor.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs new file mode 100644 index 0000000000..0f32e8b420 --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs @@ -0,0 +1,281 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Reflection; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Interceptors +{ + /// + /// Carries along the context associated with intercepted invocations on the client side. + /// This is an EXPERIMENTAL API. + /// + public class ClientInterceptorContext + where TRequest : class + where TResponse : class + { + /// + /// Creates a new instance of + /// with the specified method, host, and call options. + /// + /// A object representing the method to be invoked. + /// The host to dispatch the current call to. + /// A instance containing the call options of the current call. + + public ClientInterceptorContext(Method method, string host, CallOptions options) + { + Method = method; + Host = host; + Options = options; + } + + /// + /// Gets the representing + /// the method to be invoked. + /// + public Method Method { get; } + + /// + /// Gets the host that the currect invocation will be dispatched to. + /// + public string Host { get; } + + /// + /// Gets the structure representing the + /// call options associated with the current invocation. + /// + public CallOptions Options { get; } + } + + /// + /// Serves as the base class for gRPC interceptors. + /// This is an EXPERIMENTAL API. + /// + public abstract class Interceptor + { + /// + /// Represents a continuation for intercepting simple blocking invocations. + /// + /// Request message type for this invocation. + /// Response message type for this invocation. + /// The request value to continue the invocation with. + /// + /// The + /// instance to pass to the next step in the invocation process. + /// + public delegate TResponse BlockingUnaryCallContinuation(TRequest request, ClientInterceptorContext context) + where TRequest : class + where TResponse : class; + + /// + /// Represents a continuation for intercepting simple asynchronous invocations. + /// + /// Request message type for this invocation. + /// Response message type for this invocation. + /// The request value to continue the invocation with. + /// + /// The + /// instance to pass to the next step in the invocation process. + /// + public delegate AsyncUnaryCall AsyncUnaryCallContinuation(TRequest request, ClientInterceptorContext context) + where TRequest : class + where TResponse : class; + + /// + /// Represents a continuation for intercepting asynchronous server-streaming invocations. + /// + /// Request message type for this invocation. + /// Response message type for this invocation. + /// The request value to continue the invocation with. + /// + /// The + /// instance to pass to the next step in the invocation process. + /// + public delegate AsyncServerStreamingCall AsyncServerStreamingCallContinuation(TRequest request, ClientInterceptorContext context) + where TRequest : class + where TResponse : class; + + /// + /// Represents a continuation for intercepting asynchronous client-streaming invocations. + /// + /// Request message type for this invocation. + /// Response message type for this invocation. + /// + /// The + /// instance to pass to the next step in the invocation process. + /// + public delegate AsyncClientStreamingCall AsyncClientStreamingCallContinuation(ClientInterceptorContext context) + where TRequest : class + where TResponse : class; + + /// + /// Represents a continuation for intercepting asynchronous duplex invocations. + /// + /// + /// The + /// instance to pass to the next step in the invocation process. + /// + public delegate AsyncDuplexStreamingCall AsyncDuplexStreamingCallContinuation(ClientInterceptorContext context) + where TRequest : class + where TResponse : class; + + /// + /// Intercepts a blocking invocation of a simple remote call. + /// + /// The request message of the invocation. + /// + /// The + /// associated with the current invocation. + /// + /// + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// + /// The response message of the current invocation. + public virtual TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// + /// Intercepts an asynchronous invocation of a simple remote call. + /// + /// The request message of the invocation. + /// + /// The + /// associated with the current invocation. + /// + /// + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// + public virtual AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// + /// Intercepts an asynchronous invocation of a streaming remote call. + /// + /// The request message of the invocation. + /// + /// The + /// associated with the current invocation. + /// + /// + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// + public virtual AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// + /// Intercepts an asynchronous invocation of a client streaming call. + /// + /// + /// The + /// associated with the current invocation. + /// + /// + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// + public virtual AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + where TRequest : class + where TResponse : class + { + return continuation(context); + } + + /// + /// Intercepts an asynchronous invocation of a duplex streaming call. + /// + /// + /// The + /// associated with the current invocation. + /// + /// + /// The callback that continues the invocation process. + /// This can be invoked zero or more times by the interceptor. + /// + public virtual AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + where TRequest : class + where TResponse : class + { + return continuation(context); + } + + /// + /// Server-side handler for intercepting unary calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public virtual Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, context); + } + + /// + /// Server-side handler for intercepting client streaming call. + /// + /// Request message type for this method. + /// Response message type for this method. + public virtual Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) + where TRequest : class + where TResponse : class + { + return continuation(requestStream, context); + } + + /// + /// Server-side handler for intercepting server streaming calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public virtual Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) + where TRequest : class + where TResponse : class + { + return continuation(request, responseStream, context); + } + + /// + /// Server-side handler for intercepting bidi streaming calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public virtual Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) + where TRequest : class + where TResponse : class + { + return continuation(requestStream, responseStream, context); + } + } +} -- cgit v1.2.3 From b8e362455458a3697fa7194c9c084584467b5b80 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 20:10:29 -0800 Subject: Add C# client-side interceptor machinery --- src/csharp/Grpc.Core/ClientBase.cs | 62 +++++++++- .../Interceptors/CallInvokerExtensions.cs | 137 +++++++++++++++++++++ .../Grpc.Core/Interceptors/ChannelExtensions.cs | 54 ++++++++ .../Grpc.Core/Internal/InterceptingCallInvoker.cs | 119 ------------------ 4 files changed, 252 insertions(+), 120 deletions(-) create mode 100644 src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs create mode 100644 src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs delete mode 100644 src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 2d41b29fa0..d64ce7dd94 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -16,6 +16,8 @@ #endregion +using System; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -147,6 +149,64 @@ namespace Grpc.Core /// protected internal class ClientBaseConfiguration { + private class ClientHeaderInterceptor : Interceptor + { + readonly Func> interceptor; + + /// + /// Creates a new instance of ClientHeaderInterceptor given the specified header interceptor function. + /// + public ClientHeaderInterceptor(Func> interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + /// + /// Intercepts a blocking invocation of a simple remote call. + /// + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// + /// Intercepts an asynchronous invocation of a simple remote call. + /// + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// + /// Intercepts an asynchronous invocation of a streaming remote call. + /// + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// + /// Intercepts an asynchronous invocation of a client streaming call. + /// + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + + /// + /// Intercepts an asynchronous invocation of a duplex streaming call. + /// + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + var newHeaders = interceptor(context.Method, context.Host, context.Options); + return continuation(new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + } + } + readonly CallInvoker undecoratedCallInvoker; readonly string host; @@ -158,7 +218,7 @@ namespace Grpc.Core internal CallInvoker CreateDecoratedCallInvoker() { - return new InterceptingCallInvoker(undecoratedCallInvoker, hostInterceptor: (h) => host); + return undecoratedCallInvoker.Intercept(new ClientHeaderInterceptor((method, host, options) => Tuple.Create(this.host, options))); } internal ClientBaseConfiguration WithHost(string host) diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs new file mode 100644 index 0000000000..26e9f8802d --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -0,0 +1,137 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Linq; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// + /// Extends the CallInvoker class to provide the interceptor facility on the client side. + /// This is an EXPERIMENTAL API. + /// + public static class CallInvokerExtensions + { + /// + /// Decorates an underlying to + /// intercept calls through a given interceptor. + /// + private class InterceptingCallInvoker : CallInvoker + { + readonly CallInvoker invoker; + readonly Interceptor interceptor; + + /// + /// Creates a new instance of + /// with the given underlying invoker and interceptor instances. + /// + public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) + { + this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker"); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + /// + /// Intercepts a simple blocking call with the registered interceptor. + /// + public override TResponse BlockingUnaryCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.BlockingUnaryCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts a simple asynchronous call with the registered interceptor. + /// + public override AsyncUnaryCall AsyncUnaryCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncUnaryCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts an asynchronous server streaming call with the registered interceptor. + /// + public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncServerStreamingCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts an asynchronous client streaming call with the registered interceptor. + /// + public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string host, CallOptions options) + { + return interceptor.AsyncClientStreamingCall( + new ClientInterceptorContext(method, host, options), + ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + + /// + /// Intercepts an asynchronous duplex streaming call with the registered interceptor. + /// + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string host, CallOptions options) + { + return interceptor.AsyncDuplexStreamingCall( + new ClientInterceptorContext(method, host, options), + ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + } + + /// + /// Returns a instance that intercepts + /// the invoker with the given interceptor. + /// + /// The underlying invoker to intercept. + /// The interceptor to intercept calls to the invoker with. + public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor) + { + return new InterceptingCallInvoker(invoker, interceptor); + } + + /// + /// Returns a instance that intercepts + /// the invoker with the given interceptors. + /// + /// The channel to intercept. + /// + /// An array of interceptors to intercept the calls to the invoker with. + /// Control is passed to the interceptors in the order specified. + /// + public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) + { + GrpcPreconditions.CheckNotNull(invoker, "invoker"); + GrpcPreconditions.CheckNotNull(interceptors, "interceptors"); + + foreach (var interceptor in interceptors.Reverse()) + { + invoker = Intercept(invoker, interceptor); + } + + return invoker; + } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs new file mode 100644 index 0000000000..1a54b93dae --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -0,0 +1,54 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; + +namespace Grpc.Core.Interceptors +{ + /// + /// Provides extension methods to make it easy to register interceptors on Channel objects. + /// This is an EXPERIMENTAL API. + /// + public static class ChannelExtensions + { + /// + /// Returns a instance that intercepts + /// the channel with the given interceptor. + /// + /// The channel to intercept. + /// The interceptor to intercept the channel with. + public static CallInvoker Intercept(this Channel channel, Interceptor interceptor) + { + return new DefaultCallInvoker(channel).Intercept(interceptor); + } + + /// + /// Returns a instance that intercepts + /// the channel with the given interceptors. + /// + /// The channel to intercept. + /// + /// An array of interceptors to intercept the channel with. + /// Control is passed to the interceptors in the order specified. + /// + public static CallInvoker Intercept(this Channel channel, params Interceptor[] interceptors) + { + return new DefaultCallInvoker(channel).Intercept(interceptors); + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs deleted file mode 100644 index eb4c7d97a7..0000000000 --- a/src/csharp/Grpc.Core/Internal/InterceptingCallInvoker.cs +++ /dev/null @@ -1,119 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015-2016 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Threading.Tasks; -using Grpc.Core; -using Grpc.Core.Utils; - -namespace Grpc.Core.Internal -{ - /// - /// Decorates an underlying CallInvoker to intercept call invocations. - /// - internal class InterceptingCallInvoker : CallInvoker - { - readonly CallInvoker callInvoker; - readonly Func hostInterceptor; - readonly Func callOptionsInterceptor; - - /// - /// Initializes a new instance of the class. - /// - public InterceptingCallInvoker(CallInvoker callInvoker, - Func hostInterceptor = null, - Func callOptionsInterceptor = null) - { - this.callInvoker = GrpcPreconditions.CheckNotNull(callInvoker); - this.hostInterceptor = hostInterceptor; - this.callOptionsInterceptor = callOptionsInterceptor; - } - - /// - /// Intercepts a unary call. - /// - public override TResponse BlockingUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.BlockingUnaryCall(method, host, options, request); - } - - /// - /// Invokes a simple remote call asynchronously. - /// - public override AsyncUnaryCall AsyncUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncUnaryCall(method, host, options, request); - } - - /// - /// Invokes a server streaming call asynchronously. - /// In server streaming scenario, client sends on request and server responds with a stream of responses. - /// - public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string host, CallOptions options, TRequest request) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncServerStreamingCall(method, host, options, request); - } - - /// - /// Invokes a client streaming call asynchronously. - /// In client streaming scenario, client sends a stream of requests and server responds with a single response. - /// - public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncClientStreamingCall(method, host, options); - } - - /// - /// Invokes a duplex streaming call asynchronously. - /// In duplex streaming scenario, client sends a stream of requests and server responds with a stream of responses. - /// The response stream is completely independent and both side can be sending messages at the same time. - /// - public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string host, CallOptions options) - { - host = InterceptHost(host); - options = InterceptCallOptions(options); - return callInvoker.AsyncDuplexStreamingCall(method, host, options); - } - - private string InterceptHost(string host) - { - if (hostInterceptor == null) - { - return host; - } - return hostInterceptor(host); - } - - private CallOptions InterceptCallOptions(CallOptions options) - { - if (callOptionsInterceptor == null) - { - return options; - } - return callOptionsInterceptor(options); - } - } -} -- cgit v1.2.3 From 4df68ae3305049fdaedcb32b8029185ea105e62f Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 20:11:07 -0800 Subject: Add C# server-side interceptor machinery --- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 42 +++++++++++++++++----- src/csharp/Grpc.Core/ServerServiceDefinition.cs | 25 +++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 98995a0862..f9bf40f237 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -21,6 +21,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -32,7 +33,12 @@ namespace Grpc.Core.Internal Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); } - internal class UnaryServerCallHandler : IServerCallHandler + internal interface IInterceptableCallHandler + { + IServerCallHandler Intercept(Interceptor interceptor); + } + + internal class UnaryServerCallHandler : IServerCallHandler, IInterceptableCallHandler where TRequest : class where TResponse : class { @@ -74,7 +80,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occured in handler or interceptors."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -89,9 +95,14 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + { + return new UnaryServerCallHandler(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler)); + } } - internal class ServerStreamingServerCallHandler : IServerCallHandler + internal class ServerStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler where TRequest : class where TResponse : class { @@ -131,7 +142,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occured in handler or interceptors."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -147,9 +158,14 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + { + return new ServerStreamingServerCallHandler(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler)); + } } - internal class ClientStreamingServerCallHandler : IServerCallHandler + internal class ClientStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler where TRequest : class where TResponse : class { @@ -189,7 +205,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occured in handler or interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -205,9 +221,14 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + { + return new ClientStreamingServerCallHandler(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler)); + } } - internal class DuplexStreamingServerCallHandler : IServerCallHandler + internal class DuplexStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler where TRequest : class where TResponse : class { @@ -245,7 +266,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler."); + Logger.Warning(e, "Exception occured in handler or interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -260,6 +281,11 @@ namespace Grpc.Core.Internal } await finishedTask.ConfigureAwait(false); } + + IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + { + return new DuplexStreamingServerCallHandler(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler)); + } } internal class UnimplementedMethodCallHandler : IServerCallHandler diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index 59868c1f40..3e6c12884b 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -19,7 +19,10 @@ using System; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Linq; +using Grpc.Core.Interceptors; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -45,6 +48,28 @@ namespace Grpc.Core } } + /// + /// Returns a instance that + /// intercepts calls to the underlying service handler via the given interceptor. + /// This is an EXPERIMENTAL API. + /// + /// The interceptor to register on service. + public ServerServiceDefinition Intercept(Interceptor interceptor) + { + GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + return new ServerServiceDefinition(CallHandlers.ToDictionary( + x => x.Key, x => + { + var value = x.Value; + var interceptable = value as IInterceptableCallHandler; + if (interceptable == null) + { + return value; + } + return interceptable.Intercept(interceptor); + })); + } + /// /// Creates a new builder object for ServerServiceDefinition. /// -- cgit v1.2.3 From e954dff656bcd0e9a1fe305d584857fa063f090d Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 20:11:28 -0800 Subject: Add basic tests for C# interceptors --- .../Interceptors/ClientInterceptorTest.cs | 76 +++++++++++++++++++++ .../Interceptors/ServerInterceptorTest.cs | 79 ++++++++++++++++++++++ src/csharp/Grpc.Core.Tests/MockServiceHelper.cs | 13 +--- src/csharp/tests.json | 4 +- 4 files changed, 161 insertions(+), 11 deletions(-) create mode 100644 src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs create mode 100644 src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs new file mode 100644 index 0000000000..0904c51a48 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -0,0 +1,76 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using Grpc.Core.Tests; +using NUnit.Framework; + +namespace Grpc.Core.Interceptors.Tests +{ + public class ClientInterceptorTest + { + private class AddHeaderClientInterceptor : Interceptor + { + readonly Metadata.Entry header; + public AddHeaderClientInterceptor(string key, string value) + { + this.header = new Metadata.Entry(key, value); + } + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + context.Options.Headers.Add(this.header); + return continuation(request, context); + } + + public Metadata.Entry Header + { + get + { + return this.header; + } + } + } + + const string Host = "127.0.0.1"; + + [Test] + public void AddRequestHeaderInClientInterceptor() + { + var helper = new MockServiceHelper(Host); + var interceptor = new AddHeaderClientInterceptor("x-client-interceptor", "hello world"); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == interceptor.Header.Key)).Value; + Assert.AreEqual(interceptorHeader, interceptor.Header.Value); + return Task.FromResult("PASS"); + }); + var server = helper.GetServer(); + server.Start(); + var callInvoker = helper.GetChannel().Intercept(interceptor); + Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions().WithHeaders(new Metadata()), "")); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs new file mode 100644 index 0000000000..57ea3e37d5 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -0,0 +1,79 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Core.Internal; +using Grpc.Core.Tests; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Interceptors.Tests +{ + public class ServerInterceptorTest + { + const string Host = "127.0.0.1"; + + private class AddRequestHeaderServerInterceptor : Interceptor + { + readonly Metadata.Entry header; + + public AddRequestHeaderServerInterceptor(string key, string value) + { + this.header = new Metadata.Entry(key, value); + } + + public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + { + context.RequestHeaders.Add(header); + return await continuation(request, context).ConfigureAwait(false); + } + + public Metadata.Entry Header + { + get + { + return header; + } + } + } + + [Test] + public void AddRequestHeaderInServerInterceptor() + { + var helper = new MockServiceHelper(Host); + var interceptor = new AddRequestHeaderServerInterceptor("x-interceptor", "hello world"); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == interceptor.Header.Key)).Value; + Assert.AreEqual(interceptorHeader, interceptor.Header.Value); + return Task.FromResult("PASS"); + }); + helper.ServiceDefinition = helper.ServiceDefinition.Intercept(interceptor); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs index 7f4677d57f..a925f865ff 100644 --- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs +++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs @@ -37,7 +37,6 @@ namespace Grpc.Core.Tests public const string ServiceName = "tests.Test"; readonly string host; - readonly ServerServiceDefinition serviceDefinition; readonly IEnumerable channelOptions; readonly Method unaryMethod; @@ -87,7 +86,7 @@ namespace Grpc.Core.Tests marshaller, marshaller); - serviceDefinition = ServerServiceDefinition.CreateBuilder() + ServiceDefinition = ServerServiceDefinition.CreateBuilder() .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context)) .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) @@ -131,7 +130,7 @@ namespace Grpc.Core.Tests // Disable SO_REUSEPORT to prevent https://github.com/grpc/grpc/issues/10755 server = new Server(new[] { new ChannelOption(ChannelOptions.SoReuseport, 0) }) { - Services = { serviceDefinition }, + Services = { ServiceDefinition }, Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } }; } @@ -178,13 +177,7 @@ namespace Grpc.Core.Tests } } - public ServerServiceDefinition ServiceDefinition - { - get - { - return this.serviceDefinition; - } - } + public ServerServiceDefinition ServiceDefinition { get; set; } public UnaryServerMethod UnaryHandler { diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 469328af1a..60f67ff3c9 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -1,5 +1,7 @@ { "Grpc.Core.Tests": [ + "Grpc.Core.Interceptors.Tests.ClientInterceptorTest", + "Grpc.Core.Interceptors.Tests.ServerInterceptorTest", "Grpc.Core.Internal.Tests.AsyncCallServerTest", "Grpc.Core.Internal.Tests.AsyncCallTest", "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", @@ -59,4 +61,4 @@ "Grpc.Reflection.Tests.ReflectionClientServerTest", "Grpc.Reflection.Tests.SymbolRegistryTest" ] -} \ No newline at end of file +} -- cgit v1.2.3 From ef4d4e8904958cd8eeda11a6cdb8eafef1f83edc Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 22:57:37 -0800 Subject: Add GenericInterceptor to help writing client interceptors --- .../Grpc.Core/Interceptors/GenericInterceptor.cs | 325 +++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs new file mode 100644 index 0000000000..b9fc5e0a19 --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs @@ -0,0 +1,325 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Interceptors +{ + /// + /// Provides a base class for generic interceptor implementations that raises + /// events and hooks to control the RPC lifecycle. + /// + public abstract class GenericInterceptor : Interceptor + { + + /// + /// Provides hooks through which an invocation should be intercepted. + /// + public sealed class ClientCallArbitrator + where TRequest : class + where TResponse : class + { + internal ClientCallArbitrator Freeze() + { + return (ClientCallArbitrator)MemberwiseClone(); + } + /// + /// Override the context for the outgoing invocation. + /// + public ClientInterceptorContext Context { get; set; } + /// + /// Override the request for the outgoing invocation for non-client-streaming invocations. + /// + public TRequest UnaryRequest { get; set; } + /// + /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. + /// + public Func OnUnaryResponse { get; set; } + /// + /// Delegate that intercepts each request message for a client-streaming invocation and optionally overrides each message. + /// + public Func OnRequestMessage { get; set; } + /// + /// Delegate that intercepts each response message for a server-streaming invocation and optionally overrides each message. + /// + public Func OnResponseMessage { get; set; } + /// + /// Callback that gets invoked when response stream is finished. + /// + public Action OnResponseStreamEnd { get; set; } + /// + /// Callback that gets invoked when request stream is finished. + /// + public Action OnRequestStreamEnd { get; set; } + } + + /// + /// Intercepts an outgoing call from the client side. + /// Derived classes that intend to intercept outgoing invocations from the client side should + /// override this and return the appropriate hooks in the form of a ClientCallArbitrator instance. + /// + /// The context of the outgoing invocation. + /// True if the invocation is client-streaming. + /// True if the invocation is server-streaming. + /// The request message for client-unary invocations, null otherwise. + /// Request message type for the current invocation. + /// Response message type for the current invocation. + /// + /// The derived class should return an instance of ClientCallArbitrator to control the trajectory + /// as they see fit, or null if it does not intend to pursue the invocation any further. + /// + protected virtual ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + where TRequest : class + where TResponse : class + { + return null; + } + + /// + /// Intercepts a blocking invocation of a simple remote call and dispatches the events accordingly. + /// + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + var arbitrator = InterceptCall(context, false, false, request)?.Freeze(); + context = arbitrator?.Context ?? context; + request = arbitrator?.UnaryRequest ?? request; + var response = continuation(request, context); + if (arbitrator?.OnUnaryResponse != null) + { + response = arbitrator.OnUnaryResponse(response); + } + return response; + } + + /// + /// Intercepts an asynchronous invocation of a simple remote call and dispatches the events accordingly. + /// + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + var arbitrator = InterceptCall(context, false, false, request)?.Freeze(); + context = arbitrator?.Context ?? context; + request = arbitrator?.UnaryRequest ?? request; + var response = continuation(request, context); + if (arbitrator?.OnUnaryResponse != null) + { + response = new AsyncUnaryCall(response.ResponseAsync.ContinueWith(unaryResponse => arbitrator.OnUnaryResponse(unaryResponse.Result)), + response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + return response; + } + + /// + /// Intercepts an asynchronous invocation of a streaming remote call and dispatches the events accordingly. + /// + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + var arbitrator = InterceptCall(context, false, true, request)?.Freeze(); + context = arbitrator?.Context ?? context; + request = arbitrator?.UnaryRequest ?? request; + var response = continuation(request, context); + if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + { + response = new AsyncServerStreamingCall( + new WrappedClientStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd), + response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + return response; + } + + /// + /// Intercepts an asynchronous invocation of a client streaming call and dispatches the events accordingly. + /// + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + var arbitrator = InterceptCall(context, true, false, null)?.Freeze(); + context = arbitrator?.Context ?? context; + var response = continuation(context); + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnResponseStreamEnd != null || arbitrator?.OnUnaryResponse != null) + { + var requestStream = response.RequestStream; + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + { + requestStream = new WrappedClientStreamWriter(response.RequestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + } + var responseAsync = response.ResponseAsync; + if (arbitrator?.OnUnaryResponse != null) + { + responseAsync = response.ResponseAsync.ContinueWith(unaryResponse => arbitrator.OnUnaryResponse(unaryResponse.Result)); + } + response = new AsyncClientStreamingCall(requestStream, responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + return response; + } + + /// + /// Intercepts an asynchronous invocation of a duplex streaming call and dispatches the events accordingly. + /// + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + var arbitrator = InterceptCall(context, true, true, null)?.Freeze(); + context = arbitrator?.Context ?? context; + var response = continuation(context); + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null || arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + { + var requestStream = response.RequestStream; + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + { + requestStream = new WrappedClientStreamWriter(response.RequestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + } + var responseStream = response.ResponseStream; + if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + { + responseStream = new WrappedClientStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd); + } + response = new AsyncDuplexStreamingCall(requestStream, responseStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + return response; + } + + /// + /// Server-side handler for intercepting unary calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public override Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + { + return continuation(request, context); + } + + /// + /// Server-side handler for intercepting client streaming call. + /// + /// Request message type for this method. + /// Response message type for this method. + public override Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) + { + return continuation(requestStream, context); + } + + /// + /// Server-side handler for intercepting server streaming calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public override Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) + { + return continuation(request, responseStream, context); + } + + /// + /// Server-side handler for intercepting bidi streaming calls. + /// + /// Request message type for this method. + /// Response message type for this method. + public override Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) + { + return continuation(requestStream, responseStream, context); + } + + private class WrappedClientStreamReader : IAsyncStreamReader + { + readonly IAsyncStreamReader reader; + readonly Func onMessage; + readonly Action onStreamEnd; + public WrappedClientStreamReader(IAsyncStreamReader reader, Func onMessage, Action onStreamEnd) + { + this.reader = reader; + this.onMessage = onMessage; + this.onStreamEnd = onStreamEnd; + } + + public void Dispose() => ((IDisposable)reader).Dispose(); + + private T current; + public T Current + { + get + { + if (current == null) + { + throw new InvalidOperationException("No current element is available."); + } + return current; + } + } + + public async Task MoveNext(CancellationToken token) + { + if (await reader.MoveNext(token)) + { + var current = reader.Current; + if (onMessage != null) + { + var mappedValue = onMessage(current); + if (mappedValue != null) + { + current = mappedValue; + } + } + this.current = current; + return true; + } + onStreamEnd?.Invoke(); + return false; + } + } + + private class WrappedClientStreamWriter : IClientStreamWriter + { + readonly IClientStreamWriter writer; + readonly Func onMessage; + readonly Action onResponseStreamEnd; + public WrappedClientStreamWriter(IClientStreamWriter writer, Func onMessage, Action onResponseStreamEnd) + { + this.writer = writer; + this.onMessage = onMessage; + this.onResponseStreamEnd = onResponseStreamEnd; + } + public Task CompleteAsync() + { + if (onResponseStreamEnd != null) + { + return writer.CompleteAsync().ContinueWith(x => onResponseStreamEnd()); + } + return writer.CompleteAsync(); + } + public Task WriteAsync(T message) + { + if (onMessage != null) + { + message = onMessage(message); + } + return writer.WriteAsync(message); + } + public WriteOptions WriteOptions + { + get + { + return writer.WriteOptions; + } + set + { + writer.WriteOptions = value; + } + } + } + } +} -- cgit v1.2.3 From b604750f2aacac9deb17abe72231abbbcfa8a5ba Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 23:12:27 -0800 Subject: Simplify the AddHeaderClientInterceptor with GenericInterceptor --- .../Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index 0904c51a48..91c7d54181 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -32,26 +32,20 @@ namespace Grpc.Core.Interceptors.Tests { public class ClientInterceptorTest { - private class AddHeaderClientInterceptor : Interceptor + private class AddHeaderClientInterceptor : GenericInterceptor { readonly Metadata.Entry header; public AddHeaderClientInterceptor(string key, string value) { this.header = new Metadata.Entry(key, value); } - public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) { context.Options.Headers.Add(this.header); - return continuation(request, context); + return new ClientCallArbitrator { Context = context }; } - public Metadata.Entry Header - { - get - { - return this.header; - } - } + public Metadata.Entry Header => this.header; } const string Host = "127.0.0.1"; -- cgit v1.2.3 From 4bc49f5c4b9b22c2d90e1933d3223799755c1e81 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 23:19:34 -0800 Subject: Simplify ClientHeaderInterceptor in ClientBase with GenericInterceptor --- src/csharp/Grpc.Core/ClientBase.cs | 50 ++++++-------------------------------- 1 file changed, 7 insertions(+), 43 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index d64ce7dd94..f978d084d9 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -149,7 +149,7 @@ namespace Grpc.Core /// protected internal class ClientBaseConfiguration { - private class ClientHeaderInterceptor : Interceptor + private class ClientHeaderInterceptor : GenericInterceptor { readonly Func> interceptor; @@ -161,49 +161,13 @@ namespace Grpc.Core this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); } - /// - /// Intercepts a blocking invocation of a simple remote call. - /// - public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) - { - var newHeaders = interceptor(context.Method, context.Host, context.Options); - return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); - } - - /// - /// Intercepts an asynchronous invocation of a simple remote call. - /// - public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) - { - var newHeaders = interceptor(context.Method, context.Host, context.Options); - return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); - } - - /// - /// Intercepts an asynchronous invocation of a streaming remote call. - /// - public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) - { - var newHeaders = interceptor(context.Method, context.Host, context.Options); - return continuation(request, new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); - } - - /// - /// Intercepts an asynchronous invocation of a client streaming call. - /// - public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) - { - var newHeaders = interceptor(context.Method, context.Host, context.Options); - return continuation(new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); - } - - /// - /// Intercepts an asynchronous invocation of a duplex streaming call. - /// - public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) { - var newHeaders = interceptor(context.Method, context.Host, context.Options); - return continuation(new ClientInterceptorContext(context.Method, newHeaders.Item1, newHeaders.Item2)); + var newHostAndCallOptions = interceptor(context.Method, context.Host, context.Options); + return new ClientCallArbitrator + { + Context = new ClientInterceptorContext(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2) + }; } } -- cgit v1.2.3 From 2354a1d76eecef541083f674a6c21592556ac54a Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 23:29:51 -0800 Subject: Add Intercept(metadata=>metadata) helper function --- .../Interceptors/ClientInterceptorTest.cs | 29 ++++++----------- .../Interceptors/CallInvokerExtensions.cs | 36 ++++++++++++++++++++++ .../Grpc.Core/Interceptors/ChannelExtensions.cs | 15 +++++++++ 3 files changed, 60 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index 91c7d54181..880e4d2b19 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -32,38 +32,27 @@ namespace Grpc.Core.Interceptors.Tests { public class ClientInterceptorTest { - private class AddHeaderClientInterceptor : GenericInterceptor - { - readonly Metadata.Entry header; - public AddHeaderClientInterceptor(string key, string value) - { - this.header = new Metadata.Entry(key, value); - } - protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - context.Options.Headers.Add(this.header); - return new ClientCallArbitrator { Context = context }; - } - - public Metadata.Entry Header => this.header; - } - const string Host = "127.0.0.1"; [Test] public void AddRequestHeaderInClientInterceptor() { + const string HeaderKey = "x-client-interceptor"; + const string HeaderValue = "hello-world"; var helper = new MockServiceHelper(Host); - var interceptor = new AddHeaderClientInterceptor("x-client-interceptor", "hello world"); helper.UnaryHandler = new UnaryServerMethod((request, context) => { - var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == interceptor.Header.Key)).Value; - Assert.AreEqual(interceptorHeader, interceptor.Header.Value); + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == HeaderKey)).Value; + Assert.AreEqual(interceptorHeader, HeaderValue); return Task.FromResult("PASS"); }); var server = helper.GetServer(); server.Start(); - var callInvoker = helper.GetChannel().Intercept(interceptor); + var callInvoker = helper.GetChannel().Intercept(metadata => + { + metadata.Add(new Metadata.Entry(HeaderKey, HeaderValue)); + return metadata; + }); Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions().WithHeaders(new Metadata()), "")); } } diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index 26e9f8802d..9cec66282f 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -101,6 +101,42 @@ namespace Grpc.Core.Interceptors } } + private class MetadataInterceptor : GenericInterceptor + { + readonly Func interceptor; + + /// + /// Creates a new instance of MetadataInterceptor given the specified interceptor function. + /// + public MetadataInterceptor(Func interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + { + return new ClientCallArbitrator + { + Context = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(context.Options.Headers))) + }; + } + } + + /// + /// Returns a instance that intercepts + /// the invoker with the given interceptor. + /// + /// The underlying invoker to intercept. + /// + /// An interceptor delegate that takes the request metadata to be sent with an outgoing call + /// and returns a instance that will replace the existing + /// invocation metadata. + /// + public static CallInvoker Intercept(this CallInvoker invoker, Func interceptor) + { + return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); + } + /// /// Returns a instance that intercepts /// the invoker with the given interceptor. diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs index 1a54b93dae..25d1724803 100644 --- a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -50,5 +50,20 @@ namespace Grpc.Core.Interceptors { return new DefaultCallInvoker(channel).Intercept(interceptors); } + + /// + /// Returns a instance that intercepts + /// the invoker with the given interceptor. + /// + /// The channel to intercept. + /// + /// An interceptor delegate that takes the request metadata to be sent with an outgoing call + /// and returns a instance that will replace the existing + /// invocation metadata. + /// + public static CallInvoker Intercept(this Channel channel, Func interceptor) + { + return new DefaultCallInvoker(channel).Intercept(interceptor); + } } } -- cgit v1.2.3 From e97fe27f687e27680577dc9e8bedae258ef5a36b Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Sun, 11 Feb 2018 23:42:39 -0800 Subject: Add more tests for client interceptors --- .../Interceptors/ClientInterceptorTest.cs | 80 +++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index 880e4d2b19..ca7677c41f 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -19,6 +19,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -50,10 +51,87 @@ namespace Grpc.Core.Interceptors.Tests server.Start(); var callInvoker = helper.GetChannel().Intercept(metadata => { + metadata = metadata ?? new Metadata(); metadata.Add(new Metadata.Entry(HeaderKey, HeaderValue)); return metadata; }); - Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions().WithHeaders(new Metadata()), "")); + Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); + } + + [Test] + public void CheckInterceptorOrderInClientInterceptors() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + return Task.FromResult("PASS"); + }); + var server = helper.GetServer(); + server.Start(); + var stringBuilder = new StringBuilder(); + var callInvoker = helper.GetChannel().Intercept(metadata => + { + metadata = metadata ?? new Metadata(); + stringBuilder.Append("interceptor1"); + return metadata; + }).Intercept(metadata => + { + metadata = metadata ?? new Metadata(); + stringBuilder.Append("interceptor2"); + return metadata; + }).Intercept(metadata => + { + metadata = metadata ?? new Metadata(); + stringBuilder.Append("interceptor3"); + return metadata; + }); + Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); + Assert.AreEqual("interceptor3interceptor2interceptor1", stringBuilder.ToString()); + } + + private class CountingInterceptor : GenericInterceptor + { + protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + { + if (!clientStreaming) + { + return null; + } + int counter = 0; + return new ClientCallArbitrator + { + OnRequestMessage = m => { counter++; return m; }, + OnUnaryResponse = x => (TResponse)(object)counter.ToString() // Cast to object first is needed to satisfy the type-checker + }; + } + } + + [Test] + public async Task CountNumberOfRequestsInClientInterceptors() + { + var helper = new MockServiceHelper(Host); + helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) => + { + string result = ""; + await requestStream.ForEachAsync((request) => + { + result += request; + return TaskUtils.CompletedTask; + }); + await Task.Delay(100); + return result; + }); + + var callInvoker = helper.GetChannel().Intercept(new CountingInterceptor()); + + var server = helper.GetServer(); + server.Start(); + var call = callInvoker.AsyncClientStreamingCall(new Method(MethodType.ClientStreaming, MockServiceHelper.ServiceName, "ClientStreaming", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions()); + await call.RequestStream.WriteAllAsync(new string[] { "A", "B", "C" }); + Assert.AreEqual("3", await call.ResponseAsync); + + Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); + Assert.IsNotNull(call.GetTrailers()); } } } -- cgit v1.2.3 From 6c3cb2299124773abe4b7039b94a976c5552c432 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Mon, 12 Feb 2018 01:09:34 -0800 Subject: Add server-side interceptor helper facility to GenericInterceptor --- .../Grpc.Core/Interceptors/GenericInterceptor.cs | 150 +++++++++++++++++++-- 1 file changed, 137 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs index b9fc5e0a19..ed90ded889 100644 --- a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs +++ b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs @@ -29,7 +29,6 @@ namespace Grpc.Core.Interceptors /// public abstract class GenericInterceptor : Interceptor { - /// /// Provides hooks through which an invocation should be intercepted. /// @@ -93,6 +92,65 @@ namespace Grpc.Core.Interceptors return null; } + /// + /// Provides hooks through which a server-side handler should be intercepted. + /// + public sealed class ServerCallArbitrator + where TRequest : class + where TResponse : class + { + internal ServerCallArbitrator Freeze() + { + return (ServerCallArbitrator)MemberwiseClone(); + } + /// + /// Override the request for the outgoing invocation for non-client-streaming invocations. + /// + public TRequest UnaryRequest { get; set; } + /// + /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. + /// + public Func OnUnaryResponse { get; set; } + /// + /// Delegate that intercepts each request message for a client-streaming invocation and optionally overrides each message. + /// + public Func OnRequestMessage { get; set; } + /// + /// Delegate that intercepts each response message for a server-streaming invocation and optionally overrides each message. + /// + public Func OnResponseMessage { get; set; } + /// + /// Callback that gets invoked when handler is finished executing. + /// + public Action OnHandlerEnd { get; set; } + /// + /// Callback that gets invoked when request stream is finished. + /// + public Action OnRequestStreamEnd { get; set; } + } + + /// + /// Intercepts an incoming service handler invocation on the server side. + /// Derived classes that intend to intercept incoming handlers on the server side should + /// override this and return the appropriate hooks in the form of a ServerCallArbitrator instance. + /// + /// The context of the incoming invocation. + /// True if the invocation is client-streaming. + /// True if the invocation is server-streaming. + /// The request message for client-unary invocations, null otherwise. + /// Request message type for the current invocation. + /// Response message type for the current invocation. + /// + /// The derived class should return an instance of ServerCallArbitrator to control the trajectory + /// as they see fit, or null if it does not intend to pursue the invocation any further. + /// + protected virtual Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) + where TRequest : class + where TResponse : class + { + return Task.FromResult>(null); + } + /// /// Intercepts a blocking invocation of a simple remote call and dispatches the events accordingly. /// @@ -138,7 +196,7 @@ namespace Grpc.Core.Interceptors if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) { response = new AsyncServerStreamingCall( - new WrappedClientStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd), + new WrappedAsyncStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd), response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } return response; @@ -187,7 +245,7 @@ namespace Grpc.Core.Interceptors var responseStream = response.ResponseStream; if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) { - responseStream = new WrappedClientStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd); + responseStream = new WrappedAsyncStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd); } response = new AsyncDuplexStreamingCall(requestStream, responseStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } @@ -199,9 +257,17 @@ namespace Grpc.Core.Interceptors /// /// Request message type for this method. /// Response message type for this method. - public override Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) { - return continuation(request, context); + var arbitrator = (await InterceptHandler(context, false, false, request))?.Freeze(); + request = arbitrator?.UnaryRequest ?? request; + var response = await continuation(request, context); + if (arbitrator?.OnUnaryResponse != null) + { + response = arbitrator.OnUnaryResponse(response); + } + arbitrator?.OnHandlerEnd(); + return response; } /// @@ -209,9 +275,20 @@ namespace Grpc.Core.Interceptors /// /// Request message type for this method. /// Response message type for this method. - public override Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) + public override async Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) { - return continuation(requestStream, context); + var arbitrator = (await InterceptHandler(context, true, false, null))?.Freeze(); + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + { + requestStream = new WrappedAsyncStreamReader(requestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + } + var response = await continuation(requestStream, context); + if (arbitrator?.OnUnaryResponse != null) + { + response = arbitrator.OnUnaryResponse(response); + } + arbitrator?.OnHandlerEnd(); + return response; } /// @@ -219,9 +296,16 @@ namespace Grpc.Core.Interceptors /// /// Request message type for this method. /// Response message type for this method. - public override Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) + public override async Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) { - return continuation(request, responseStream, context); + var arbitrator = (await InterceptHandler(context, false, true, request))?.Freeze(); + request = arbitrator?.UnaryRequest ?? request; + if (arbitrator?.OnResponseMessage != null) + { + responseStream = new WrappedAsyncStreamWriter(responseStream, arbitrator.OnResponseMessage); + } + await continuation(request, responseStream, context); + arbitrator?.OnHandlerEnd(); } /// @@ -229,17 +313,27 @@ namespace Grpc.Core.Interceptors /// /// Request message type for this method. /// Response message type for this method. - public override Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) + public override async Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) { - return continuation(requestStream, responseStream, context); + var arbitrator = (await InterceptHandler(context, true, true, null))?.Freeze(); + if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + { + requestStream = new WrappedAsyncStreamReader(requestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + } + if (arbitrator?.OnResponseMessage != null) + { + responseStream = new WrappedAsyncStreamWriter(responseStream, arbitrator.OnResponseMessage); + } + await continuation(requestStream, responseStream, context); + arbitrator?.OnHandlerEnd(); } - private class WrappedClientStreamReader : IAsyncStreamReader + private class WrappedAsyncStreamReader : IAsyncStreamReader { readonly IAsyncStreamReader reader; readonly Func onMessage; readonly Action onStreamEnd; - public WrappedClientStreamReader(IAsyncStreamReader reader, Func onMessage, Action onStreamEnd) + public WrappedAsyncStreamReader(IAsyncStreamReader reader, Func onMessage, Action onStreamEnd) { this.reader = reader; this.onMessage = onMessage; @@ -321,5 +415,35 @@ namespace Grpc.Core.Interceptors } } } + + private class WrappedAsyncStreamWriter : IServerStreamWriter + { + readonly IAsyncStreamWriter writer; + readonly Func onMessage; + public WrappedAsyncStreamWriter(IAsyncStreamWriter writer, Func onMessage) + { + this.writer = writer; + this.onMessage = onMessage; + } + public Task WriteAsync(T message) + { + if (onMessage != null) + { + message = onMessage(message); + } + return writer.WriteAsync(message); + } + public WriteOptions WriteOptions + { + get + { + return writer.WriteOptions; + } + set + { + writer.WriteOptions = value; + } + } + } } } -- cgit v1.2.3 From 0126b1c6e93c70f7b55d54765e3b32d926065a61 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Mon, 12 Feb 2018 01:18:47 -0800 Subject: Simplify ServerInterceptorTest to leverage GenericInterceptor --- src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs index 57ea3e37d5..a602d081ae 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -34,7 +34,7 @@ namespace Grpc.Core.Interceptors.Tests { const string Host = "127.0.0.1"; - private class AddRequestHeaderServerInterceptor : Interceptor + private class AddRequestHeaderServerInterceptor : GenericInterceptor { readonly Metadata.Entry header; @@ -43,10 +43,10 @@ namespace Grpc.Core.Interceptors.Tests this.header = new Metadata.Entry(key, value); } - public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) { context.RequestHeaders.Add(header); - return await continuation(request, context).ConfigureAwait(false); + return Task.FromResult>(null); } public Metadata.Entry Header -- cgit v1.2.3 From 62f9f53b7a5cfd88f59fc4d2f88e114307f0628b Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Mon, 12 Feb 2018 01:25:08 -0800 Subject: Add more tests for ServerInterceptor --- .../Interceptors/ServerInterceptorTest.cs | 38 ++++++++++++++++++++++ 1 file changed, 38 insertions(+) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs index a602d081ae..fbace51db5 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -19,6 +19,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -75,5 +76,42 @@ namespace Grpc.Core.Interceptors.Tests var channel = helper.GetChannel(); Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); } + + private class ArbitraryActionInterceptor : GenericInterceptor + { + readonly Action action; + + + public ArbitraryActionInterceptor(Action action) + { + this.action = action; + } + + protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) + { + action(); + return Task.FromResult>(null); + } + } + + [Test] + public void VerifyInterceptorOrdering() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + return Task.FromResult("PASS"); + }); + var stringBuilder = new StringBuilder(); + helper.ServiceDefinition = helper.ServiceDefinition + .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("A"))) + .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("B"))) + .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("C"))); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + Assert.AreEqual("CBA", stringBuilder.ToString()); + } } } -- cgit v1.2.3 From 89bdba9f8a46d350e1d7af882e8110cefc700360 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 07:56:58 -0800 Subject: Add documentation remarks about order of interception --- .../Grpc.Core/Interceptors/CallInvokerExtensions.cs | 21 +++++++++++++++++++++ .../Grpc.Core/Interceptors/ChannelExtensions.cs | 21 +++++++++++++++++++++ src/csharp/Grpc.Core/ServerServiceDefinition.cs | 6 ++++++ 3 files changed, 48 insertions(+) (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index 9cec66282f..f1835f6bd8 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -132,6 +132,13 @@ namespace Grpc.Core.Interceptors /// and returns a instance that will replace the existing /// invocation metadata. /// + /// + /// Multiple interceptors can be added on top of each other by calling + /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted CallInvoker, effectively + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this CallInvoker invoker, Func interceptor) { return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); @@ -143,6 +150,13 @@ namespace Grpc.Core.Interceptors /// /// The underlying invoker to intercept. /// The interceptor to intercept calls to the invoker with. + /// + /// Multiple interceptors can be added on top of each other by calling + /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted CallInvoker, effectively + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor) { return new InterceptingCallInvoker(invoker, interceptor); @@ -157,6 +171,13 @@ namespace Grpc.Core.Interceptors /// An array of interceptors to intercept the calls to the invoker with. /// Control is passed to the interceptors in the order specified. /// + /// + /// Multiple interceptors can be added on top of each other by calling + /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted CallInvoker, effectively + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) { GrpcPreconditions.CheckNotNull(invoker, "invoker"); diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs index 25d1724803..a095b05925 100644 --- a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -32,6 +32,13 @@ namespace Grpc.Core.Interceptors /// /// The channel to intercept. /// The interceptor to intercept the channel with. + /// + /// Multiple interceptors can be added on top of each other by calling + /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted channel, effectively + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this Channel channel, Interceptor interceptor) { return new DefaultCallInvoker(channel).Intercept(interceptor); @@ -46,6 +53,13 @@ namespace Grpc.Core.Interceptors /// An array of interceptors to intercept the channel with. /// Control is passed to the interceptors in the order specified. /// + /// + /// Multiple interceptors can be added on top of each other by calling + /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted channel, effectively + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this Channel channel, params Interceptor[] interceptors) { return new DefaultCallInvoker(channel).Intercept(interceptors); @@ -61,6 +75,13 @@ namespace Grpc.Core.Interceptors /// and returns a instance that will replace the existing /// invocation metadata. /// + /// + /// Multiple interceptors can be added on top of each other by calling + /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted channel, effectively + /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// public static CallInvoker Intercept(this Channel channel, Func interceptor) { return new DefaultCallInvoker(channel).Intercept(interceptor); diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index 3e6c12884b..a42f543a5a 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -54,6 +54,12 @@ namespace Grpc.Core /// This is an EXPERIMENTAL API. /// /// The interceptor to register on service. + /// + /// Multiple interceptors can be added on top of each other by chaining them + /// like "service.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control, + /// i.e. "a" will run before "b" before "c". + /// public ServerServiceDefinition Intercept(Interceptor interceptor) { GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); -- cgit v1.2.3 From cd918d99e8fa25f89b5b36e15e440e307c41fc7a Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 10:32:01 -0800 Subject: Simplify service-side interceptor code --- .../ServerServiceDefinitionExtensions.cs | 82 ++++++++++++++++++++++ src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 25 +++---- src/csharp/Grpc.Core/ServerServiceDefinition.cs | 30 +------- 3 files changed, 96 insertions(+), 41 deletions(-) create mode 100644 src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs new file mode 100644 index 0000000000..21a0782037 --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs @@ -0,0 +1,82 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Linq; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// + /// Extends the ServerServiceDefinition class to add methods used to register interceptors on the server side. + /// This is an EXPERIMENTAL API. + /// + public static class ServerServiceDefinitionExtensions + { + /// + /// Returns a instance that + /// intercepts incoming calls to the underlying service handler through the given interceptor. + /// This is an EXPERIMENTAL API. + /// + /// The instance to register interceptors on. + /// The interceptor to intercept the incoming invocations with. + /// + /// Multiple interceptors can be added on top of each other by calling + /// "serverServiceDefinition.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted service definition, effectively + /// building a chain like "serverServiceDefinition.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// + public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, Interceptor interceptor) + { + GrpcPreconditions.CheckNotNull(serverServiceDefinition, "serverServiceDefinition"); + GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + return new ServerServiceDefinition(serverServiceDefinition.CallHandlers.ToDictionary(x => x.Key, x => x.Value.Intercept(interceptor))); + } + + /// + /// Returns a instance that + /// intercepts incoming calls to the underlying service handler through the given interceptors. + /// This is an EXPERIMENTAL API. + /// + /// The instance to register interceptors on. + /// + /// An array of interceptors to intercept the incoming invocations with. + /// Control is passed to the interceptors in the order specified. + /// + /// + /// Multiple interceptors can be added on top of each other by calling + /// "serverServiceDefinition.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". + /// Interceptors can be later added to an existing intercepted service definition, effectively + /// building a chain like "serverServiceDefinition.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// + public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, params Interceptor[] interceptors) + { + GrpcPreconditions.CheckNotNull(serverServiceDefinition, "serverServiceDefinition"); + GrpcPreconditions.CheckNotNull(interceptors, "interceptors"); + + foreach (var interceptor in interceptors.Reverse()) + { + serverServiceDefinition = Intercept(serverServiceDefinition, interceptor); + } + + return serverServiceDefinition; + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index f9bf40f237..add72ad68d 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -31,14 +31,10 @@ namespace Grpc.Core.Internal internal interface IServerCallHandler { Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); - } - - internal interface IInterceptableCallHandler - { IServerCallHandler Intercept(Interceptor interceptor); } - internal class UnaryServerCallHandler : IServerCallHandler, IInterceptableCallHandler + internal class UnaryServerCallHandler : IServerCallHandler where TRequest : class where TResponse : class { @@ -96,13 +92,13 @@ namespace Grpc.Core.Internal await finishedTask.ConfigureAwait(false); } - IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + public IServerCallHandler Intercept(Interceptor interceptor) { return new UnaryServerCallHandler(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler)); } } - internal class ServerStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler + internal class ServerStreamingServerCallHandler : IServerCallHandler where TRequest : class where TResponse : class { @@ -159,13 +155,13 @@ namespace Grpc.Core.Internal await finishedTask.ConfigureAwait(false); } - IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + public IServerCallHandler Intercept(Interceptor interceptor) { return new ServerStreamingServerCallHandler(method, (request, responseStream, context) => interceptor.ServerStreamingServerHandler(request, responseStream, context, handler)); } } - internal class ClientStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler + internal class ClientStreamingServerCallHandler : IServerCallHandler where TRequest : class where TResponse : class { @@ -222,13 +218,13 @@ namespace Grpc.Core.Internal await finishedTask.ConfigureAwait(false); } - IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + public IServerCallHandler Intercept(Interceptor interceptor) { return new ClientStreamingServerCallHandler(method, (requestStream, context) => interceptor.ClientStreamingServerHandler(requestStream, context, handler)); } } - internal class DuplexStreamingServerCallHandler : IServerCallHandler, IInterceptableCallHandler + internal class DuplexStreamingServerCallHandler : IServerCallHandler where TRequest : class where TResponse : class { @@ -282,7 +278,7 @@ namespace Grpc.Core.Internal await finishedTask.ConfigureAwait(false); } - IServerCallHandler IInterceptableCallHandler.Intercept(Interceptor interceptor) + public IServerCallHandler Intercept(Interceptor interceptor) { return new DuplexStreamingServerCallHandler(method, (requestStream, responseStream, context) => interceptor.DuplexStreamingServerHandler(requestStream, responseStream, context, handler)); } @@ -314,6 +310,11 @@ namespace Grpc.Core.Internal { return callHandlerImpl.HandleCall(newRpc, cq); } + + public IServerCallHandler Intercept(Interceptor interceptor) + { + return this; // Do not intercept unimplemented services + } } internal static class HandlerUtils diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index a42f543a5a..07c6aa1796 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -35,7 +35,7 @@ namespace Grpc.Core { readonly ReadOnlyDictionary callHandlers; - private ServerServiceDefinition(Dictionary callHandlers) + internal ServerServiceDefinition(Dictionary callHandlers) { this.callHandlers = new ReadOnlyDictionary(callHandlers); } @@ -48,34 +48,6 @@ namespace Grpc.Core } } - /// - /// Returns a instance that - /// intercepts calls to the underlying service handler via the given interceptor. - /// This is an EXPERIMENTAL API. - /// - /// The interceptor to register on service. - /// - /// Multiple interceptors can be added on top of each other by chaining them - /// like "service.Intercept(c).Intercept(b).Intercept(a)". Note that - /// in this case, the last interceptor added will be the first to take control, - /// i.e. "a" will run before "b" before "c". - /// - public ServerServiceDefinition Intercept(Interceptor interceptor) - { - GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); - return new ServerServiceDefinition(CallHandlers.ToDictionary( - x => x.Key, x => - { - var value = x.Value; - var interceptable = value as IInterceptableCallHandler; - if (interceptable == null) - { - return value; - } - return interceptable.Intercept(interceptor); - })); - } - /// /// Creates a new builder object for ServerServiceDefinition. /// -- cgit v1.2.3 From 6235ea9b607766c0ff08e2c4403cb39c439238fb Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 11:47:56 -0800 Subject: Turn ClientInterceptorContext into a struct ...and move it into its own file. --- .../Interceptors/ClientInterceptorContext.cs | 65 ++++++++++++++++++++++ src/csharp/Grpc.Core/Interceptors/Interceptor.cs | 41 -------------- 2 files changed, 65 insertions(+), 41 deletions(-) create mode 100644 src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs new file mode 100644 index 0000000000..64d7297a0a --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs @@ -0,0 +1,65 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Reflection; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Interceptors +{ + /// + /// Carries along the context associated with intercepted invocations on the client side. + /// This is an EXPERIMENTAL API. + /// + public struct ClientInterceptorContext + where TRequest : class + where TResponse : class + { + /// + /// Creates a new instance of + /// with the specified method, host, and call options. + /// + /// A object representing the method to be invoked. + /// The host to dispatch the current call to. + /// A instance containing the call options of the current call. + public ClientInterceptorContext(Method method, string host, CallOptions options) + { + Method = method; + Host = host; + Options = options; + } + + /// + /// Gets or sets the instance + /// representing the method to be invoked. + /// + public Method Method { get; set; } + + /// + /// Gets or sets the host that the currect invocation will be dispatched to. + /// + public string Host { get; set; } + + /// + /// Gets or sets the structure representing the + /// call options associated with the current invocation. + /// + public CallOptions Options { get; set; } + } +} diff --git a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs index 0f32e8b420..c277aab2a5 100644 --- a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs +++ b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs @@ -23,47 +23,6 @@ using Grpc.Core.Internal; namespace Grpc.Core.Interceptors { - /// - /// Carries along the context associated with intercepted invocations on the client side. - /// This is an EXPERIMENTAL API. - /// - public class ClientInterceptorContext - where TRequest : class - where TResponse : class - { - /// - /// Creates a new instance of - /// with the specified method, host, and call options. - /// - /// A object representing the method to be invoked. - /// The host to dispatch the current call to. - /// A instance containing the call options of the current call. - - public ClientInterceptorContext(Method method, string host, CallOptions options) - { - Method = method; - Host = host; - Options = options; - } - - /// - /// Gets the representing - /// the method to be invoked. - /// - public Method Method { get; } - - /// - /// Gets the host that the currect invocation will be dispatched to. - /// - public string Host { get; } - - /// - /// Gets the structure representing the - /// call options associated with the current invocation. - /// - public CallOptions Options { get; } - } - /// /// Serves as the base class for gRPC interceptors. /// This is an EXPERIMENTAL API. -- cgit v1.2.3 From c1c29e30466c851eb3073d292ea605a3325719e5 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 11:55:22 -0800 Subject: Make GenericInterceptor internal for now Also change `*Arbitrator` to `*Hooks`. --- .../Interceptors/ClientInterceptorTest.cs | 15 +-- .../Interceptors/ServerInterceptorTest.cs | 8 +- src/csharp/Grpc.Core/ClientBase.cs | 6 +- .../Interceptors/CallInvokerExtensions.cs | 7 +- .../Grpc.Core/Interceptors/GenericInterceptor.cs | 136 ++++++++++----------- 5 files changed, 85 insertions(+), 87 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index ca7677c41f..0ec2d848f0 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -71,17 +71,14 @@ namespace Grpc.Core.Interceptors.Tests var stringBuilder = new StringBuilder(); var callInvoker = helper.GetChannel().Intercept(metadata => { - metadata = metadata ?? new Metadata(); stringBuilder.Append("interceptor1"); return metadata; }).Intercept(metadata => { - metadata = metadata ?? new Metadata(); stringBuilder.Append("interceptor2"); return metadata; }).Intercept(metadata => { - metadata = metadata ?? new Metadata(); stringBuilder.Append("interceptor3"); return metadata; }); @@ -91,14 +88,14 @@ namespace Grpc.Core.Interceptors.Tests private class CountingInterceptor : GenericInterceptor { - protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) { if (!clientStreaming) { return null; } int counter = 0; - return new ClientCallArbitrator + return new ClientCallHooks { OnRequestMessage = m => { counter++; return m; }, OnUnaryResponse = x => (TResponse)(object)counter.ToString() // Cast to object first is needed to satisfy the type-checker @@ -112,14 +109,14 @@ namespace Grpc.Core.Interceptors.Tests var helper = new MockServiceHelper(Host); helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) => { - string result = ""; - await requestStream.ForEachAsync((request) => + var stringBuilder = new StringBuilder(); + await requestStream.ForEachAsync(request => { - result += request; + stringBuilder.Append(request); return TaskUtils.CompletedTask; }); await Task.Delay(100); - return result; + return stringBuilder.ToString(); }); var callInvoker = helper.GetChannel().Intercept(new CountingInterceptor()); diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs index fbace51db5..57dd68b1eb 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -44,10 +44,10 @@ namespace Grpc.Core.Interceptors.Tests this.header = new Metadata.Entry(key, value); } - protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) { context.RequestHeaders.Add(header); - return Task.FromResult>(null); + return Task.FromResult>(null); } public Metadata.Entry Header @@ -87,10 +87,10 @@ namespace Grpc.Core.Interceptors.Tests this.action = action; } - protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) { action(); - return Task.FromResult>(null); + return Task.FromResult>(null); } } diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index f978d084d9..4bb06ed87f 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -161,12 +161,12 @@ namespace Grpc.Core this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); } - protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) { var newHostAndCallOptions = interceptor(context.Method, context.Host, context.Options); - return new ClientCallArbitrator + return new ClientCallHooks { - Context = new ClientInterceptorContext(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2) + ContextOverride = new ClientInterceptorContext(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2) }; } } diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index f1835f6bd8..1c0831a242 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -113,11 +113,12 @@ namespace Grpc.Core.Interceptors this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); } - protected override ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) { - return new ClientCallArbitrator + var metadata = context.Options.Headers ?? new Metadata(); + return new ClientCallHooks { - Context = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(context.Options.Headers))) + ContextOverride = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))), }; } } diff --git a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs index ed90ded889..7ee649e9b5 100644 --- a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs +++ b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs @@ -27,27 +27,27 @@ namespace Grpc.Core.Interceptors /// Provides a base class for generic interceptor implementations that raises /// events and hooks to control the RPC lifecycle. /// - public abstract class GenericInterceptor : Interceptor + internal abstract class GenericInterceptor : Interceptor { /// /// Provides hooks through which an invocation should be intercepted. /// - public sealed class ClientCallArbitrator + public sealed class ClientCallHooks where TRequest : class where TResponse : class { - internal ClientCallArbitrator Freeze() + internal ClientCallHooks Freeze() { - return (ClientCallArbitrator)MemberwiseClone(); + return (ClientCallHooks)MemberwiseClone(); } /// /// Override the context for the outgoing invocation. /// - public ClientInterceptorContext Context { get; set; } + public ClientInterceptorContext? ContextOverride { get; set; } /// /// Override the request for the outgoing invocation for non-client-streaming invocations. /// - public TRequest UnaryRequest { get; set; } + public TRequest UnaryRequestOverride { get; set; } /// /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. /// @@ -73,7 +73,7 @@ namespace Grpc.Core.Interceptors /// /// Intercepts an outgoing call from the client side. /// Derived classes that intend to intercept outgoing invocations from the client side should - /// override this and return the appropriate hooks in the form of a ClientCallArbitrator instance. + /// override this and return the appropriate hooks in the form of a ClientCallHooks instance. /// /// The context of the outgoing invocation. /// True if the invocation is client-streaming. @@ -82,10 +82,10 @@ namespace Grpc.Core.Interceptors /// Request message type for the current invocation. /// Response message type for the current invocation. /// - /// The derived class should return an instance of ClientCallArbitrator to control the trajectory + /// The derived class should return an instance of ClientCallHooks to control the trajectory /// as they see fit, or null if it does not intend to pursue the invocation any further. /// - protected virtual ClientCallArbitrator InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected virtual ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) where TRequest : class where TResponse : class { @@ -95,18 +95,18 @@ namespace Grpc.Core.Interceptors /// /// Provides hooks through which a server-side handler should be intercepted. /// - public sealed class ServerCallArbitrator + public sealed class ServerCallHooks where TRequest : class where TResponse : class { - internal ServerCallArbitrator Freeze() + internal ServerCallHooks Freeze() { - return (ServerCallArbitrator)MemberwiseClone(); + return (ServerCallHooks)MemberwiseClone(); } /// /// Override the request for the outgoing invocation for non-client-streaming invocations. /// - public TRequest UnaryRequest { get; set; } + public TRequest UnaryRequestOverride { get; set; } /// /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. /// @@ -132,7 +132,7 @@ namespace Grpc.Core.Interceptors /// /// Intercepts an incoming service handler invocation on the server side. /// Derived classes that intend to intercept incoming handlers on the server side should - /// override this and return the appropriate hooks in the form of a ServerCallArbitrator instance. + /// override this and return the appropriate hooks in the form of a ServerCallHooks instance. /// /// The context of the incoming invocation. /// True if the invocation is client-streaming. @@ -141,14 +141,14 @@ namespace Grpc.Core.Interceptors /// Request message type for the current invocation. /// Response message type for the current invocation. /// - /// The derived class should return an instance of ServerCallArbitrator to control the trajectory + /// The derived class should return an instance of ServerCallHooks to control the trajectory /// as they see fit, or null if it does not intend to pursue the invocation any further. /// - protected virtual Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) + protected virtual Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) where TRequest : class where TResponse : class { - return Task.FromResult>(null); + return Task.FromResult>(null); } /// @@ -156,13 +156,13 @@ namespace Grpc.Core.Interceptors /// public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) { - var arbitrator = InterceptCall(context, false, false, request)?.Freeze(); - context = arbitrator?.Context ?? context; - request = arbitrator?.UnaryRequest ?? request; + var hooks = InterceptCall(context, false, false, request)?.Freeze(); + context = hooks?.ContextOverride ?? context; + request = hooks?.UnaryRequestOverride ?? request; var response = continuation(request, context); - if (arbitrator?.OnUnaryResponse != null) + if (hooks?.OnUnaryResponse != null) { - response = arbitrator.OnUnaryResponse(response); + response = hooks.OnUnaryResponse(response); } return response; } @@ -172,13 +172,13 @@ namespace Grpc.Core.Interceptors /// public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) { - var arbitrator = InterceptCall(context, false, false, request)?.Freeze(); - context = arbitrator?.Context ?? context; - request = arbitrator?.UnaryRequest ?? request; + var hooks = InterceptCall(context, false, false, request)?.Freeze(); + context = hooks?.ContextOverride ?? context; + request = hooks?.UnaryRequestOverride ?? request; var response = continuation(request, context); - if (arbitrator?.OnUnaryResponse != null) + if (hooks?.OnUnaryResponse != null) { - response = new AsyncUnaryCall(response.ResponseAsync.ContinueWith(unaryResponse => arbitrator.OnUnaryResponse(unaryResponse.Result)), + response = new AsyncUnaryCall(response.ResponseAsync.ContinueWith(unaryResponse => hooks.OnUnaryResponse(unaryResponse.Result)), response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } return response; @@ -189,14 +189,14 @@ namespace Grpc.Core.Interceptors /// public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) { - var arbitrator = InterceptCall(context, false, true, request)?.Freeze(); - context = arbitrator?.Context ?? context; - request = arbitrator?.UnaryRequest ?? request; + var hooks = InterceptCall(context, false, true, request)?.Freeze(); + context = hooks?.ContextOverride ?? context; + request = hooks?.UnaryRequestOverride ?? request; var response = continuation(request, context); - if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + if (hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) { response = new AsyncServerStreamingCall( - new WrappedAsyncStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd), + new WrappedAsyncStreamReader(response.ResponseStream, hooks.OnResponseMessage, hooks.OnResponseStreamEnd), response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } return response; @@ -207,20 +207,20 @@ namespace Grpc.Core.Interceptors /// public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) { - var arbitrator = InterceptCall(context, true, false, null)?.Freeze(); - context = arbitrator?.Context ?? context; + var hooks = InterceptCall(context, true, false, null)?.Freeze(); + context = hooks?.ContextOverride ?? context; var response = continuation(context); - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnResponseStreamEnd != null || arbitrator?.OnUnaryResponse != null) + if (hooks?.OnRequestMessage != null || hooks?.OnResponseStreamEnd != null || hooks?.OnUnaryResponse != null) { var requestStream = response.RequestStream; - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) { - requestStream = new WrappedClientStreamWriter(response.RequestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + requestStream = new WrappedClientStreamWriter(response.RequestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); } var responseAsync = response.ResponseAsync; - if (arbitrator?.OnUnaryResponse != null) + if (hooks?.OnUnaryResponse != null) { - responseAsync = response.ResponseAsync.ContinueWith(unaryResponse => arbitrator.OnUnaryResponse(unaryResponse.Result)); + responseAsync = response.ResponseAsync.ContinueWith(unaryResponse => hooks.OnUnaryResponse(unaryResponse.Result)); } response = new AsyncClientStreamingCall(requestStream, responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } @@ -232,20 +232,20 @@ namespace Grpc.Core.Interceptors /// public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) { - var arbitrator = InterceptCall(context, true, true, null)?.Freeze(); - context = arbitrator?.Context ?? context; + var hooks = InterceptCall(context, true, true, null)?.Freeze(); + context = hooks?.ContextOverride ?? context; var response = continuation(context); - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null || arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null || hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) { var requestStream = response.RequestStream; - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) { - requestStream = new WrappedClientStreamWriter(response.RequestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + requestStream = new WrappedClientStreamWriter(response.RequestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); } var responseStream = response.ResponseStream; - if (arbitrator?.OnResponseMessage != null || arbitrator?.OnResponseStreamEnd != null) + if (hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) { - responseStream = new WrappedAsyncStreamReader(response.ResponseStream, arbitrator.OnResponseMessage, arbitrator.OnResponseStreamEnd); + responseStream = new WrappedAsyncStreamReader(response.ResponseStream, hooks.OnResponseMessage, hooks.OnResponseStreamEnd); } response = new AsyncDuplexStreamingCall(requestStream, responseStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); } @@ -259,14 +259,14 @@ namespace Grpc.Core.Interceptors /// Response message type for this method. public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) { - var arbitrator = (await InterceptHandler(context, false, false, request))?.Freeze(); - request = arbitrator?.UnaryRequest ?? request; + var hooks = (await InterceptHandler(context, false, false, request))?.Freeze(); + request = hooks?.UnaryRequestOverride ?? request; var response = await continuation(request, context); - if (arbitrator?.OnUnaryResponse != null) + if (hooks?.OnUnaryResponse != null) { - response = arbitrator.OnUnaryResponse(response); + response = hooks.OnUnaryResponse(response); } - arbitrator?.OnHandlerEnd(); + hooks?.OnHandlerEnd(); return response; } @@ -277,17 +277,17 @@ namespace Grpc.Core.Interceptors /// Response message type for this method. public override async Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) { - var arbitrator = (await InterceptHandler(context, true, false, null))?.Freeze(); - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + var hooks = (await InterceptHandler(context, true, false, null))?.Freeze(); + if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) { - requestStream = new WrappedAsyncStreamReader(requestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + requestStream = new WrappedAsyncStreamReader(requestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); } var response = await continuation(requestStream, context); - if (arbitrator?.OnUnaryResponse != null) + if (hooks?.OnUnaryResponse != null) { - response = arbitrator.OnUnaryResponse(response); + response = hooks.OnUnaryResponse(response); } - arbitrator?.OnHandlerEnd(); + hooks?.OnHandlerEnd(); return response; } @@ -298,14 +298,14 @@ namespace Grpc.Core.Interceptors /// Response message type for this method. public override async Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) { - var arbitrator = (await InterceptHandler(context, false, true, request))?.Freeze(); - request = arbitrator?.UnaryRequest ?? request; - if (arbitrator?.OnResponseMessage != null) + var hooks = (await InterceptHandler(context, false, true, request))?.Freeze(); + request = hooks?.UnaryRequestOverride ?? request; + if (hooks?.OnResponseMessage != null) { - responseStream = new WrappedAsyncStreamWriter(responseStream, arbitrator.OnResponseMessage); + responseStream = new WrappedAsyncStreamWriter(responseStream, hooks.OnResponseMessage); } await continuation(request, responseStream, context); - arbitrator?.OnHandlerEnd(); + hooks?.OnHandlerEnd(); } /// @@ -315,17 +315,17 @@ namespace Grpc.Core.Interceptors /// Response message type for this method. public override async Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) { - var arbitrator = (await InterceptHandler(context, true, true, null))?.Freeze(); - if (arbitrator?.OnRequestMessage != null || arbitrator?.OnRequestStreamEnd != null) + var hooks = (await InterceptHandler(context, true, true, null))?.Freeze(); + if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) { - requestStream = new WrappedAsyncStreamReader(requestStream, arbitrator.OnRequestMessage, arbitrator.OnRequestStreamEnd); + requestStream = new WrappedAsyncStreamReader(requestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); } - if (arbitrator?.OnResponseMessage != null) + if (hooks?.OnResponseMessage != null) { - responseStream = new WrappedAsyncStreamWriter(responseStream, arbitrator.OnResponseMessage); + responseStream = new WrappedAsyncStreamWriter(responseStream, hooks.OnResponseMessage); } await continuation(requestStream, responseStream, context); - arbitrator?.OnHandlerEnd(); + hooks?.OnHandlerEnd(); } private class WrappedAsyncStreamReader : IAsyncStreamReader -- cgit v1.2.3 From 23b9e0de2e6f9e6389ad56f7aa22b94c57b9188b Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 18:08:57 -0800 Subject: Add test for interceptor registration code paths --- .../Interceptors/ClientInterceptorTest.cs | 40 +++++++++++++++++++--- .../Interceptors/ServerInterceptorTest.cs | 30 ++++++++++------ 2 files changed, 56 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index 0ec2d848f0..d7c01d08ac 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -58,6 +58,22 @@ namespace Grpc.Core.Interceptors.Tests Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); } + private class CallbackInterceptor : GenericInterceptor + { + readonly Action callback; + + public CallbackInterceptor(Action callback) + { + this.callback = callback; + } + + protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + { + callback(); + return null; + } + } + [Test] public void CheckInterceptorOrderInClientInterceptors() { @@ -69,11 +85,13 @@ namespace Grpc.Core.Interceptors.Tests var server = helper.GetServer(); server.Start(); var stringBuilder = new StringBuilder(); - var callInvoker = helper.GetChannel().Intercept(metadata => - { + var callInvoker = helper.GetChannel().Intercept(metadata => { stringBuilder.Append("interceptor1"); return metadata; - }).Intercept(metadata => + }).Intercept(new CallbackInterceptor(() => stringBuilder.Append("array1")), + new CallbackInterceptor(() => stringBuilder.Append("array2")), + new CallbackInterceptor(() => stringBuilder.Append("array3"))) + .Intercept(metadata => { stringBuilder.Append("interceptor2"); return metadata; @@ -83,7 +101,21 @@ namespace Grpc.Core.Interceptors.Tests return metadata; }); Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); - Assert.AreEqual("interceptor3interceptor2interceptor1", stringBuilder.ToString()); + Assert.AreEqual("interceptor3interceptor2array1array2array3interceptor1", stringBuilder.ToString()); + } + + [Test] + public void CheckNullInterceptorRegistrationFails() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + return Task.FromResult("PASS"); + }); + Assert.Throws(() => helper.GetChannel().Intercept(default(Interceptor))); + Assert.Throws(() => helper.GetChannel().Intercept(new[]{default(Interceptor)})); + Assert.Throws(() => helper.GetChannel().Intercept(new[]{new CallbackInterceptor(()=>{}), null})); + Assert.Throws(() => helper.GetChannel().Intercept(default(Interceptor[]))); } private class CountingInterceptor : GenericInterceptor diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs index 57dd68b1eb..c0957a2b42 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -50,13 +50,7 @@ namespace Grpc.Core.Interceptors.Tests return Task.FromResult>(null); } - public Metadata.Entry Header - { - get - { - return header; - } - } + public Metadata.Entry Header => header; } [Test] @@ -81,7 +75,6 @@ namespace Grpc.Core.Interceptors.Tests { readonly Action action; - public ArbitraryActionInterceptor(Action action) { this.action = action; @@ -105,13 +98,30 @@ namespace Grpc.Core.Interceptors.Tests var stringBuilder = new StringBuilder(); helper.ServiceDefinition = helper.ServiceDefinition .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("A"))) - .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("B"))) + .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("B1")), + new ArbitraryActionInterceptor(() => stringBuilder.Append("B2")), + new ArbitraryActionInterceptor(() => stringBuilder.Append("B3"))) .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("C"))); var server = helper.GetServer(); server.Start(); var channel = helper.GetChannel(); Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); - Assert.AreEqual("CBA", stringBuilder.ToString()); + Assert.AreEqual("CB1B2B3A", stringBuilder.ToString()); + } + + [Test] + public void CheckNullInterceptorRegistrationFails() + { + var helper = new MockServiceHelper(Host); + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + return Task.FromResult("PASS"); + }); + var sd = helper.ServiceDefinition; + Assert.Throws(() => sd.Intercept(default(Interceptor))); + Assert.Throws(() => sd.Intercept(new[]{default(Interceptor)})); + Assert.Throws(() => sd.Intercept(new[]{new ArbitraryActionInterceptor(()=>{}), null})); + Assert.Throws(() => sd.Intercept(default(Interceptor[]))); } } } -- cgit v1.2.3 From c3d71f8b28d1fffff26f93b4da6e3aed75f78466 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Wed, 21 Feb 2018 23:29:10 -0800 Subject: Add more documentation comments for continuation --- src/csharp/Grpc.Core/Interceptors/Interceptor.cs | 174 ++++++++++++++++++++++- 1 file changed, 170 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs index c277aab2a5..56a30c34af 100644 --- a/src/csharp/Grpc.Core/Interceptors/Interceptor.cs +++ b/src/csharp/Grpc.Core/Interceptors/Interceptor.cs @@ -31,6 +31,12 @@ namespace Grpc.Core.Interceptors { /// /// Represents a continuation for intercepting simple blocking invocations. + /// A delegate of this type is passed to the BlockingUnaryCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// context and request values as it sees fit. /// /// Request message type for this invocation. /// Response message type for this invocation. @@ -39,12 +45,23 @@ namespace Grpc.Core.Interceptors /// The /// instance to pass to the next step in the invocation process. /// + /// + /// The response value of the invocation to return to the caller. + /// The interceptor can choose to return the return value of the + /// continuation delegate or an arbitrary value as it sees fit. + /// public delegate TResponse BlockingUnaryCallContinuation(TRequest request, ClientInterceptorContext context) where TRequest : class where TResponse : class; /// /// Represents a continuation for intercepting simple asynchronous invocations. + /// A delegate of this type is passed to the AsyncUnaryCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. /// /// Request message type for this invocation. /// Response message type for this invocation. @@ -53,12 +70,24 @@ namespace Grpc.Core.Interceptors /// The /// instance to pass to the next step in the invocation process. /// + /// + /// An instance of + /// representing an asynchronous invocation of a unary RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// public delegate AsyncUnaryCall AsyncUnaryCallContinuation(TRequest request, ClientInterceptorContext context) where TRequest : class where TResponse : class; /// /// Represents a continuation for intercepting asynchronous server-streaming invocations. + /// A delegate of this type is passed to the AsyncServerStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. /// /// Request message type for this invocation. /// Response message type for this invocation. @@ -67,12 +96,24 @@ namespace Grpc.Core.Interceptors /// The /// instance to pass to the next step in the invocation process. /// + /// + /// An instance of + /// representing an asynchronous invocation of a server-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// public delegate AsyncServerStreamingCall AsyncServerStreamingCallContinuation(TRequest request, ClientInterceptorContext context) where TRequest : class where TResponse : class; /// /// Represents a continuation for intercepting asynchronous client-streaming invocations. + /// A delegate of this type is passed to the AsyncClientStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. /// /// Request message type for this invocation. /// Response message type for this invocation. @@ -80,17 +121,35 @@ namespace Grpc.Core.Interceptors /// The /// instance to pass to the next step in the invocation process. /// + /// + /// An instance of + /// representing an asynchronous invocation of a client-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// public delegate AsyncClientStreamingCall AsyncClientStreamingCallContinuation(ClientInterceptorContext context) where TRequest : class where TResponse : class; /// /// Represents a continuation for intercepting asynchronous duplex invocations. + /// A delegate of this type is passed to the AsyncDuplexStreamingCall method + /// when an outgoing invocation is being intercepted and calling the + /// delegate will invoke the next interceptor in the chain, or the underlying + /// call invoker if called from the last interceptor. The interceptor is + /// allowed to call it zero, one, or multiple times, passing it the appropriate + /// request value and context as it sees fit. /// /// /// The /// instance to pass to the next step in the invocation process. /// + /// + /// An instance of + /// representing an asynchronous invocation of a duplex-streaming RPC. + /// The interceptor can choose to return the same object returned from + /// the continuation delegate or an arbitrarily constructed instance as it sees fit. + /// public delegate AsyncDuplexStreamingCall AsyncDuplexStreamingCallContinuation(ClientInterceptorContext context) where TRequest : class where TResponse : class; @@ -106,8 +165,15 @@ namespace Grpc.Core.Interceptors /// /// The callback that continues the invocation process. /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. /// - /// The response message of the current invocation. + /// + /// The response message of the current invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or an arbitrary + /// value as it sees fit. + /// public virtual TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) where TRequest : class where TResponse : class @@ -126,7 +192,16 @@ namespace Grpc.Core.Interceptors /// /// The callback that continues the invocation process. /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. /// + /// + /// An instance of + /// representing an asynchronous unary invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// public virtual AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) where TRequest : class where TResponse : class @@ -145,7 +220,16 @@ namespace Grpc.Core.Interceptors /// /// The callback that continues the invocation process. /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// request value and context arguments, or substitute them as it sees fit. /// + /// + /// An instance of + /// representing an asynchronous server-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// public virtual AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) where TRequest : class where TResponse : class @@ -163,7 +247,16 @@ namespace Grpc.Core.Interceptors /// /// The callback that continues the invocation process. /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// context argument, or substitute as it sees fit. /// + /// + /// An instance of + /// representing an asynchronous client-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// public virtual AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) where TRequest : class where TResponse : class @@ -181,7 +274,16 @@ namespace Grpc.Core.Interceptors /// /// The callback that continues the invocation process. /// This can be invoked zero or more times by the interceptor. + /// The interceptor can invoke the continuation passing the given + /// context argument, or substitute as it sees fit. /// + /// + /// An instance of + /// representing an asynchronous duplex-streaming invocation. + /// The interceptor can simply return the return value of the + /// continuation delegate passed to it intact, or construct its + /// own substitute as it sees fit. + /// public virtual AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) where TRequest : class where TResponse : class @@ -190,10 +292,27 @@ namespace Grpc.Core.Interceptors } /// - /// Server-side handler for intercepting unary calls. + /// Server-side handler for intercepting and incoming unary call. /// /// Request message type for this method. /// Response message type for this method. + /// The request value of the incoming invocation. + /// + /// An instance of representing + /// the context of the invocation. + /// + /// + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and return the response value of + /// the RPC. The interceptor can choose to call it zero or more times + /// at its discretion. + /// + /// + /// A future representing the response value of the RPC. The interceptor + /// can simply return the return value from the continuation intact, + /// or an arbitrary response value as it sees fit. + /// public virtual Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) where TRequest : class where TResponse : class @@ -206,6 +325,25 @@ namespace Grpc.Core.Interceptors /// /// Request message type for this method. /// Response message type for this method. + /// The request stream of the incoming invocation. + /// + /// An instance of representing + /// the context of the invocation. + /// + /// + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and return the response value of + /// the RPC. The interceptor can choose to call it zero or more times + /// at its discretion. + /// + /// + /// A future representing the response value of the RPC. The interceptor + /// can simply return the return value from the continuation intact, + /// or an arbitrary response value as it sees fit. The interceptor has + /// the ability to wrap or substitute the request stream when calling + /// the continuation. + /// public virtual Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) where TRequest : class where TResponse : class @@ -214,10 +352,24 @@ namespace Grpc.Core.Interceptors } /// - /// Server-side handler for intercepting server streaming calls. + /// Server-side handler for intercepting server streaming call. /// /// Request message type for this method. /// Response message type for this method. + /// The request value of the incoming invocation. + /// The response stream of the incoming invocation. + /// + /// An instance of representing + /// the context of the invocation. + /// + /// + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and the interceptor can choose to + /// call it zero or more times at its discretion. The interceptor has + /// the ability to wrap or substitute the request value and the response stream + /// when calling the continuation. + /// public virtual Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) where TRequest : class where TResponse : class @@ -226,10 +378,24 @@ namespace Grpc.Core.Interceptors } /// - /// Server-side handler for intercepting bidi streaming calls. + /// Server-side handler for intercepting bidirectional streaming calls. /// /// Request message type for this method. /// Response message type for this method. + /// The request stream of the incoming invocation. + /// The response stream of the incoming invocation. + /// + /// An instance of representing + /// the context of the invocation. + /// + /// + /// A delegate that asynchronously proceeds with the invocation, calling + /// the next interceptor in the chain, or the service request handler, + /// in case of the last interceptor and the interceptor can choose to + /// call it zero or more times at its discretion. The interceptor has + /// the ability to wrap or substitute the request and response streams + /// when calling the continuation. + /// public virtual Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) where TRequest : class where TResponse : class -- cgit v1.2.3 From 8f1eab1b290491c8db084938981eef124ccf0639 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 22 Feb 2018 07:10:25 -0800 Subject: Move InterceptingCallInvoker to its own file --- .../Interceptors/CallInvokerExtensions.cs | 159 ++++++--------------- .../Interceptors/InterceptingCallInvoker.cs | 96 +++++++++++++ 2 files changed, 138 insertions(+), 117 deletions(-) create mode 100644 src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index 1c0831a242..a01865cf2f 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -28,123 +28,6 @@ namespace Grpc.Core.Interceptors /// public static class CallInvokerExtensions { - /// - /// Decorates an underlying to - /// intercept calls through a given interceptor. - /// - private class InterceptingCallInvoker : CallInvoker - { - readonly CallInvoker invoker; - readonly Interceptor interceptor; - - /// - /// Creates a new instance of - /// with the given underlying invoker and interceptor instances. - /// - public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) - { - this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker"); - this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); - } - - /// - /// Intercepts a simple blocking call with the registered interceptor. - /// - public override TResponse BlockingUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - return interceptor.BlockingUnaryCall( - request, - new ClientInterceptorContext(method, host, options), - (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); - } - - /// - /// Intercepts a simple asynchronous call with the registered interceptor. - /// - public override AsyncUnaryCall AsyncUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - return interceptor.AsyncUnaryCall( - request, - new ClientInterceptorContext(method, host, options), - (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); - } - - /// - /// Intercepts an asynchronous server streaming call with the registered interceptor. - /// - public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string host, CallOptions options, TRequest request) - { - return interceptor.AsyncServerStreamingCall( - request, - new ClientInterceptorContext(method, host, options), - (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req)); - } - - /// - /// Intercepts an asynchronous client streaming call with the registered interceptor. - /// - public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string host, CallOptions options) - { - return interceptor.AsyncClientStreamingCall( - new ClientInterceptorContext(method, host, options), - ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options)); - } - - /// - /// Intercepts an asynchronous duplex streaming call with the registered interceptor. - /// - public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string host, CallOptions options) - { - return interceptor.AsyncDuplexStreamingCall( - new ClientInterceptorContext(method, host, options), - ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options)); - } - } - - private class MetadataInterceptor : GenericInterceptor - { - readonly Func interceptor; - - /// - /// Creates a new instance of MetadataInterceptor given the specified interceptor function. - /// - public MetadataInterceptor(Func interceptor) - { - this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); - } - - protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - var metadata = context.Options.Headers ?? new Metadata(); - return new ClientCallHooks - { - ContextOverride = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))), - }; - } - } - - /// - /// Returns a instance that intercepts - /// the invoker with the given interceptor. - /// - /// The underlying invoker to intercept. - /// - /// An interceptor delegate that takes the request metadata to be sent with an outgoing call - /// and returns a instance that will replace the existing - /// invocation metadata. - /// - /// - /// Multiple interceptors can be added on top of each other by calling - /// "invoker.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". - /// Interceptors can be later added to an existing intercepted CallInvoker, effectively - /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that - /// in this case, the last interceptor added will be the first to take control. - /// - public static CallInvoker Intercept(this CallInvoker invoker, Func interceptor) - { - return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); - } - /// /// Returns a instance that intercepts /// the invoker with the given interceptor. @@ -191,5 +74,47 @@ namespace Grpc.Core.Interceptors return invoker; } + + /// + /// Returns a instance that intercepts + /// the invoker with the given interceptor. + /// + /// The underlying invoker to intercept. + /// + /// An interceptor delegate that takes the request metadata to be sent with an outgoing call + /// and returns a instance that will replace the existing + /// invocation metadata. + /// + /// + /// Multiple interceptors can be added on top of each other by + /// building a chain like "invoker.Intercept(c).Intercept(b).Intercept(a)". Note that + /// in this case, the last interceptor added will be the first to take control. + /// + public static CallInvoker Intercept(this CallInvoker invoker, Func interceptor) + { + return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); + } + + private class MetadataInterceptor : GenericInterceptor + { + readonly Func interceptor; + + /// + /// Creates a new instance of MetadataInterceptor given the specified interceptor function. + /// + public MetadataInterceptor(Func interceptor) + { + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + { + var metadata = context.Options.Headers ?? new Metadata(); + return new ClientCallHooks + { + ContextOverride = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))), + }; + } + } } } diff --git a/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs new file mode 100644 index 0000000000..fb06523abb --- /dev/null +++ b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core.Utils; + +namespace Grpc.Core.Interceptors +{ + /// + /// Decorates an underlying to + /// intercept calls through a given interceptor. + /// + internal class InterceptingCallInvoker : CallInvoker + { + readonly CallInvoker invoker; + readonly Interceptor interceptor; + + /// + /// Creates a new instance of + /// with the given underlying invoker and interceptor instances. + /// + public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) + { + this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker"); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + } + + /// + /// Intercepts a simple blocking call with the registered interceptor. + /// + public override TResponse BlockingUnaryCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.BlockingUnaryCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts a simple asynchronous call with the registered interceptor. + /// + public override AsyncUnaryCall AsyncUnaryCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncUnaryCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts an asynchronous server streaming call with the registered interceptor. + /// + public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string host, CallOptions options, TRequest request) + { + return interceptor.AsyncServerStreamingCall( + request, + new ClientInterceptorContext(method, host, options), + (req, ctx) => invoker.AsyncServerStreamingCall(ctx.Method, ctx.Host, ctx.Options, req)); + } + + /// + /// Intercepts an asynchronous client streaming call with the registered interceptor. + /// + public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string host, CallOptions options) + { + return interceptor.AsyncClientStreamingCall( + new ClientInterceptorContext(method, host, options), + ctx => invoker.AsyncClientStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + + /// + /// Intercepts an asynchronous duplex streaming call with the registered interceptor. + /// + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string host, CallOptions options) + { + return interceptor.AsyncDuplexStreamingCall( + new ClientInterceptorContext(method, host, options), + ctx => invoker.AsyncDuplexStreamingCall(ctx.Method, ctx.Host, ctx.Options)); + } + } +} -- cgit v1.2.3 From 074b802c9f3b1c22f57f5cea57e755487cc01832 Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 22 Feb 2018 07:19:47 -0800 Subject: Polish and address review comments --- src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs | 6 +++--- src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs | 4 +--- .../Grpc.Core/Interceptors/ClientInterceptorContext.cs | 12 ++++++------ src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs | 4 ++-- .../Interceptors/ServerServiceDefinitionExtensions.cs | 8 ++++---- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 10 +++++----- 6 files changed, 21 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index a01865cf2f..277800ce99 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -64,8 +64,8 @@ namespace Grpc.Core.Interceptors /// public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) { - GrpcPreconditions.CheckNotNull(invoker, "invoker"); - GrpcPreconditions.CheckNotNull(interceptors, "interceptors"); + GrpcPreconditions.CheckNotNull(invoker, nameof(invoker); + GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors)); foreach (var interceptor in interceptors.Reverse()) { @@ -104,7 +104,7 @@ namespace Grpc.Core.Interceptors /// public MetadataInterceptor(Func interceptor) { - this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); } protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) diff --git a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs index a095b05925..00b2fa8bec 100644 --- a/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ChannelExtensions.cs @@ -76,9 +76,7 @@ namespace Grpc.Core.Interceptors /// invocation metadata. /// /// - /// Multiple interceptors can be added on top of each other by calling - /// "channel.Intercept(a, b, c)". The order of invocation will be "a", "b", and then "c". - /// Interceptors can be later added to an existing intercepted channel, effectively + /// Multiple interceptors can be added on top of each other by /// building a chain like "channel.Intercept(c).Intercept(b).Intercept(a)". Note that /// in this case, the last interceptor added will be the first to take control. /// diff --git a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs index 64d7297a0a..de06a77077 100644 --- a/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs +++ b/src/csharp/Grpc.Core/Interceptors/ClientInterceptorContext.cs @@ -46,20 +46,20 @@ namespace Grpc.Core.Interceptors } /// - /// Gets or sets the instance + /// Gets the instance /// representing the method to be invoked. /// - public Method Method { get; set; } + public Method Method { get; } /// - /// Gets or sets the host that the currect invocation will be dispatched to. + /// Gets the host that the currect invocation will be dispatched to. /// - public string Host { get; set; } + public string Host { get; } /// - /// Gets or sets the structure representing the + /// Gets the structure representing the /// call options associated with the current invocation. /// - public CallOptions Options { get; set; } + public CallOptions Options { get; } } } diff --git a/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs index fb06523abb..84d2a0b958 100644 --- a/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs +++ b/src/csharp/Grpc.Core/Interceptors/InterceptingCallInvoker.cs @@ -36,8 +36,8 @@ namespace Grpc.Core.Interceptors /// public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor) { - this.invoker = GrpcPreconditions.CheckNotNull(invoker, "invoker"); - this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + this.invoker = GrpcPreconditions.CheckNotNull(invoker, nameof(invoker)); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); } /// diff --git a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs index 21a0782037..b9b53247ce 100644 --- a/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/ServerServiceDefinitionExtensions.cs @@ -44,8 +44,8 @@ namespace Grpc.Core.Interceptors /// public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, Interceptor interceptor) { - GrpcPreconditions.CheckNotNull(serverServiceDefinition, "serverServiceDefinition"); - GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition)); + GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); return new ServerServiceDefinition(serverServiceDefinition.CallHandlers.ToDictionary(x => x.Key, x => x.Value.Intercept(interceptor))); } @@ -68,8 +68,8 @@ namespace Grpc.Core.Interceptors /// public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, params Interceptor[] interceptors) { - GrpcPreconditions.CheckNotNull(serverServiceDefinition, "serverServiceDefinition"); - GrpcPreconditions.CheckNotNull(interceptors, "interceptors"); + GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition)); + GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors)); foreach (var interceptor in interceptors.Reverse()) { diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index add72ad68d..81522cf8fe 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -76,7 +76,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler or interceptors."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -138,7 +138,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler or interceptors."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -201,7 +201,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler or interceptor."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -262,7 +262,7 @@ namespace Grpc.Core.Internal { if (!(e is RpcException)) { - Logger.Warning(e, "Exception occured in handler or interceptor."); + Logger.Warning(e, "Exception occurred in the handler or an interceptor."); } status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } @@ -313,7 +313,7 @@ namespace Grpc.Core.Internal public IServerCallHandler Intercept(Interceptor interceptor) { - return this; // Do not intercept unimplemented services + return this; // Do not intercept unimplemented methods. } } -- cgit v1.2.3 From a7c1b6251c151bbb3b020e88ab340cedb4ca4d0d Mon Sep 17 00:00:00 2001 From: Mehrdad Afshari Date: Thu, 22 Feb 2018 07:52:48 -0800 Subject: Eliminate GenericInterceptor to simplify this PR --- .../Interceptors/ClientInterceptorTest.cs | 130 ++++-- .../Interceptors/ServerInterceptorTest.cs | 93 +++-- src/csharp/Grpc.Core/ClientBase.cs | 46 ++- .../Interceptors/CallInvokerExtensions.cs | 38 +- .../Grpc.Core/Interceptors/GenericInterceptor.cs | 449 --------------------- 5 files changed, 208 insertions(+), 548 deletions(-) delete mode 100644 src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs (limited to 'src') diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs index d7c01d08ac..02f6f6ffc6 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ClientInterceptorTest.cs @@ -58,22 +58,6 @@ namespace Grpc.Core.Interceptors.Tests Assert.AreEqual("PASS", callInvoker.BlockingUnaryCall(new Method(MethodType.Unary, MockServiceHelper.ServiceName, "Unary", Marshallers.StringMarshaller, Marshallers.StringMarshaller), Host, new CallOptions(), "")); } - private class CallbackInterceptor : GenericInterceptor - { - readonly Action callback; - - public CallbackInterceptor(Action callback) - { - this.callback = callback; - } - - protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - callback(); - return null; - } - } - [Test] public void CheckInterceptorOrderInClientInterceptors() { @@ -118,23 +102,6 @@ namespace Grpc.Core.Interceptors.Tests Assert.Throws(() => helper.GetChannel().Intercept(default(Interceptor[]))); } - private class CountingInterceptor : GenericInterceptor - { - protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - if (!clientStreaming) - { - return null; - } - int counter = 0; - return new ClientCallHooks - { - OnRequestMessage = m => { counter++; return m; }, - OnUnaryResponse = x => (TResponse)(object)counter.ToString() // Cast to object first is needed to satisfy the type-checker - }; - } - } - [Test] public async Task CountNumberOfRequestsInClientInterceptors() { @@ -151,7 +118,7 @@ namespace Grpc.Core.Interceptors.Tests return stringBuilder.ToString(); }); - var callInvoker = helper.GetChannel().Intercept(new CountingInterceptor()); + var callInvoker = helper.GetChannel().Intercept(new ClientStreamingCountingInterceptor()); var server = helper.GetServer(); server.Start(); @@ -162,5 +129,100 @@ namespace Grpc.Core.Interceptors.Tests Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); Assert.IsNotNull(call.GetTrailers()); } + + private class CallbackInterceptor : Interceptor + { + readonly Action callback; + + public CallbackInterceptor(Action callback) + { + this.callback = GrpcPreconditions.CheckNotNull(callback, nameof(callback)); + } + + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + callback(); + return continuation(request, context); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + callback(); + return continuation(context); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + callback(); + return continuation(context); + } + } + + private class ClientStreamingCountingInterceptor : Interceptor + { + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + var response = continuation(context); + int counter = 0; + var requestStream = new WrappedClientStreamWriter(response.RequestStream, + message => { counter++; return message; }, null); + var responseAsync = response.ResponseAsync.ContinueWith( + unaryResponse => (TResponse)(object)counter.ToString() // Cast to object first is needed to satisfy the type-checker + ); + return new AsyncClientStreamingCall(requestStream, responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); + } + } + + private class WrappedClientStreamWriter : IClientStreamWriter + { + readonly IClientStreamWriter writer; + readonly Func onMessage; + readonly Action onResponseStreamEnd; + public WrappedClientStreamWriter(IClientStreamWriter writer, Func onMessage, Action onResponseStreamEnd) + { + this.writer = writer; + this.onMessage = onMessage; + this.onResponseStreamEnd = onResponseStreamEnd; + } + public Task CompleteAsync() + { + if (onResponseStreamEnd != null) + { + return writer.CompleteAsync().ContinueWith(x => onResponseStreamEnd()); + } + return writer.CompleteAsync(); + } + public Task WriteAsync(T message) + { + if (onMessage != null) + { + message = onMessage(message); + } + return writer.WriteAsync(message); + } + public WriteOptions WriteOptions + { + get + { + return writer.WriteOptions; + } + set + { + writer.WriteOptions = value; + } + } + } } } diff --git a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs index c0957a2b42..e76f21d098 100644 --- a/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs +++ b/src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs @@ -35,33 +35,17 @@ namespace Grpc.Core.Interceptors.Tests { const string Host = "127.0.0.1"; - private class AddRequestHeaderServerInterceptor : GenericInterceptor - { - readonly Metadata.Entry header; - - public AddRequestHeaderServerInterceptor(string key, string value) - { - this.header = new Metadata.Entry(key, value); - } - - protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - context.RequestHeaders.Add(header); - return Task.FromResult>(null); - } - - public Metadata.Entry Header => header; - } - [Test] public void AddRequestHeaderInServerInterceptor() { var helper = new MockServiceHelper(Host); - var interceptor = new AddRequestHeaderServerInterceptor("x-interceptor", "hello world"); + const string MetadataKey = "x-interceptor"; + const string MetadataValue = "hello world"; + var interceptor = new ServerCallContextInterceptor(ctx => ctx.RequestHeaders.Add(new Metadata.Entry(MetadataKey, MetadataValue))); helper.UnaryHandler = new UnaryServerMethod((request, context) => { - var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == interceptor.Header.Key)).Value; - Assert.AreEqual(interceptorHeader, interceptor.Header.Value); + var interceptorHeader = context.RequestHeaders.Last(m => (m.Key == MetadataKey)).Value; + Assert.AreEqual(interceptorHeader, MetadataValue); return Task.FromResult("PASS"); }); helper.ServiceDefinition = helper.ServiceDefinition.Intercept(interceptor); @@ -71,22 +55,6 @@ namespace Grpc.Core.Interceptors.Tests Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); } - private class ArbitraryActionInterceptor : GenericInterceptor - { - readonly Action action; - - public ArbitraryActionInterceptor(Action action) - { - this.action = action; - } - - protected override Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) - { - action(); - return Task.FromResult>(null); - } - } - [Test] public void VerifyInterceptorOrdering() { @@ -97,11 +65,11 @@ namespace Grpc.Core.Interceptors.Tests }); var stringBuilder = new StringBuilder(); helper.ServiceDefinition = helper.ServiceDefinition - .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("A"))) - .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("B1")), - new ArbitraryActionInterceptor(() => stringBuilder.Append("B2")), - new ArbitraryActionInterceptor(() => stringBuilder.Append("B3"))) - .Intercept(new ArbitraryActionInterceptor(() => stringBuilder.Append("C"))); + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("A"))) + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("B1")), + new ServerCallContextInterceptor(ctx => stringBuilder.Append("B2")), + new ServerCallContextInterceptor(ctx => stringBuilder.Append("B3"))) + .Intercept(new ServerCallContextInterceptor(ctx => stringBuilder.Append("C"))); var server = helper.GetServer(); server.Start(); var channel = helper.GetChannel(); @@ -113,15 +81,46 @@ namespace Grpc.Core.Interceptors.Tests public void CheckNullInterceptorRegistrationFails() { var helper = new MockServiceHelper(Host); - helper.UnaryHandler = new UnaryServerMethod((request, context) => - { - return Task.FromResult("PASS"); - }); var sd = helper.ServiceDefinition; Assert.Throws(() => sd.Intercept(default(Interceptor))); Assert.Throws(() => sd.Intercept(new[]{default(Interceptor)})); - Assert.Throws(() => sd.Intercept(new[]{new ArbitraryActionInterceptor(()=>{}), null})); + Assert.Throws(() => sd.Intercept(new[]{new ServerCallContextInterceptor(ctx=>{}), null})); Assert.Throws(() => sd.Intercept(default(Interceptor[]))); } + + private class ServerCallContextInterceptor : Interceptor + { + readonly Action interceptor; + + public ServerCallContextInterceptor(Action interceptor) + { + GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); + this.interceptor = interceptor; + } + + public override Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) + { + interceptor(context); + return continuation(request, context); + } + + public override Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) + { + interceptor(context); + return continuation(requestStream, context); + } + + public override Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) + { + interceptor(context); + return continuation(request, responseStream, context); + } + + public override Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) + { + interceptor(context); + return continuation(requestStream, responseStream, context); + } + } } } diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 4bb06ed87f..fac34071be 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -149,25 +149,49 @@ namespace Grpc.Core /// protected internal class ClientBaseConfiguration { - private class ClientHeaderInterceptor : GenericInterceptor + private class ClientBaseConfigurationInterceptor : Interceptor { readonly Func> interceptor; /// - /// Creates a new instance of ClientHeaderInterceptor given the specified header interceptor function. + /// Creates a new instance of ClientBaseConfigurationInterceptor given the specified header and host interceptor function. /// - public ClientHeaderInterceptor(Func> interceptor) + public ClientBaseConfigurationInterceptor(Func> interceptor) { - this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, "interceptor"); + this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); } - protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + private ClientInterceptorContext GetNewContext(ClientInterceptorContext context) + where TRequest : class + where TResponse : class { var newHostAndCallOptions = interceptor(context.Method, context.Host, context.Options); - return new ClientCallHooks - { - ContextOverride = new ClientInterceptorContext(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2) - }; + return new ClientInterceptorContext(context.Method, newHostAndCallOptions.Item1, newHostAndCallOptions.Item2); + } + + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + return continuation(GetNewContext(context)); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + return continuation(GetNewContext(context)); } } @@ -182,12 +206,12 @@ namespace Grpc.Core internal CallInvoker CreateDecoratedCallInvoker() { - return undecoratedCallInvoker.Intercept(new ClientHeaderInterceptor((method, host, options) => Tuple.Create(this.host, options))); + return undecoratedCallInvoker.Intercept(new ClientBaseConfigurationInterceptor((method, host, options) => Tuple.Create(this.host, options))); } internal ClientBaseConfiguration WithHost(string host) { - GrpcPreconditions.CheckNotNull(host, "host"); + GrpcPreconditions.CheckNotNull(host, nameof(host)); return new ClientBaseConfiguration(this.undecoratedCallInvoker, host); } } diff --git a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs index 277800ce99..421b5d379e 100644 --- a/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs +++ b/src/csharp/Grpc.Core/Interceptors/CallInvokerExtensions.cs @@ -64,7 +64,7 @@ namespace Grpc.Core.Interceptors /// public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors) { - GrpcPreconditions.CheckNotNull(invoker, nameof(invoker); + GrpcPreconditions.CheckNotNull(invoker, nameof(invoker)); GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors)); foreach (var interceptor in interceptors.Reverse()) @@ -95,7 +95,7 @@ namespace Grpc.Core.Interceptors return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor)); } - private class MetadataInterceptor : GenericInterceptor + private class MetadataInterceptor : Interceptor { readonly Func interceptor; @@ -107,13 +107,37 @@ namespace Grpc.Core.Interceptors this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor)); } - protected override ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) + private ClientInterceptorContext GetNewContext(ClientInterceptorContext context) + where TRequest : class + where TResponse : class { var metadata = context.Options.Headers ?? new Metadata(); - return new ClientCallHooks - { - ContextOverride = new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))), - }; + return new ClientInterceptorContext(context.Method, context.Host, context.Options.WithHeaders(interceptor(metadata))); + } + + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + return continuation(request, GetNewContext(context)); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + return continuation(GetNewContext(context)); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + return continuation(GetNewContext(context)); } } } diff --git a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs b/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs deleted file mode 100644 index 7ee649e9b5..0000000000 --- a/src/csharp/Grpc.Core/Interceptors/GenericInterceptor.cs +++ /dev/null @@ -1,449 +0,0 @@ -#region Copyright notice and license - -// Copyright 2018 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core.Internal; - -namespace Grpc.Core.Interceptors -{ - /// - /// Provides a base class for generic interceptor implementations that raises - /// events and hooks to control the RPC lifecycle. - /// - internal abstract class GenericInterceptor : Interceptor - { - /// - /// Provides hooks through which an invocation should be intercepted. - /// - public sealed class ClientCallHooks - where TRequest : class - where TResponse : class - { - internal ClientCallHooks Freeze() - { - return (ClientCallHooks)MemberwiseClone(); - } - /// - /// Override the context for the outgoing invocation. - /// - public ClientInterceptorContext? ContextOverride { get; set; } - /// - /// Override the request for the outgoing invocation for non-client-streaming invocations. - /// - public TRequest UnaryRequestOverride { get; set; } - /// - /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. - /// - public Func OnUnaryResponse { get; set; } - /// - /// Delegate that intercepts each request message for a client-streaming invocation and optionally overrides each message. - /// - public Func OnRequestMessage { get; set; } - /// - /// Delegate that intercepts each response message for a server-streaming invocation and optionally overrides each message. - /// - public Func OnResponseMessage { get; set; } - /// - /// Callback that gets invoked when response stream is finished. - /// - public Action OnResponseStreamEnd { get; set; } - /// - /// Callback that gets invoked when request stream is finished. - /// - public Action OnRequestStreamEnd { get; set; } - } - - /// - /// Intercepts an outgoing call from the client side. - /// Derived classes that intend to intercept outgoing invocations from the client side should - /// override this and return the appropriate hooks in the form of a ClientCallHooks instance. - /// - /// The context of the outgoing invocation. - /// True if the invocation is client-streaming. - /// True if the invocation is server-streaming. - /// The request message for client-unary invocations, null otherwise. - /// Request message type for the current invocation. - /// Response message type for the current invocation. - /// - /// The derived class should return an instance of ClientCallHooks to control the trajectory - /// as they see fit, or null if it does not intend to pursue the invocation any further. - /// - protected virtual ClientCallHooks InterceptCall(ClientInterceptorContext context, bool clientStreaming, bool serverStreaming, TRequest request) - where TRequest : class - where TResponse : class - { - return null; - } - - /// - /// Provides hooks through which a server-side handler should be intercepted. - /// - public sealed class ServerCallHooks - where TRequest : class - where TResponse : class - { - internal ServerCallHooks Freeze() - { - return (ServerCallHooks)MemberwiseClone(); - } - /// - /// Override the request for the outgoing invocation for non-client-streaming invocations. - /// - public TRequest UnaryRequestOverride { get; set; } - /// - /// Delegate that intercepts a response from a non-server-streaming invocation and optionally overrides it. - /// - public Func OnUnaryResponse { get; set; } - /// - /// Delegate that intercepts each request message for a client-streaming invocation and optionally overrides each message. - /// - public Func OnRequestMessage { get; set; } - /// - /// Delegate that intercepts each response message for a server-streaming invocation and optionally overrides each message. - /// - public Func OnResponseMessage { get; set; } - /// - /// Callback that gets invoked when handler is finished executing. - /// - public Action OnHandlerEnd { get; set; } - /// - /// Callback that gets invoked when request stream is finished. - /// - public Action OnRequestStreamEnd { get; set; } - } - - /// - /// Intercepts an incoming service handler invocation on the server side. - /// Derived classes that intend to intercept incoming handlers on the server side should - /// override this and return the appropriate hooks in the form of a ServerCallHooks instance. - /// - /// The context of the incoming invocation. - /// True if the invocation is client-streaming. - /// True if the invocation is server-streaming. - /// The request message for client-unary invocations, null otherwise. - /// Request message type for the current invocation. - /// Response message type for the current invocation. - /// - /// The derived class should return an instance of ServerCallHooks to control the trajectory - /// as they see fit, or null if it does not intend to pursue the invocation any further. - /// - protected virtual Task> InterceptHandler(ServerCallContext context, bool clientStreaming, bool serverStreaming, TRequest request) - where TRequest : class - where TResponse : class - { - return Task.FromResult>(null); - } - - /// - /// Intercepts a blocking invocation of a simple remote call and dispatches the events accordingly. - /// - public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) - { - var hooks = InterceptCall(context, false, false, request)?.Freeze(); - context = hooks?.ContextOverride ?? context; - request = hooks?.UnaryRequestOverride ?? request; - var response = continuation(request, context); - if (hooks?.OnUnaryResponse != null) - { - response = hooks.OnUnaryResponse(response); - } - return response; - } - - /// - /// Intercepts an asynchronous invocation of a simple remote call and dispatches the events accordingly. - /// - public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) - { - var hooks = InterceptCall(context, false, false, request)?.Freeze(); - context = hooks?.ContextOverride ?? context; - request = hooks?.UnaryRequestOverride ?? request; - var response = continuation(request, context); - if (hooks?.OnUnaryResponse != null) - { - response = new AsyncUnaryCall(response.ResponseAsync.ContinueWith(unaryResponse => hooks.OnUnaryResponse(unaryResponse.Result)), - response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); - } - return response; - } - - /// - /// Intercepts an asynchronous invocation of a streaming remote call and dispatches the events accordingly. - /// - public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) - { - var hooks = InterceptCall(context, false, true, request)?.Freeze(); - context = hooks?.ContextOverride ?? context; - request = hooks?.UnaryRequestOverride ?? request; - var response = continuation(request, context); - if (hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) - { - response = new AsyncServerStreamingCall( - new WrappedAsyncStreamReader(response.ResponseStream, hooks.OnResponseMessage, hooks.OnResponseStreamEnd), - response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); - } - return response; - } - - /// - /// Intercepts an asynchronous invocation of a client streaming call and dispatches the events accordingly. - /// - public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) - { - var hooks = InterceptCall(context, true, false, null)?.Freeze(); - context = hooks?.ContextOverride ?? context; - var response = continuation(context); - if (hooks?.OnRequestMessage != null || hooks?.OnResponseStreamEnd != null || hooks?.OnUnaryResponse != null) - { - var requestStream = response.RequestStream; - if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) - { - requestStream = new WrappedClientStreamWriter(response.RequestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); - } - var responseAsync = response.ResponseAsync; - if (hooks?.OnUnaryResponse != null) - { - responseAsync = response.ResponseAsync.ContinueWith(unaryResponse => hooks.OnUnaryResponse(unaryResponse.Result)); - } - response = new AsyncClientStreamingCall(requestStream, responseAsync, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); - } - return response; - } - - /// - /// Intercepts an asynchronous invocation of a duplex streaming call and dispatches the events accordingly. - /// - public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) - { - var hooks = InterceptCall(context, true, true, null)?.Freeze(); - context = hooks?.ContextOverride ?? context; - var response = continuation(context); - if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null || hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) - { - var requestStream = response.RequestStream; - if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) - { - requestStream = new WrappedClientStreamWriter(response.RequestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); - } - var responseStream = response.ResponseStream; - if (hooks?.OnResponseMessage != null || hooks?.OnResponseStreamEnd != null) - { - responseStream = new WrappedAsyncStreamReader(response.ResponseStream, hooks.OnResponseMessage, hooks.OnResponseStreamEnd); - } - response = new AsyncDuplexStreamingCall(requestStream, responseStream, response.ResponseHeadersAsync, response.GetStatus, response.GetTrailers, response.Dispose); - } - return response; - } - - /// - /// Server-side handler for intercepting unary calls. - /// - /// Request message type for this method. - /// Response message type for this method. - public override async Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) - { - var hooks = (await InterceptHandler(context, false, false, request))?.Freeze(); - request = hooks?.UnaryRequestOverride ?? request; - var response = await continuation(request, context); - if (hooks?.OnUnaryResponse != null) - { - response = hooks.OnUnaryResponse(response); - } - hooks?.OnHandlerEnd(); - return response; - } - - /// - /// Server-side handler for intercepting client streaming call. - /// - /// Request message type for this method. - /// Response message type for this method. - public override async Task ClientStreamingServerHandler(IAsyncStreamReader requestStream, ServerCallContext context, ClientStreamingServerMethod continuation) - { - var hooks = (await InterceptHandler(context, true, false, null))?.Freeze(); - if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) - { - requestStream = new WrappedAsyncStreamReader(requestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); - } - var response = await continuation(requestStream, context); - if (hooks?.OnUnaryResponse != null) - { - response = hooks.OnUnaryResponse(response); - } - hooks?.OnHandlerEnd(); - return response; - } - - /// - /// Server-side handler for intercepting server streaming calls. - /// - /// Request message type for this method. - /// Response message type for this method. - public override async Task ServerStreamingServerHandler(TRequest request, IServerStreamWriter responseStream, ServerCallContext context, ServerStreamingServerMethod continuation) - { - var hooks = (await InterceptHandler(context, false, true, request))?.Freeze(); - request = hooks?.UnaryRequestOverride ?? request; - if (hooks?.OnResponseMessage != null) - { - responseStream = new WrappedAsyncStreamWriter(responseStream, hooks.OnResponseMessage); - } - await continuation(request, responseStream, context); - hooks?.OnHandlerEnd(); - } - - /// - /// Server-side handler for intercepting bidi streaming calls. - /// - /// Request message type for this method. - /// Response message type for this method. - public override async Task DuplexStreamingServerHandler(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context, DuplexStreamingServerMethod continuation) - { - var hooks = (await InterceptHandler(context, true, true, null))?.Freeze(); - if (hooks?.OnRequestMessage != null || hooks?.OnRequestStreamEnd != null) - { - requestStream = new WrappedAsyncStreamReader(requestStream, hooks.OnRequestMessage, hooks.OnRequestStreamEnd); - } - if (hooks?.OnResponseMessage != null) - { - responseStream = new WrappedAsyncStreamWriter(responseStream, hooks.OnResponseMessage); - } - await continuation(requestStream, responseStream, context); - hooks?.OnHandlerEnd(); - } - - private class WrappedAsyncStreamReader : IAsyncStreamReader - { - readonly IAsyncStreamReader reader; - readonly Func onMessage; - readonly Action onStreamEnd; - public WrappedAsyncStreamReader(IAsyncStreamReader reader, Func onMessage, Action onStreamEnd) - { - this.reader = reader; - this.onMessage = onMessage; - this.onStreamEnd = onStreamEnd; - } - - public void Dispose() => ((IDisposable)reader).Dispose(); - - private T current; - public T Current - { - get - { - if (current == null) - { - throw new InvalidOperationException("No current element is available."); - } - return current; - } - } - - public async Task MoveNext(CancellationToken token) - { - if (await reader.MoveNext(token)) - { - var current = reader.Current; - if (onMessage != null) - { - var mappedValue = onMessage(current); - if (mappedValue != null) - { - current = mappedValue; - } - } - this.current = current; - return true; - } - onStreamEnd?.Invoke(); - return false; - } - } - - private class WrappedClientStreamWriter : IClientStreamWriter - { - readonly IClientStreamWriter writer; - readonly Func onMessage; - readonly Action onResponseStreamEnd; - public WrappedClientStreamWriter(IClientStreamWriter writer, Func onMessage, Action onResponseStreamEnd) - { - this.writer = writer; - this.onMessage = onMessage; - this.onResponseStreamEnd = onResponseStreamEnd; - } - public Task CompleteAsync() - { - if (onResponseStreamEnd != null) - { - return writer.CompleteAsync().ContinueWith(x => onResponseStreamEnd()); - } - return writer.CompleteAsync(); - } - public Task WriteAsync(T message) - { - if (onMessage != null) - { - message = onMessage(message); - } - return writer.WriteAsync(message); - } - public WriteOptions WriteOptions - { - get - { - return writer.WriteOptions; - } - set - { - writer.WriteOptions = value; - } - } - } - - private class WrappedAsyncStreamWriter : IServerStreamWriter - { - readonly IAsyncStreamWriter writer; - readonly Func onMessage; - public WrappedAsyncStreamWriter(IAsyncStreamWriter writer, Func onMessage) - { - this.writer = writer; - this.onMessage = onMessage; - } - public Task WriteAsync(T message) - { - if (onMessage != null) - { - message = onMessage(message); - } - return writer.WriteAsync(message); - } - public WriteOptions WriteOptions - { - get - { - return writer.WriteOptions; - } - set - { - writer.WriteOptions = value; - } - } - } - } -} -- cgit v1.2.3