diff options
author | David Garcia Quintas <dgq@google.com> | 2015-07-23 04:39:02 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-07-23 04:39:02 -0700 |
commit | 607dd2eeb7edb055c3cda8c2206d5cfc99ef8190 (patch) | |
tree | 93cac6e9efbaabe0287fcace16447822da2921ce /src | |
parent | 32ed2fd117622a9b2974537be595b8356cb49744 (diff) | |
parent | baa2aa644226b00ad9cb493660356f4473acd212 (diff) |
Merge branch 'compression-accept-encoding' into compression-interop
Diffstat (limited to 'src')
51 files changed, 1039 insertions, 201 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 1910e9bd2d..64371047e0 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -149,7 +149,7 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) { std::string GetMethodReturnTypeClient(const MethodDescriptor *method) { switch (GetMethodType(method)) { case METHODTYPE_NO_STREAMING: - return "Task<" + GetClassName(method->output_type()) + ">"; + return "AsyncUnaryCall<" + GetClassName(method->output_type()) + ">"; case METHODTYPE_CLIENT_STREAMING: return "AsyncClientStreamingCall<" + GetClassName(method->input_type()) + ", " + GetClassName(method->output_type()) + ">"; @@ -298,7 +298,7 @@ void GenerateServerInterface(Printer* out, const ServiceDescriptor *service) { out->Indent(); for (int i = 0; i < service->method_count(); i++) { const MethodDescriptor *method = service->method(i); - out->Print("$returntype$ $methodname$(ServerCallContext context, $request$$response_stream_maybe$);\n", + out->Print("$returntype$ $methodname$($request$$response_stream_maybe$, ServerCallContext context);\n", "methodname", method->name(), "returntype", GetMethodReturnTypeServer(method), "request", GetMethodRequestParamServer(method), "response_stream_maybe", diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 9b9e82caba..1bd20661d6 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -35,16 +35,19 @@ #include <string.h> #include <grpc/compression.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice_buffer.h> #include "src/core/channel/compress_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/compression/message_compress.h" +#include "src/core/support/string.h" typedef struct call_data { gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem accept_encoding_storage; int remaining_slice_bytes; /**< Input data to be read, as per BEGIN_MESSAGE */ int written_initial_metadata; /**< Already processed initial md? */ /** Compression algorithm we'll try to use. It may be given by incoming @@ -59,8 +62,12 @@ typedef struct channel_data { grpc_mdstr *mdstr_request_compression_algorithm_key; /** Metadata key for the outgoing (used) compression algorithm */ grpc_mdstr *mdstr_outgoing_compression_algorithm_key; + /** Metadata key for the accepted encodings */ + grpc_mdstr *mdstr_compression_capabilities_key; /** Precomputed metadata elements for all available compression algorithms */ grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; + /** Precomputed metadata elements for the accepted encodings */ + grpc_mdelem *mdelem_accept_encoding; /** The default, channel-level, compression algorithm */ grpc_compression_algorithm default_compression_algorithm; } channel_data; @@ -93,7 +100,7 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { if (md->key == channeld->mdstr_request_compression_algorithm_key) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), &calld->compression_algorithm)) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", md_c_str); @@ -202,10 +209,17 @@ static void process_send_ops(grpc_call_element *elem, channeld->default_compression_algorithm; calld->has_compression_algorithm = 1; /* GPR_TRUE */ } - grpc_metadata_batch_add_head( + /* hint compression algorithm */ + grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->compression_algorithm_storage, - grpc_mdelem_ref(channeld->mdelem_compression_algorithms + GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms [calld->compression_algorithm])); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->accept_encoding_storage, + GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); + calld->written_initial_metadata = 1; /* GPR_TRUE */ } break; @@ -279,6 +293,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, int is_first, int is_last) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; + const char* supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT-1]; + char *accept_encoding_str; + size_t accept_encoding_str_len; channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args); @@ -289,16 +306,37 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string(mdctx, "grpc-encoding"); + channeld->mdstr_compression_capabilities_key = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { char *algorith_name; GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( mdctx, - grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key), + GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorith_name)); + if (algo_idx > 0) { + supported_algorithms_names[algo_idx-1] = algorith_name; + } } + /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated + * arrays, as to avoid the heap allocs */ + accept_encoding_str = + gpr_strjoin_sep(supported_algorithms_names, + GPR_ARRAY_SIZE(supported_algorithms_names), + ", ", + &accept_encoding_str_len); + + channeld->mdelem_accept_encoding = + grpc_mdelem_from_metadata_strings( + mdctx, + GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key), + grpc_mdstr_from_string(mdctx, accept_encoding_str)); + gpr_free(accept_encoding_str); + GPR_ASSERT(!is_last); } @@ -307,12 +345,14 @@ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key); - grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key); + GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); + GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]); } + GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding); } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index bc821b16fa..6e93103a6a 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -98,6 +98,18 @@ static void hc_on_recv(void *user_data, int success) { calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); } +static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + channel_data *channeld = elem->channel_data; + /* eat the things we'd like to set ourselves */ + if (md->key == channeld->method->key) return NULL; + if (md->key == channeld->scheme->key) return NULL; + if (md->key == channeld->te_trailers->key) return NULL; + if (md->key == channeld->content_type->key) return NULL; + if (md->key == channeld->user_agent->key) return NULL; + return md; +} + static void hc_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { /* grab pointers to our data from the call element */ @@ -111,6 +123,7 @@ static void hc_mutate_op(grpc_call_element *elem, grpc_stream_op *op = &ops[i]; if (op->type != GRPC_OP_METADATA) continue; calld->sent_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); /* Send : prefixed headers, which have to be before any application layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index e426241d0a..dbf4721d13 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -35,13 +35,20 @@ #include <string.h> #include <grpc/compression.h> -int grpc_compression_algorithm_parse(const char* name, +int grpc_compression_algorithm_parse(const char* name, size_t name_length, grpc_compression_algorithm *algorithm) { - if (strcmp(name, "none") == 0) { + /* we use strncmp not only because it's safer (even though in this case it + * doesn't matter, given that we are comparing against string literals, but + * because this way we needn't have "name" nil-terminated (useful for slice + * data, for example) */ + if (name_length == 0) { + return 0; + } + if (strncmp(name, "none", name_length) == 0) { *algorithm = GRPC_COMPRESS_NONE; - } else if (strcmp(name, "gzip") == 0) { + } else if (strncmp(name, "gzip", name_length) == 0) { *algorithm = GRPC_COMPRESS_GZIP; - } else if (strcmp(name, "deflate") == 0) { + } else if (strncmp(name, "deflate", name_length) == 0) { *algorithm = GRPC_COMPRESS_DEFLATE; } else { return 0; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7945186e44..9d02d365b5 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include "src/core/census/grpc_context.h" #include "src/core/channel/channel_stack.h" @@ -239,6 +240,9 @@ struct grpc_call { /* Compression algorithm for the call */ grpc_compression_algorithm compression_algorithm; + /* Supported encodings (compression algorithms), a bitset */ + gpr_uint32 encodings_accepted_by_peer; + /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -475,9 +479,36 @@ static void set_compression_algorithm(grpc_call *call, call->compression_algorithm = algo; } -grpc_compression_algorithm grpc_call_get_compression_algorithm( - const grpc_call *call) { - return call->compression_algorithm; +static void set_encodings_accepted_by_peer(grpc_call *call, + const gpr_slice accept_encoding_slice) { + size_t i; + grpc_compression_algorithm algorithm; + gpr_slice_buffer accept_encoding_parts; + + gpr_slice_buffer_init(&accept_encoding_parts); + gpr_slice_split(accept_encoding_slice, ", ", &accept_encoding_parts); + + /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already + * zeroes the whole grpc_call */ + /* Always support no compression */ + GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); + for (i = 0; i < accept_encoding_parts.count; i++) { + const gpr_slice* slice = &accept_encoding_parts.slices[i]; + if (grpc_compression_algorithm_parse( + (const char *)GPR_SLICE_START_PTR(*slice), GPR_SLICE_LENGTH(*slice), + &algorithm)) { + GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); + } else { + /* TODO(dgq): it'd be nice to have a slice-to-cstr function to easily + * print the offending entry */ + gpr_log(GPR_ERROR, + "Invalid entry in accept encoding metadata. Ignoring."); + } + } +} + +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call) { + return call->encodings_accepted_by_peer; } gpr_uint32 grpc_call_get_message_flags(const grpc_call *call) { @@ -1318,10 +1349,12 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm; void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { - algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + algorithm = + ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { const char *md_c_str = grpc_mdstr_as_c_string(md->value); - if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), + &algorithm)) { gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); assert(0); } @@ -1349,6 +1382,9 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_compression_algorithm_string(call->channel)) { set_compression_algorithm(call, decode_compression(md)); + } else if (key == grpc_channel_get_encodings_accepted_by_peer_string( + call->channel)) { + set_encodings_accepted_by_peer(call, md->value->slice); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { diff --git a/src/core/surface/call.h b/src/core/surface/call.h index d7202f4f1d..5641561f85 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -169,6 +169,12 @@ grpc_compression_algorithm grpc_call_get_compression_algorithm( gpr_uint32 grpc_call_get_message_flags(const grpc_call *call); +/** Returns a bitset for the encodings (compression algorithms) supported by \a + * call's peer. + * + * To be indexed by grpc_compression_algorithm enum values. */ +gpr_uint32 grpc_call_get_encodings_accepted_by_peer(grpc_call *call); + #ifdef __cplusplus } #endif diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index a6438ff512..cd71e03b19 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -64,6 +64,7 @@ struct grpc_channel { /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_compression_algorithm_string; + grpc_mdstr *grpc_encodings_accepted_by_peer_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -100,6 +101,8 @@ grpc_channel *grpc_channel_create_from_filters( channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_compression_algorithm_string = grpc_mdstr_from_string(mdctx, "grpc-encoding"); + channel->grpc_encodings_accepted_by_peer_string = + grpc_mdstr_from_string(mdctx, "grpc-accept-encoding"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -210,6 +213,7 @@ static void destroy_channel(void *p, int ok) { } GRPC_MDSTR_UNREF(channel->grpc_status_string); GRPC_MDSTR_UNREF(channel->grpc_compression_algorithm_string); + GRPC_MDSTR_UNREF(channel->grpc_encodings_accepted_by_peer_string); GRPC_MDSTR_UNREF(channel->grpc_message_string); GRPC_MDSTR_UNREF(channel->path_string); GRPC_MDSTR_UNREF(channel->authority_string); @@ -267,6 +271,11 @@ grpc_mdstr *grpc_channel_get_compression_algorithm_string( return channel->grpc_compression_algorithm_string; } +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( + grpc_channel *channel) { + return channel->grpc_encodings_accepted_by_peer_string; +} + grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return GRPC_MDELEM_REF(channel->grpc_status_elem[i]); diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 4e03eb4411..1d1550bbe7 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -56,6 +56,8 @@ grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_compression_algorithm_string( grpc_channel *channel); +grpc_mdstr *grpc_channel_get_encodings_accepted_by_peer_string( + grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index e797dd82f2..8ba2c8a9a2 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -33,6 +33,7 @@ using System; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Grpc.Core; @@ -99,17 +100,17 @@ namespace Grpc.Core.Tests [Test] public void UnaryCall() { - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", CancellationToken.None)); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None)); } [Test] public void UnaryCall_ServerHandlerThrows() { - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); try { - Calls.BlockingUnaryCall(call, "THROW", CancellationToken.None); + Calls.BlockingUnaryCall(internalCall, "THROW", CancellationToken.None); Assert.Fail(); } catch (RpcException e) @@ -119,10 +120,40 @@ namespace Grpc.Core.Tests } [Test] + public void UnaryCall_ServerHandlerThrowsRpcException() + { + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + try + { + Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED", CancellationToken.None); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode); + } + } + + [Test] + public void UnaryCall_ServerHandlerSetsStatus() + { + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + try + { + Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED", CancellationToken.None); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode); + } + } + + [Test] public void AsyncUnaryCall() { - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - var result = Calls.AsyncUnaryCall(call, "ABC", CancellationToken.None).Result; + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var result = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None).ResponseAsync.Result; Assert.AreEqual("ABC", result); } @@ -131,10 +162,10 @@ namespace Grpc.Core.Tests { Task.Run(async () => { - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); try { - await Calls.AsyncUnaryCall(call, "THROW", CancellationToken.None); + await Calls.AsyncUnaryCall(internalCall, "THROW", CancellationToken.None); Assert.Fail(); } catch (RpcException e) @@ -149,11 +180,11 @@ namespace Grpc.Core.Tests { Task.Run(async () => { - var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); - var callResult = Calls.AsyncClientStreamingCall(call, CancellationToken.None); + var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); + var call = Calls.AsyncClientStreamingCall(internalCall, CancellationToken.None); - await callResult.RequestStream.WriteAll(new string[] { "A", "B", "C" }); - Assert.AreEqual("ABC", await callResult.Result); + await call.RequestStream.WriteAll(new string[] { "A", "B", "C" }); + Assert.AreEqual("ABC", await call.ResponseAsync); }).Wait(); } @@ -162,10 +193,10 @@ namespace Grpc.Core.Tests { Task.Run(async () => { - var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty); var cts = new CancellationTokenSource(); - var callResult = Calls.AsyncClientStreamingCall(call, cts.Token); + var call = Calls.AsyncClientStreamingCall(internalCall, cts.Token); // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. await Task.Delay(1000); @@ -173,7 +204,7 @@ namespace Grpc.Core.Tests try { - await callResult.Result; + await call.ResponseAsync; } catch (RpcException e) { @@ -183,29 +214,53 @@ namespace Grpc.Core.Tests } [Test] + public void AsyncUnaryCall_EchoMetadata() + { + var headers = new Metadata + { + new Metadata.Entry("asciiHeader", "abcdefg"), + new Metadata.Entry("binaryHeader-bin", new byte[] { 1, 2, 3, 0, 0xff }), + }; + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, headers); + var call = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None); + + Assert.AreEqual("ABC", call.ResponseAsync.Result); + + Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); + + var trailers = call.GetTrailers(); + Assert.AreEqual(2, trailers.Count); + Assert.AreEqual(headers[0].Key, trailers[0].Key); + Assert.AreEqual(headers[0].Value, trailers[0].Value); + + Assert.AreEqual(headers[1].Key, trailers[1].Key); + CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes); + } + + [Test] public void UnaryCall_DisposedChannel() { channel.Dispose(); - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(call, "ABC", CancellationToken.None)); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None)); } [Test] public void UnaryCallPerformance() { - var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); BenchmarkUtil.RunBenchmark(100, 100, - () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); + () => { Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken)); }); } [Test] public void UnknownMethodHandler() { - var call = new Call<string, string>(ServiceName, NonexistentMethod, channel, Metadata.Empty); + var internalCall = new Call<string, string>(ServiceName, NonexistentMethod, channel, Metadata.Empty); try { - Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); + Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken)); Assert.Fail(); } catch (RpcException e) @@ -214,16 +269,48 @@ namespace Grpc.Core.Tests } } - private static async Task<string> EchoHandler(ServerCallContext context, string request) + [Test] + public void UserAgentStringPresent() { + var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty); + string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT", CancellationToken.None); + Assert.IsTrue(userAgent.StartsWith("grpc-csharp/")); + } + + private static async Task<string> EchoHandler(string request, ServerCallContext context) + { + foreach (Metadata.Entry metadataEntry in context.RequestHeaders) + { + if (metadataEntry.Key != "user-agent") + { + context.ResponseTrailers.Add(metadataEntry); + } + } + + if (request == "RETURN-USER-AGENT") + { + return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value; + } + if (request == "THROW") { throw new Exception("This was thrown on purpose by a test"); } + + if (request == "THROW_UNAUTHENTICATED") + { + throw new RpcException(new Status(StatusCode.Unauthenticated, "")); + } + + if (request == "SET_UNAUTHENTICATED") + { + context.Status = new Status(StatusCode.Unauthenticated, ""); + } + return request; } - private static async Task<string> ConcatAndEchoHandler(ServerCallContext context, IAsyncStreamReader<string> requestStream) + private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context) { string result = ""; await requestStream.ForEach(async (request) => diff --git a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs index e03e20c4f7..46469113c5 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs @@ -59,5 +59,26 @@ namespace Grpc.Core.Internal.Tests var nativeMetadata = MetadataArraySafeHandle.Create(metadata); nativeMetadata.Dispose(); } + + [Test] + public void ReadMetadataFromPtrUnsafe() + { + var metadata = new Metadata + { + new Metadata.Entry("host", "somehost"), + new Metadata.Entry("header2", "header value"), + }; + var nativeMetadata = MetadataArraySafeHandle.Create(metadata); + + var copy = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(nativeMetadata.Handle); + Assert.AreEqual(2, copy.Count); + + Assert.AreEqual("host", copy[0].Key); + Assert.AreEqual("somehost", copy[0].Value); + Assert.AreEqual("header2", copy[1].Key); + Assert.AreEqual("header value", copy[1].Value); + + nativeMetadata.Dispose(); + } } } diff --git a/src/csharp/Grpc.Core.Tests/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/TimespecTest.cs index 5831121add..a34b407a01 100644 --- a/src/csharp/Grpc.Core.Tests/TimespecTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimespecTest.cs @@ -59,6 +59,19 @@ namespace Grpc.Core.Internal.Tests } [Test] + public void ToDateTime() + { + Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc), + new Timespec(IntPtr.Zero, 0).ToDateTime()); + + Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50), + new Timespec(new IntPtr(10), 5000).ToDateTime()); + + Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc), + new Timespec(new IntPtr(1437452508), 0).ToDateTime()); + } + + [Test] public void Add() { var t = new Timespec { tv_sec = new IntPtr(12345), tv_nsec = 123456789 }; diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index d66b0d4974..bf020cd627 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -43,24 +43,28 @@ namespace Grpc.Core public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable { readonly IClientStreamWriter<TRequest> requestStream; - readonly Task<TResponse> result; + readonly Task<TResponse> responseAsync; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; - this.result = result; + this.responseAsync = responseAsync; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } /// <summary> /// Asynchronous call result. /// </summary> - public Task<TResponse> Result + public Task<TResponse> ResponseAsync { get { - return this.result; + return this.responseAsync; } } @@ -81,11 +85,11 @@ namespace Grpc.Core /// <returns></returns> public TaskAwaiter<TResponse> GetAwaiter() { - return result.GetAwaiter(); + return responseAsync.GetAwaiter(); } /// <summary> - /// Provides means to provide after the call. + /// Provides means to cleanup after the call. /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 4c0d5936ac..0979de606f 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -44,12 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } @@ -76,6 +80,24 @@ namespace Grpc.Core } /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> /// Provides means to cleanup after the call. /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 7a479b9a23..380efcdb0e 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -43,11 +43,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } @@ -63,6 +67,24 @@ namespace Grpc.Core } /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> /// Provides means to cleanup after the call. /// If the call has already finished normally (response stream has been fully read), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs new file mode 100644 index 0000000000..224e343916 --- /dev/null +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -0,0 +1,106 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace Grpc.Core +{ + /// <summary> + /// Return type for single request - single response call. + /// </summary> + public sealed class AsyncUnaryCall<TResponse> : IDisposable + { + readonly Task<TResponse> responseAsync; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; + readonly Action disposeAction; + + public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + { + this.responseAsync = responseAsync; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; + this.disposeAction = disposeAction; + } + + /// <summary> + /// Asynchronous call result. + /// </summary> + public Task<TResponse> ResponseAsync + { + get + { + return this.responseAsync; + } + } + + /// <summary> + /// Allows awaiting this object directly. + /// </summary> + public TaskAwaiter<TResponse> GetAwaiter() + { + return responseAsync.GetAwaiter(); + } + + /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } + } +} diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 9e95182c72..359fe53741 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -53,7 +53,7 @@ namespace Grpc.Core return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers); } - public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) where TRequest : class where TResponse : class { @@ -61,7 +61,7 @@ namespace Grpc.Core asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name); var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers); RegisterCancellationCallback(asyncCall, token); - return await asyncResult; + return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 5baf260003..e5c6abd2cb 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -28,11 +28,14 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion + using System; using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; + using Grpc.Core.Internal; namespace Grpc.Core @@ -44,6 +47,7 @@ namespace Grpc.Core { readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; + readonly List<ChannelOption> options; readonly string target; bool disposed; @@ -57,7 +61,10 @@ namespace Grpc.Core public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null) { this.environment = GrpcEnvironment.GetInstance(); - using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options)) + this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); + + EnsureUserAgentChannelOption(this.options); + using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options)) { if (credentials != null) { @@ -71,7 +78,7 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs); } } - this.target = GetOverridenTarget(host, options); + this.target = GetOverridenTarget(host, this.options); } /// <summary> @@ -141,6 +148,20 @@ namespace Grpc.Core } } + private static void EnsureUserAgentChannelOption(List<ChannelOption> options) + { + if (!options.Any((option) => option.Name == ChannelOptions.PrimaryUserAgentString)) + { + options.Add(new ChannelOption(ChannelOptions.PrimaryUserAgentString, GetUserAgentString())); + } + } + + private static string GetUserAgentString() + { + // TODO(jtattermusch): it would be useful to also provide .NET/mono version. + return string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion); + } + /// <summary> /// Look for SslTargetNameOverride option and return its value instead of originalTarget /// if found. diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs index bc23bb59b1..9fe03d2805 100644 --- a/src/csharp/Grpc.Core/ChannelOptions.cs +++ b/src/csharp/Grpc.Core/ChannelOptions.cs @@ -115,41 +115,49 @@ namespace Grpc.Core } } + /// <summary> + /// Defines names of supported channel options. + /// </summary> public static class ChannelOptions { - // Override SSL target check. Only to be used for testing. + /// <summary>Override SSL target check. Only to be used for testing.</summary> public const string SslTargetNameOverride = "grpc.ssl_target_name_override"; - // Enable census for tracing and stats collection + /// <summary>Enable census for tracing and stats collection</summary> public const string Census = "grpc.census"; - // Maximum number of concurrent incoming streams to allow on a http2 connection + /// <summary>Maximum number of concurrent incoming streams to allow on a http2 connection</summary> public const string MaxConcurrentStreams = "grpc.max_concurrent_streams"; - // Maximum message length that the channel can receive + /// <summary>Maximum message length that the channel can receive</summary> public const string MaxMessageLength = "grpc.max_message_length"; - // Initial sequence number for http2 transports + /// <summary>Initial sequence number for http2 transports</summary> public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number"; + /// <summary>Primary user agent: goes at the start of the user-agent metadata</summary> + public const string PrimaryUserAgentString = "grpc.primary_user_agent"; + + /// <summary> Secondary user agent: goes at the end of the user-agent metadata</summary> + public const string SecondaryUserAgentString = "grpc.secondary_user_agent"; + /// <summary> /// Creates native object for a collection of channel options. /// </summary> /// <returns>The native channel arguments.</returns> - internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options) + internal static ChannelArgsSafeHandle CreateChannelArgs(List<ChannelOption> options) { - if (options == null) + if (options == null || options.Count == 0) { return ChannelArgsSafeHandle.CreateNull(); } - var optionList = new List<ChannelOption>(options); // It's better to do defensive copy ChannelArgsSafeHandle nativeArgs = null; try { - nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count); - for (int i = 0; i < optionList.Count; i++) + nativeArgs = ChannelArgsSafeHandle.Create(options.Count); + for (int i = 0; i < options.Count; i++) { - var option = optionList[i]; + var option = options[i]; if (option.Type == ChannelOption.OptionType.Integer) { nativeArgs.SetInteger(i, option.Name, option.IntValue); diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index a227fe5477..fd68b91851 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -33,13 +33,12 @@ </PropertyGroup> <ItemGroup> <Reference Include="System" /> - <Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> - <SpecificVersion>False</SpecificVersion> - <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> - </Reference> <Reference Include="System.Interactive.Async"> <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> </Reference> + <Reference Include="System.Collections.Immutable"> + <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> @@ -102,6 +101,8 @@ <Compile Include="Internal\CompletionRegistry.cs" /> <Compile Include="Internal\BatchContextSafeHandle.cs" /> <Compile Include="ChannelOptions.cs" /> + <Compile Include="AsyncUnaryCall.cs" /> + <Compile Include="VersionInfo.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 24b75d1668..f983dbb759 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -52,8 +52,8 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; - // Set after status is received. Only used for streaming response calls. - Status? finishedStatus; + // Set after status is received. Used for both unary and streaming response calls. + ClientSideStatus? finishedStatus; bool readObserverCompleted; // True if readObserver has already been completed. @@ -249,6 +249,32 @@ namespace Grpc.Core.Internal } /// <summary> + /// Gets the resulting status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + lock (myLock) + { + Preconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished."); + return finishedStatus.Value.Status; + } + } + + /// <summary> + /// Gets the trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + lock (myLock) + { + Preconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished."); + return finishedStatus.Value.Trailers; + } + } + + /// <summary> /// On client-side, we only fire readCompletionDelegate once all messages have been read /// and status has been received. /// </summary> @@ -265,7 +291,7 @@ namespace Grpc.Core.Internal if (shouldComplete) { - var status = finishedStatus.Value; + var status = finishedStatus.Value.Status; if (status.StatusCode != StatusCode.OK) { FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); @@ -288,9 +314,13 @@ namespace Grpc.Core.Internal /// </summary> private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) { + var fullStatus = ctx.GetReceivedStatusOnClient(); + lock (myLock) { finished = true; + finishedStatus = fullStatus; + halfclosed = true; ReleaseResourcesIfPossible(); @@ -302,7 +332,8 @@ namespace Grpc.Core.Internal return; } - var status = ctx.GetReceivedStatus(); + var status = fullStatus.Status; + if (status.StatusCode != StatusCode.OK) { unaryResponseTcs.SetException(new RpcException(status)); @@ -321,13 +352,13 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinished(bool success, BatchContextSafeHandle ctx) { - var status = ctx.GetReceivedStatus(); + var fullStatus = ctx.GetReceivedStatusOnClient(); AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; - finishedStatus = status; + finishedStatus = fullStatus; origReadCompletionDelegate = readCompletionDelegate; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 309067ea9d..f809f4a84c 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -101,14 +101,17 @@ namespace Grpc.Core.Internal /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate) { lock (myLock) { Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendStatusFromServer(status, HandleHalfclosed); + using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) + { + call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray); + } halfcloseRequested = true; readingDone = true; sendCompletionDelegate = completionDelegate; diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 861cbbe4c6..6a2add54db 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -38,7 +38,6 @@ using Grpc.Core; namespace Grpc.Core.Internal { /// <summary> - /// Not owned version of /// grpcsharp_batch_context /// </summary> internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid @@ -47,6 +46,9 @@ namespace Grpc.Core.Internal static extern BatchContextSafeHandle grpcsharp_batch_context_create(); [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_initial_metadata(BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] @@ -59,12 +61,24 @@ namespace Grpc.Core.Internal static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char* [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata(BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char* [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_host(BatchContextSafeHandle ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec grpcsharp_batch_context_server_rpc_new_deadline(BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_request_metadata(BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] @@ -87,13 +101,26 @@ namespace Grpc.Core.Internal } } - public Status GetReceivedStatus() + // Gets data of recv_initial_metadata completion. + public Metadata GetReceivedInitialMetadata() + { + IntPtr metadataArrayPtr = grpcsharp_batch_context_recv_initial_metadata(this); + return MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); + } + + // Gets data of recv_status_on_client completion. + public ClientSideStatus GetReceivedStatusOnClient() { - // TODO: can the native method return string directly? string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); - return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + var status = new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + + IntPtr metadataArrayPtr = grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this); + var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); + + return new ClientSideStatus(status, metadata); } + // Gets data of recv_message completion. public byte[] GetReceivedMessage() { IntPtr len = grpcsharp_batch_context_recv_message_length(this); @@ -106,16 +133,22 @@ namespace Grpc.Core.Internal return data; } - public CallSafeHandle GetServerRpcNewCall() + // Gets data of server_rpc_new completion. + public ServerRpcNew GetServerRpcNew() { - return grpcsharp_batch_context_server_rpc_new_call(this); - } + var call = grpcsharp_batch_context_server_rpc_new_call(this); - public string GetServerRpcNewMethod() - { - return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + var method = Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + var host = Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_host(this)); + var deadline = grpcsharp_batch_context_server_rpc_new_deadline(this); + + IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); + var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); + + return new ServerRpcNew(call, method, host, deadline, metadata); } + // Gets data of receive_close_on_server completion. public bool GetReceivedCloseOnServerCancelled() { return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; @@ -127,4 +160,97 @@ namespace Grpc.Core.Internal return true; } } + + /// <summary> + /// Status + metadata received on client side when call finishes. + /// (when receive_status_on_client operation finishes). + /// </summary> + internal struct ClientSideStatus + { + readonly Status status; + readonly Metadata trailers; + + public ClientSideStatus(Status status, Metadata trailers) + { + this.status = status; + this.trailers = trailers; + } + + public Status Status + { + get + { + return this.status; + } + } + + public Metadata Trailers + { + get + { + return this.trailers; + } + } + } + + /// <summary> + /// Details of a newly received RPC. + /// </summary> + internal struct ServerRpcNew + { + readonly CallSafeHandle call; + readonly string method; + readonly string host; + readonly Timespec deadline; + readonly Metadata requestMetadata; + + public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + { + this.call = call; + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestMetadata = requestMetadata; + } + + public CallSafeHandle Call + { + get + { + return this.call; + } + } + + public string Method + { + get + { + return this.method; + } + } + + public string Host + { + get + { + return this.host; + } + } + + public Timespec Deadline + { + get + { + return this.deadline; + } + } + + public Metadata RequestMetadata + { + get + { + return this.requestMetadata; + } + } + } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3b246ac01b..19dbb83f24 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -81,7 +81,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -159,11 +159,11 @@ namespace Grpc.Core.Internal grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback) + public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 80aa7f5603..427c16fac6 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -46,12 +46,24 @@ namespace Grpc.Core.Internal static extern void grpcsharp_metadata_array_add(MetadataArraySafeHandle array, string key, byte[] value, UIntPtr valueLength); [DllImport("grpc_csharp_ext.dll")] + static extern UIntPtr grpcsharp_metadata_array_count(IntPtr metadataArray); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_metadata_array_get_key(IntPtr metadataArray, UIntPtr index); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_metadata_array_get_value(IntPtr metadataArray, UIntPtr index); + + [DllImport("grpc_csharp_ext.dll")] + static extern UIntPtr grpcsharp_metadata_array_get_value_length(IntPtr metadataArray, UIntPtr index); + + [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_metadata_array_destroy_full(IntPtr array); private MetadataArraySafeHandle() { } - + public static MetadataArraySafeHandle Create(Metadata metadata) { // TODO(jtattermusch): we might wanna check that the metadata is readonly @@ -63,6 +75,38 @@ namespace Grpc.Core.Internal return metadataArray; } + /// <summary> + /// Reads metadata from pointer to grpc_metadata_array + /// </summary> + public static Metadata ReadMetadataFromPtrUnsafe(IntPtr metadataArray) + { + if (metadataArray == IntPtr.Zero) + { + return null; + } + + ulong count = grpcsharp_metadata_array_count(metadataArray).ToUInt64(); + + var metadata = new Metadata(); + for (ulong i = 0; i < count; i++) + { + var index = new UIntPtr(i); + string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index)); + var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()]; + Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length); + metadata.Add(new Metadata.Entry(key, bytes)); + } + return metadata; + } + + internal IntPtr Handle + { + get + { + return handle; + } + } + protected override bool ReleaseHandle() { grpcsharp_metadata_array_destroy_full(handle); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 594e46b159..3680b1e791 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -34,6 +34,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -42,7 +43,7 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment); + Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment); } internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -58,27 +59,28 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment); - asyncCall.Initialize(call); + asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - Status status = Status.DefaultSuccess; + Status status; + var context = HandlerUtils.NewContext(newRpc); try { Preconditions.CheckArgument(await requestStream.MoveNext()); var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. Preconditions.CheckArgument(!await requestStream.MoveNext()); - var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context - var result = await handler(context, request); + var result = await handler(request, context); + status = context.Status; await responseStream.WriteAsync(result); } catch (Exception e) @@ -88,7 +90,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -111,28 +113,28 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment); - asyncCall.Initialize(call); + asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - Status status = Status.DefaultSuccess; + Status status; + var context = HandlerUtils.NewContext(newRpc); try { Preconditions.CheckArgument(await requestStream.MoveNext()); var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. Preconditions.CheckArgument(!await requestStream.MoveNext()); - - var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context - await handler(context, request, responseStream); + await handler(request, responseStream, context); + status = context.Status; } catch (Exception e) { @@ -142,7 +144,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -165,23 +167,24 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment); - asyncCall.Initialize(call); + asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context - Status status = Status.DefaultSuccess; + Status status; + var context = HandlerUtils.NewContext(newRpc); try { - var result = await handler(context, requestStream); + var result = await handler(requestStream, context); + status = context.Status; try { await responseStream.WriteAsync(result); @@ -199,7 +202,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -222,23 +225,24 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, environment); - asyncCall.Initialize(call); + asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context - Status status = Status.DefaultSuccess; + Status status; + var context = HandlerUtils.NewContext(newRpc); try { - await handler(context, requestStream, responseStream); + await handler(requestStream, responseStream, context); + status = context.Status; } catch (Exception e) { @@ -247,7 +251,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -259,18 +263,19 @@ namespace Grpc.Core.Internal internal class NoSuchMethodCallHandler : IServerCallHandler { - public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment) + public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler(); + + public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( (payload) => payload, (payload) => payload, environment); - asyncCall.Initialize(call); + asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty); await finishedTask; } } @@ -279,8 +284,22 @@ namespace Grpc.Core.Internal { public static Status StatusFromException(Exception e) { + var rpcException = e as RpcException; + if (rpcException != null) + { + // use the status thrown by handler. + return rpcException.Status; + } + // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + + public static ServerCallContext NewContext(ServerRpcNew newRpc) + { + return new ServerCallContext( + newRpc.Method, newRpc.Host, newRpc.Deadline.ToDateTime(), + newRpc.RequestMetadata, CancellationToken.None); + } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index a2d77dd5b7..756dcee87f 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -56,10 +56,10 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status) + public Task WriteStatusAsync(Status status, Metadata trailers) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); + call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index de783f5a4b..da2819f14d 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -43,6 +43,8 @@ namespace Grpc.Core.Internal const int NanosPerSecond = 1000 * 1000 * 1000; const int NanosPerTick = 100; + static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); + [DllImport("grpc_csharp_ext.dll")] static extern Timespec gprsharp_now(); @@ -52,6 +54,13 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern int gprsharp_sizeof_timespec(); + public Timespec(IntPtr tv_sec, int tv_nsec) + { + this.tv_sec = tv_sec; + this.tv_nsec = tv_nsec; + this.clock_type = GPRClockType.Realtime; + } + // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 // so IntPtr seems to have the right size to work on both. public System.IntPtr tv_sec; @@ -76,6 +85,11 @@ namespace Grpc.Core.Internal return gprsharp_now(); } } + + public DateTime ToDateTime() + { + return UnixEpoch.AddTicks(tv_sec.ToInt64() * (NanosPerSecond / NanosPerTick) + tv_nsec / NanosPerTick); + } internal static int NativeSize { diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 4552d39d88..2f308cbb11 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -220,6 +220,11 @@ namespace Grpc.Core return value; } } + + public override string ToString() + { + return string.Format("[Entry: key={0}, value={1}]", Key, Value); + } } } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index cbf77196cf..fd30735359 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -53,6 +53,7 @@ namespace Grpc.Core public const int PickUnusedPort = 0; readonly GrpcEnvironment environment; + readonly List<ChannelOption> options; readonly ServerSafeHandle handle; readonly object myLock = new object(); @@ -69,7 +70,8 @@ namespace Grpc.Core public Server(IEnumerable<ChannelOption> options = null) { this.environment = GrpcEnvironment.GetInstance(); - using (var channelArgs = ChannelOptions.CreateChannelArgs(options)) + this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); + using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); } @@ -218,16 +220,16 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task InvokeCallHandler(CallSafeHandle call, string method) + private async Task HandleCallAsync(ServerRpcNew newRpc) { try { IServerCallHandler callHandler; - if (!callHandlers.TryGetValue(method, out callHandler)) + if (!callHandlers.TryGetValue(newRpc.Method, out callHandler)) { - callHandler = new NoSuchMethodCallHandler(); + callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(method, call, environment); + await callHandler.HandleCall(newRpc, environment); } catch (Exception e) { @@ -240,15 +242,15 @@ namespace Grpc.Core /// </summary> private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) { - // TODO: handle error - - CallSafeHandle call = ctx.GetServerRpcNewCall(); - string method = ctx.GetServerRpcNewMethod(); - - // after server shutdown, the callback returns with null call - if (!call.IsInvalid) + if (success) { - Task.Run(async () => await InvokeCallHandler(call, method)); + ServerRpcNew newRpc = ctx.GetServerRpcNew(); + + // after server shutdown, the callback returns with null call + if (!newRpc.Call.IsInvalid) + { + Task.Run(async () => await HandleCallAsync(newRpc)); + } } AllowOneRpc(); diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index bc9a499c51..17a2eefd07 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -33,6 +33,7 @@ using System; using System.Runtime.CompilerServices; +using System.Threading; using System.Threading.Tasks; namespace Grpc.Core @@ -42,14 +43,93 @@ namespace Grpc.Core /// </summary> public sealed class ServerCallContext { - // TODO(jtattermusch): add cancellationToken + // TODO(jtattermusch): expose method to send initial metadata back to client - // TODO(jtattermusch): add deadline info + private readonly string method; + private readonly string host; + private readonly DateTime deadline; + private readonly Metadata requestHeaders; + private readonly CancellationToken cancellationToken; + private readonly Metadata responseTrailers = new Metadata(); - // TODO(jtattermusch): expose initial metadata sent by client for reading + private Status status = Status.DefaultSuccess; - // TODO(jtattermusch): expose method to send initial metadata back to client + public ServerCallContext(string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken) + { + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestHeaders = requestHeaders; + this.cancellationToken = cancellationToken; + } + + /// <summary> Name of method called in this RPC. </summary> + public string Method + { + get + { + return this.method; + } + } + + /// <summary> Name of host called in this RPC. </summary> + public string Host + { + get + { + return this.host; + } + } + + /// <summary> Deadline for this RPC. </summary> + public DateTime Deadline + { + get + { + return this.deadline; + } + } + + /// <summary> Initial metadata sent by client. </summary> + public Metadata RequestHeaders + { + get + { + return this.requestHeaders; + } + } + + // TODO(jtattermusch): support signalling cancellation. + /// <summary> Cancellation token signals when call is cancelled. </summary> + public CancellationToken CancellationToken + { + get + { + return this.cancellationToken; + } + } + + /// <summary> Trailers to send back to client after RPC finishes.</summary> + public Metadata ResponseTrailers + { + get + { + return this.responseTrailers; + } + } + + /// <summary> Status to send back to client after RPC finishes.</summary> + public Status Status + { + get + { + return this.status; + } - // TODO(jtattermusch): allow setting status and trailing metadata to send after handler completes. + set + { + status = value; + } + } } } diff --git a/src/csharp/Grpc.Core/ServerMethods.cs b/src/csharp/Grpc.Core/ServerMethods.cs index 377b78eb30..d457770203 100644 --- a/src/csharp/Grpc.Core/ServerMethods.cs +++ b/src/csharp/Grpc.Core/ServerMethods.cs @@ -42,28 +42,28 @@ namespace Grpc.Core /// <summary> /// Server-side handler for unary call. /// </summary> - public delegate Task<TResponse> UnaryServerMethod<TRequest, TResponse>(ServerCallContext context, TRequest request) + public delegate Task<TResponse> UnaryServerMethod<TRequest, TResponse>(TRequest request, ServerCallContext context) where TRequest : class where TResponse : class; /// <summary> /// Server-side handler for client streaming call. /// </summary> - public delegate Task<TResponse> ClientStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, IAsyncStreamReader<TRequest> requestStream) + public delegate Task<TResponse> ClientStreamingServerMethod<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, ServerCallContext context) where TRequest : class where TResponse : class; /// <summary> /// Server-side handler for server streaming call. /// </summary> - public delegate Task ServerStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, TRequest request, IServerStreamWriter<TResponse> responseStream) + public delegate Task ServerStreamingServerMethod<TRequest, TResponse>(TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context) where TRequest : class where TResponse : class; /// <summary> /// Server-side handler for bidi streaming call. /// </summary> - public delegate Task DuplexStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream) + public delegate Task DuplexStreamingServerMethod<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream, ServerCallContext context) where TRequest : class where TResponse : class; } diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs index f1db1f6157..b5cb652945 100644 --- a/src/csharp/Grpc.Core/Version.cs +++ b/src/csharp/Grpc.Core/Version.cs @@ -2,4 +2,4 @@ using System.Reflection; using System.Runtime.CompilerServices; // The current version of gRPC C#. -[assembly: AssemblyVersion("0.6.0.*")] +[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")] diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs new file mode 100644 index 0000000000..656a3d47bb --- /dev/null +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -0,0 +1,13 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +namespace Grpc.Core +{ + public static class VersionInfo + { + /// <summary> + /// Current version of gRPC + /// </summary> + public const string CurrentVersion = "0.6.0"; + } +} diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index e7c4b33120..7a957c5b6f 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -144,7 +144,7 @@ namespace math.Tests n => Num.CreateBuilder().SetNum_(n).Build()); await call.RequestStream.WriteAll(numbers); - var result = await call.Result; + var result = await call.ResponseAsync; Assert.AreEqual(60, result.Num_); } }).Wait(); diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index 7deb651689..06d81a4d83 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -46,8 +46,7 @@ namespace math public static async Task DivAsyncExample(Math.IMathClient client) { - Task<DivReply> resultTask = client.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); - DivReply result = await resultTask; + DivReply result = await client.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build()); Console.WriteLine("DivAsync Result: " + result); } @@ -72,7 +71,7 @@ namespace math using (var call = client.Sum()) { await call.RequestStream.WriteAll(numbers); - Console.WriteLine("Sum Result: " + await call.Result); + Console.WriteLine("Sum Result: " + await call.ResponseAsync); } } @@ -104,7 +103,7 @@ namespace math using (var sumCall = client.Sum()) { await sumCall.RequestStream.WriteAll(numbers); - sum = await sumCall.Result; + sum = await sumCall.ResponseAsync; } DivReply result = await client.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build()); diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index 1805972ce3..ef787cf1d8 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -45,7 +45,7 @@ namespace math { public interface IMathClient { global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); @@ -54,10 +54,10 @@ namespace math { // server-side interface public interface IMath { - Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request); - Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream); - Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream); - Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream); + Task<global::math.DivReply> Div(global::math.DivArgs request, ServerCallContext context); + Task DivMany(IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream, ServerCallContext context); + Task Fib(global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream, ServerCallContext context); + Task<global::math.Num> Sum(IAsyncStreamReader<global::math.Num> requestStream, ServerCallContext context); } // client stub @@ -71,7 +71,7 @@ namespace math { var call = CreateCall(__ServiceName, __Method_Div, headers); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Div, headers); return Calls.AsyncUnaryCall(call, request, cancellationToken); diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index e247ac9d73..3dd0f53a0d 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -45,12 +45,12 @@ namespace math /// </summary> public class MathServiceImpl : Math.IMath { - public Task<DivReply> Div(ServerCallContext context, DivArgs request) + public Task<DivReply> Div(DivArgs request, ServerCallContext context) { return Task.FromResult(DivInternal(request)); } - public async Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream) + public async Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream, ServerCallContext context) { if (request.Limit <= 0) { @@ -67,7 +67,7 @@ namespace math } } - public async Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream) + public async Task<Num> Sum(IAsyncStreamReader<Num> requestStream, ServerCallContext context) { long sum = 0; await requestStream.ForEach(async num => @@ -77,7 +77,7 @@ namespace math return Num.CreateBuilder().SetNum_(sum).Build(); } - public async Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream) + public async Task DivMany(IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream, ServerCallContext context) { await requestStream.ForEach(async divArgs => { diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 73ff0e74b5..bc14a0a62f 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -87,9 +87,7 @@ namespace Grpc.HealthCheck.Tests [Test] public void ServiceDoesntExist() { - // TODO(jtattermusch): currently, this returns wrong status code, because we don't enable sending arbitrary status code from - // server handlers yet. - Assert.Throws(typeof(RpcException), () => client.Check(HealthCheckRequest.CreateBuilder().SetHost("").SetService("nonexistent.service").Build())); + Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(HealthCheckRequest.CreateBuilder().SetHost("").SetService("nonexistent.service").Build())); } // TODO(jtattermusch): add test with timeout once timeouts are supported diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs index 9b7c4f2140..7184415655 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs @@ -101,7 +101,7 @@ namespace Grpc.HealthCheck.Tests private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service) { - return impl.Check(null, HealthCheckRequest.CreateBuilder().SetHost(host).SetService(service).Build()).Result.Status; + return impl.Check(HealthCheckRequest.CreateBuilder().SetHost(host).SetService(service).Build(), null).Result.Status; } } } diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs index 3aebdcb557..217127eca7 100644 --- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs +++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs @@ -25,13 +25,13 @@ namespace Grpc.Health.V1Alpha { public interface IHealthClient { global::Grpc.Health.V1Alpha.HealthCheckResponse Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); } // server-side interface public interface IHealth { - Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> Check(ServerCallContext context, global::Grpc.Health.V1Alpha.HealthCheckRequest request); + Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> Check(global::Grpc.Health.V1Alpha.HealthCheckRequest request, ServerCallContext context); } // client stub @@ -45,7 +45,7 @@ namespace Grpc.Health.V1Alpha { var call = CreateCall(__ServiceName, __Method_Check, headers); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public Task<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::Grpc.Health.V1Alpha.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1Alpha.HealthCheckRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_Check, headers); return Calls.AsyncUnaryCall(call, request, cancellationToken); diff --git a/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs b/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs index db3a2a0942..3c3b9c35f1 100644 --- a/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs +++ b/src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs @@ -95,7 +95,7 @@ namespace Grpc.HealthCheck } } - public Task<HealthCheckResponse> Check(ServerCallContext context, HealthCheckRequest request) + public Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context) { lock (myLock) { diff --git a/src/csharp/Grpc.HealthCheck/Settings.StyleCop b/src/csharp/Grpc.HealthCheck/Settings.StyleCop new file mode 100644 index 0000000000..2942add962 --- /dev/null +++ b/src/csharp/Grpc.HealthCheck/Settings.StyleCop @@ -0,0 +1,10 @@ +<StyleCopSettings Version="105"> + <SourceFileList> + <SourceFile>Health.cs</SourceFile> + <Settings> + <GlobalSettings> + <BooleanProperty Name="RulesEnabledByDefault">False</BooleanProperty> + </GlobalSettings> + </Settings> + </SourceFileList> +</StyleCopSettings> diff --git a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj index 328acb5b47..dc1d0a44c0 100644 --- a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj +++ b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj @@ -3,8 +3,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">x86</Platform> - <ProductVersion>10.0.0</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{3D166931-BA2D-416E-95A3-D36E8F6E90B9}</ProjectGuid> <OutputType>Exe</OutputType> <RootNamespace>Grpc.IntegrationTesting.Client</RootNamespace> @@ -48,6 +46,10 @@ <Project>{C61154BA-DD4A-4838-8420-0162A28925E0}</Project> <Name>Grpc.IntegrationTesting</Name> </ProjectReference> + <ProjectReference Include="..\Grpc.Core\Grpc.Core.csproj"> + <Project>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</Project> + <Name>Grpc.Core</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <None Include="app.config" /> diff --git a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj index ae184c1dc7..f03c8f3ce3 100644 --- a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj +++ b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj @@ -3,8 +3,6 @@ <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">x86</Platform> - <ProductVersion>10.0.0</ProductVersion> - <SchemaVersion>2.0</SchemaVersion> <ProjectGuid>{A654F3B8-E859-4E6A-B30D-227527DBEF0D}</ProjectGuid> <OutputType>Exe</OutputType> <RootNamespace>Grpc.IntegrationTesting.Server</RootNamespace> @@ -48,6 +46,10 @@ <Project>{C61154BA-DD4A-4838-8420-0162A28925E0}</Project> <Name>Grpc.IntegrationTesting</Name> </ProjectReference> + <ProjectReference Include="..\Grpc.Core\Grpc.Core.csproj"> + <Project>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</Project> + <Name>Grpc.Core</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <None Include="app.config" /> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index ea83aaf2c1..ce255f9423 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -219,7 +219,7 @@ namespace Grpc.IntegrationTesting { await call.RequestStream.WriteAll(bodySizes); - var response = await call.Result; + var response = await call.ResponseAsync; Assert.AreEqual(74922, response.AggregatedPayloadSize); } Console.WriteLine("Passed!"); @@ -399,7 +399,7 @@ namespace Grpc.IntegrationTesting .SetFillOauthScope(true) .Build(); - var response = client.UnaryCall(request, headers: new Metadata { new Metadata.Entry("Authorization", "Bearer " + oauth2Token) } ); + var response = client.UnaryCall(request, headers: new Metadata { new Metadata.Entry("Authorization", "Bearer " + oauth2Token) }); Assert.AreEqual(AuthScopeResponse, response.OauthScope); Assert.AreEqual(ServiceAccountUser, response.Username); @@ -421,7 +421,7 @@ namespace Grpc.IntegrationTesting try { - var response = await call.Result; + var response = await call.ResponseAsync; Assert.Fail(); } catch (RpcException e) diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 96d9b23717..de2fa07441 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -60,9 +60,9 @@ namespace grpc.testing { public interface ITestServiceClient { global::grpc.testing.Empty EmptyCall(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); global::grpc.testing.SimpleResponse UnaryCall(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); - Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncServerStreamingCall<global::grpc.testing.StreamingOutputCallResponse> StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncClientStreamingCall<global::grpc.testing.StreamingInputCallRequest, global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); AsyncDuplexStreamingCall<global::grpc.testing.StreamingOutputCallRequest, global::grpc.testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)); @@ -72,12 +72,12 @@ namespace grpc.testing { // server-side interface public interface ITestService { - Task<global::grpc.testing.Empty> EmptyCall(ServerCallContext context, global::grpc.testing.Empty request); - Task<global::grpc.testing.SimpleResponse> UnaryCall(ServerCallContext context, global::grpc.testing.SimpleRequest request); - Task StreamingOutputCall(ServerCallContext context, global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); - Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream); - Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); - Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream); + Task<global::grpc.testing.Empty> EmptyCall(global::grpc.testing.Empty request, ServerCallContext context); + Task<global::grpc.testing.SimpleResponse> UnaryCall(global::grpc.testing.SimpleRequest request, ServerCallContext context); + Task StreamingOutputCall(global::grpc.testing.StreamingOutputCallRequest request, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); + Task<global::grpc.testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::grpc.testing.StreamingInputCallRequest> requestStream, ServerCallContext context); + Task FullDuplexCall(IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); + Task HalfDuplexCall(IAsyncStreamReader<global::grpc.testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::grpc.testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); } // client stub @@ -91,7 +91,7 @@ namespace grpc.testing { var call = CreateCall(__ServiceName, __Method_EmptyCall, headers); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public Task<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::grpc.testing.Empty> EmptyCallAsync(global::grpc.testing.Empty request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_EmptyCall, headers); return Calls.AsyncUnaryCall(call, request, cancellationToken); @@ -101,7 +101,7 @@ namespace grpc.testing { var call = CreateCall(__ServiceName, __Method_UnaryCall, headers); return Calls.BlockingUnaryCall(call, request, cancellationToken); } - public Task<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) + public AsyncUnaryCall<global::grpc.testing.SimpleResponse> UnaryCallAsync(global::grpc.testing.SimpleRequest request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken)) { var call = CreateCall(__ServiceName, __Method_UnaryCall, headers); return Calls.AsyncUnaryCall(call, request, cancellationToken); diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index 6bd997d1f4..ccf9fe6ced 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -46,19 +46,19 @@ namespace grpc.testing /// </summary> public class TestServiceImpl : TestService.ITestService { - public Task<Empty> EmptyCall(ServerCallContext context, Empty request) + public Task<Empty> EmptyCall(Empty request, ServerCallContext context) { return Task.FromResult(Empty.DefaultInstance); } - public Task<SimpleResponse> UnaryCall(ServerCallContext context, SimpleRequest request) + public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context) { var response = SimpleResponse.CreateBuilder() .SetPayload(CreateZerosPayload(request.ResponseSize)).Build(); return Task.FromResult(response); } - public async Task StreamingOutputCall(ServerCallContext context, StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream) + public async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context) { foreach (var responseParam in request.ResponseParametersList) { @@ -68,7 +68,7 @@ namespace grpc.testing } } - public async Task<StreamingInputCallResponse> StreamingInputCall(ServerCallContext context, IAsyncStreamReader<StreamingInputCallRequest> requestStream) + public async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream, ServerCallContext context) { int sum = 0; await requestStream.ForEach(async request => @@ -78,7 +78,7 @@ namespace grpc.testing return StreamingInputCallResponse.CreateBuilder().SetAggregatedPayloadSize(sum).Build(); } - public async Task FullDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream) + public async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context) { await requestStream.ForEach(async request => { @@ -91,7 +91,7 @@ namespace grpc.testing }); } - public async Task HalfDuplexCall(ServerCallContext context, IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream) + public async Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream, ServerCallContext context) { throw new NotImplementedException(); } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 7dd1959a5f..682521446f 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -167,6 +167,29 @@ grpcsharp_metadata_array_add(grpc_metadata_array *array, const char *key, array->count++; } +GPR_EXPORT gpr_intptr GPR_CALLTYPE +grpcsharp_metadata_array_count(grpc_metadata_array *array) { + return (gpr_intptr) array->count; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_metadata_array_get_key(grpc_metadata_array *array, size_t index) { + GPR_ASSERT(index < array->count); + return array->metadata[index].key; +} + +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_metadata_array_get_value(grpc_metadata_array *array, size_t index) { + GPR_ASSERT(index < array->count); + return array->metadata[index].value; +} + +GPR_EXPORT gpr_intptr GPR_CALLTYPE +grpcsharp_metadata_array_get_value_length(grpc_metadata_array *array, size_t index) { + GPR_ASSERT(index < array->count); + return (gpr_intptr) array->metadata[index].value_length; +} + /* Move contents of metadata array */ void grpcsharp_metadata_array_move(grpc_metadata_array *dest, grpc_metadata_array *src) { @@ -218,6 +241,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_con gpr_free(ctx); } +GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE +grpcsharp_batch_context_recv_initial_metadata( + const grpcsharp_batch_context *ctx) { + return &(ctx->recv_initial_metadata); +} + GPR_EXPORT gpr_intptr GPR_CALLTYPE grpcsharp_batch_context_recv_message_length( const grpcsharp_batch_context *ctx) { if (!ctx->recv_message) { @@ -260,6 +289,12 @@ grpcsharp_batch_context_recv_status_on_client_details( return ctx->recv_status_on_client.status_details; } +GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE +grpcsharp_batch_context_recv_status_on_client_trailing_metadata( + const grpcsharp_batch_context *ctx) { + return &(ctx->recv_status_on_client.trailing_metadata); +} + GPR_EXPORT grpc_call *GPR_CALLTYPE grpcsharp_batch_context_server_rpc_new_call( const grpcsharp_batch_context *ctx) { return ctx->server_rpc_new.call; @@ -271,6 +306,24 @@ grpcsharp_batch_context_server_rpc_new_method( return ctx->server_rpc_new.call_details.method; } +GPR_EXPORT const char *GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_host( + const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call_details.host; +} + +GPR_EXPORT gpr_timespec GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_deadline( + const grpcsharp_batch_context *ctx) { + return ctx->server_rpc_new.call_details.deadline; +} + +GPR_EXPORT const grpc_metadata_array *GPR_CALLTYPE +grpcsharp_batch_context_server_rpc_new_request_metadata( + const grpcsharp_batch_context *ctx) { + return &(ctx->server_rpc_new.request_metadata); +} + GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_batch_context_recv_close_on_server_cancelled( const grpcsharp_batch_context *ctx) { @@ -589,15 +642,20 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, - const char *status_details) { + const char *status_details, + grpc_metadata_array *trailing_metadata) { /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); - ops[0].data.send_status_from_server.trailing_metadata = NULL; - ops[0].data.send_status_from_server.trailing_metadata_count = 0; + grpcsharp_metadata_array_move(&(ctx->send_status_from_server.trailing_metadata), + trailing_metadata); + ops[0].data.send_status_from_server.trailing_metadata_count = + ctx->send_status_from_server.trailing_metadata.count; + ops[0].data.send_status_from_server.trailing_metadata = + ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); diff --git a/src/node/src/client.js b/src/node/src/client.js index b7bad949d4..da6327b432 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -47,6 +47,7 @@ var Readable = stream.Readable; var Writable = stream.Writable; var Duplex = stream.Duplex; var util = require('util'); +var version = require('../package.json').version; util.inherits(ClientWritableStream, Writable); @@ -517,9 +518,12 @@ function makeClientConstructor(methods, serviceName) { callback(null, metadata); }; } - - this.server_address = address.replace(/\/$/, ''); + if (!options) { + options = {}; + } + options['grpc.primary_user_agent'] = 'grpc-node/' + version; this.channel = new grpc.Channel(address, options); + this.server_address = address.replace(/\/$/, ''); this.auth_uri = this.server_address + '/' + serviceName; this.updateMetadata = updateMetadata; } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 18178e49e4..3cb68f8cd8 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -258,6 +258,16 @@ describe('Echo metadata', function() { }); call.end(); }); + it('shows the correct user-agent string', function(done) { + var version = require('../package.json').version; + var call = client.unary({}, function(err, data) { + assert.ifError(err); + }, {key: ['value']}); + call.on('metadata', function(metadata) { + assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version)); + done(); + }); + }); }); describe('Other conditions', function() { var client; diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index 000c8d0c38..d9f911a41a 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -489,10 +489,10 @@ PyObject *pygrpc_cast_metadata_array_to_pyseq(grpc_metadata_array metadata) { void pygrpc_byte_buffer_to_bytes( grpc_byte_buffer *buffer, char **result, size_t *result_size) { grpc_byte_buffer_reader reader; - grpc_byte_buffer_reader_init(&reader, buffer); gpr_slice slice; char *read_result = NULL; size_t size = 0; + grpc_byte_buffer_reader_init(&reader, buffer); while (grpc_byte_buffer_reader_next(&reader, &slice)) { read_result = gpr_realloc(read_result, size + GPR_SLICE_LENGTH(slice)); memcpy(read_result + size, GPR_SLICE_START_PTR(slice), |