diff options
author | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-08-12 01:14:17 +0200 |
---|---|---|
committer | Nicolas "Pixel" Noble <pixel@nobis-crew.org> | 2015-08-12 01:14:17 +0200 |
commit | b457cd831ad519d5ec023005ba6ccfb52ff4c9cb (patch) | |
tree | 75575c3da2adaf0e44a382558428b0eb47f5ab43 /src | |
parent | f0b417dc84d52ebd3890751b0ee331f9eee49e37 (diff) | |
parent | 73578f7f62df4fd03035ad8e04c43b492649a064 (diff) |
Merge remote-tracking branch 'google/master' into the-ultimate-showdown
Conflicts:
src/csharp/ext/grpc_csharp_ext.c
Diffstat (limited to 'src')
71 files changed, 2569 insertions, 543 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index a293c93ec6..6c2e6b38a8 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -527,6 +527,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } if (old_lb_policy != NULL) { + grpc_lb_policy_shutdown(old_lb_policy); GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index f368819597..d1f228665f 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -84,6 +84,8 @@ static void on_compute_engine_detection_http_response( gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); } +static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } + static int is_stack_running_on_compute_engine(void) { compute_engine_detector detector; grpc_httpcli_request request; @@ -114,12 +116,12 @@ static int is_stack_running_on_compute_engine(void) { while (!detector.is_done) { grpc_pollset_worker worker; grpc_pollset_work(&detector.pollset, &worker, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_MONOTONIC)); } gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); grpc_httpcli_context_destroy(&context); - grpc_pollset_destroy(&detector.pollset); + grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset); return detector.success; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e1a1f38a12..6a1a6cbf30 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1544,6 +1544,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_INITIAL_METADATA; req->data.send_metadata.count = op->data.send_initial_metadata.count; req->data.send_metadata.metadata = @@ -1558,6 +1559,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_INVALID_MESSAGE; } req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; req->flags = op->flags; @@ -1569,6 +1571,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_CLOSE; req->flags = op->flags; break; @@ -1579,6 +1582,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_CLIENT; } req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_TRAILING_METADATA; req->flags = op->flags; req->data.send_metadata.count = @@ -1586,6 +1590,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->data.send_metadata.metadata = op->data.send_status_from_server.trailing_metadata; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = @@ -1595,6 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.send_status_from_server.status_details, 0) : NULL; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_SEND_CLOSE; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1604,6 +1610,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_INITIAL_METADATA; req->data.recv_metadata = op->data.recv_initial_metadata; req->data.recv_metadata->count = 0; @@ -1613,6 +1620,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_MESSAGE; req->data.recv_message = op->data.recv_message; req->flags = op->flags; @@ -1624,22 +1632,26 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return GRPC_CALL_ERROR_NOT_ON_SERVER; } req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_status_value_directly; req->data.recv_status.user_data = op->data.recv_status_on_client.status; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS_DETAILS; req->data.recv_status_details.details = op->data.recv_status_on_client.status_details; req->data.recv_status_details.details_capacity = op->data.recv_status_on_client.status_details_capacity; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_TRAILING_METADATA; req->data.recv_metadata = op->data.recv_status_on_client.trailing_metadata; req->data.recv_metadata->count = 0; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; @@ -1647,12 +1659,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, /* Flag validation: currently allow no flags */ if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_STATUS; req->flags = op->flags; req->data.recv_status.set_value = set_cancelled_value; req->data.recv_status.user_data = op->data.recv_close_on_server.cancelled; req = &reqs[out++]; + if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG; req->op = GRPC_IOREQ_RECV_CLOSE; finish_func = finish_batch_with_close; break; diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 1f89353025..c3150250b8 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -88,8 +88,8 @@ static void on_secure_transport_setup_done(void *arg, c->args.channel_args, secure_endpoint, c->args.metadata_context, 1); grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2); - c->result->filters[0] = &grpc_client_auth_filter; - c->result->filters[1] = &grpc_http_client_filter; + c->result->filters[0] = &grpc_http_client_filter; + c->result->filters[1] = &grpc_client_auth_filter; c->result->num_filters = 2; } notify = c->notify; diff --git a/src/core/surface/version.c b/src/core/surface/version.c index 4f5d648371..d7aaba3868 100644 --- a/src/core/surface/version.c +++ b/src/core/surface/version.c @@ -37,5 +37,5 @@ #include <grpc/grpc.h> const char *grpc_version_string(void) { - return "0.10.0.0"; + return "0.10.1.0"; } diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index 936b0c25b0..e27e6b9fc9 100644 --- a/src/core/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -158,6 +158,8 @@ tsi_result tsi_frame_protector_protect_flush( value is expected to be at most max_protected_frame_size minus overhead which means that max_protected_frame_size is a safe bet. The output value is the number of bytes actually written. + If *unprotected_bytes_size is unchanged, there may be more data remaining + to unprotect, and the caller should call this function again. - This method returns TSI_OK in case of success. Success includes cases where there is not enough data to output a frame in which case diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 2d6114e06b..6cd6b77fcf 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -34,6 +34,7 @@ #include <grpc/support/log.h> #include <grpc++/channel_arguments.h> +#include <grpc++/impl/grpc_library.h> #include "src/cpp/client/channel.h" #include "src/cpp/client/secure_credentials.h" @@ -61,12 +62,14 @@ std::shared_ptr<Credentials> WrapCredentials(grpc_credentials* creds) { } // namespace std::shared_ptr<Credentials> GoogleDefaultCredentials() { + GrpcLibrary init; // To call grpc_init(). return WrapCredentials(grpc_google_default_credentials_create()); } // Builds SSL Credentials given SSL specific options std::shared_ptr<Credentials> SslCredentials( const SslCredentialsOptions& options) { + GrpcLibrary init; // To call grpc_init(). grpc_ssl_pem_key_cert_pair pem_key_cert_pair = { options.pem_private_key.c_str(), options.pem_cert_chain.c_str()}; @@ -78,6 +81,7 @@ std::shared_ptr<Credentials> SslCredentials( // Builds credentials for use when running in GCE std::shared_ptr<Credentials> ComputeEngineCredentials() { + GrpcLibrary init; // To call grpc_init(). return WrapCredentials(grpc_compute_engine_credentials_create()); } @@ -85,6 +89,7 @@ std::shared_ptr<Credentials> ComputeEngineCredentials() { std::shared_ptr<Credentials> ServiceAccountCredentials( const grpc::string& json_key, const grpc::string& scope, long token_lifetime_seconds) { + GrpcLibrary init; // To call grpc_init(). if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create ServiceAccountCredentials " @@ -100,6 +105,7 @@ std::shared_ptr<Credentials> ServiceAccountCredentials( // Builds JWT credentials. std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials( const grpc::string& json_key, long token_lifetime_seconds) { + GrpcLibrary init; // To call grpc_init(). if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create JWTCredentials with non-positive lifetime"); @@ -114,6 +120,7 @@ std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials( // Builds refresh token credentials. std::shared_ptr<Credentials> RefreshTokenCredentials( const grpc::string& json_refresh_token) { + GrpcLibrary init; // To call grpc_init(). return WrapCredentials( grpc_refresh_token_credentials_create(json_refresh_token.c_str())); } @@ -121,6 +128,7 @@ std::shared_ptr<Credentials> RefreshTokenCredentials( // Builds access token credentials. std::shared_ptr<Credentials> AccessTokenCredentials( const grpc::string& access_token) { + GrpcLibrary init; // To call grpc_init(). return WrapCredentials( grpc_access_token_credentials_create(access_token.c_str())); } @@ -129,6 +137,7 @@ std::shared_ptr<Credentials> AccessTokenCredentials( std::shared_ptr<Credentials> IAMCredentials( const grpc::string& authorization_token, const grpc::string& authority_selector) { + GrpcLibrary init; // To call grpc_init(). return WrapCredentials(grpc_iam_credentials_create( authorization_token.c_str(), authority_selector.c_str())); } diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc index e706c6c921..ba88983515 100644 --- a/src/cpp/common/auth_property_iterator.cc +++ b/src/cpp/common/auth_property_iterator.cc @@ -31,7 +31,7 @@ * */ -#include <grpc++/auth_property_iterator.h> +#include <grpc++/auth_context.h> #include <grpc/grpc_security.h> diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 64ea21800f..c5fc85b3fe 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -46,47 +46,18 @@ namespace Grpc.Core.Tests public class ClientServerTest { const string Host = "127.0.0.1"; - const string ServiceName = "tests.Test"; - - static readonly Method<string, string> EchoMethod = new Method<string, string>( - MethodType.Unary, - ServiceName, - "Echo", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - static readonly Method<string, string> ConcatAndEchoMethod = new Method<string, string>( - MethodType.ClientStreaming, - ServiceName, - "ConcatAndEcho", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - static readonly Method<string, string> NonexistentMethod = new Method<string, string>( - MethodType.Unary, - ServiceName, - "NonexistentMethod", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) - .AddMethod(EchoMethod, EchoHandler) - .AddMethod(ConcatAndEchoMethod, ConcatAndEchoHandler) - .Build(); + MockServiceHelper helper; Server server; Channel channel; [SetUp] public void Init() { - server = new Server - { - Services = { ServiceDefinition }, - Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } - }; + helper = new MockServiceHelper(Host); + server = helper.GetServer(); server.Start(); - channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure); + channel = helper.GetChannel(); } [TearDown] @@ -103,86 +74,79 @@ namespace Grpc.Core.Tests } [Test] - public void UnaryCall() + public async Task UnaryCall() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC")); + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); + + Assert.AreEqual("ABC", await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC")); } [Test] public void UnaryCall_ServerHandlerThrows() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - try + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => { - Calls.BlockingUnaryCall(callDetails, "THROW"); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - } + throw new Exception("This was thrown on purpose by a test"); + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode); + + var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unknown, ex2.Status.StatusCode); } [Test] public void UnaryCall_ServerHandlerThrowsRpcException() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - try - { - Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED"); - Assert.Fail(); - } - catch (RpcException e) + helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => { - Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode); - } + throw new RpcException(new Status(StatusCode.Unauthenticated, "")); + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); + + var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); } [Test] public void UnaryCall_ServerHandlerSetsStatus() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - try + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => { - Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED"); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode); - } - } + context.Status = new Status(StatusCode.Unauthenticated, ""); + return ""; + }); - [Test] - public async Task AsyncUnaryCall() - { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - var result = await Calls.AsyncUnaryCall(callDetails, "ABC"); - Assert.AreEqual("ABC", result); - } + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); - [Test] - public async Task AsyncUnaryCall_ServerHandlerThrows() - { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - try - { - await Calls.AsyncUnaryCall(callDetails, "THROW"); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - } + var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); } [Test] public async Task ClientStreamingCall() { - var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions()); - var call = Calls.AsyncClientStreamingCall(callDetails); + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + string result = ""; + await requestStream.ForEach(async (request) => + { + result += request; + }); + await Task.Delay(100); + return result; + }); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); await call.RequestStream.WriteAll(new string[] { "A", "B", "C" }); Assert.AreEqual("ABC", await call.ResponseAsync); } @@ -190,36 +154,47 @@ namespace Grpc.Core.Tests [Test] public async Task ClientStreamingCall_CancelAfterBegin() { + var barrier = new TaskCompletionSource<object>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + barrier.SetResult(null); + await requestStream.ToList(); + return ""; + }); + var cts = new CancellationTokenSource(); - var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions(cancellationToken: cts.Token)); - var call = Calls.AsyncClientStreamingCall(callDetails); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it. - await Task.Delay(1000); + await barrier.Task; // make sure the handler has started. cts.Cancel(); - try - { - await call.ResponseAsync; - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); - } + var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync); + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); } [Test] - public void AsyncUnaryCall_EchoMetadata() + public async Task AsyncUnaryCall_EchoMetadata() { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + foreach (Metadata.Entry metadataEntry in context.RequestHeaders) + { + if (metadataEntry.Key != "user-agent") + { + context.ResponseTrailers.Add(metadataEntry); + } + } + return ""; + }); + var headers = new Metadata { - new Metadata.Entry("ascii-header", "abcdefg"), - new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }), + { "ascii-header", "abcdefg" }, + { "binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff } } }; - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions(headers: headers)); - var call = Calls.AsyncUnaryCall(callDetails, "ABC"); - - Assert.AreEqual("ABC", call.ResponseAsync.Result); + var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(new CallOptions(headers: headers)), "ABC"); + await call; Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); @@ -236,15 +211,18 @@ namespace Grpc.Core.Tests public void UnaryCall_DisposedChannel() { channel.Dispose(); - - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC")); + Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); } [Test] public void UnaryCallPerformance() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + + var callDetails = helper.CreateUnaryCall(); BenchmarkUtil.RunBenchmark(100, 100, () => { Calls.BlockingUnaryCall(callDetails, "ABC"); }); } @@ -252,44 +230,57 @@ namespace Grpc.Core.Tests [Test] public void UnknownMethodHandler() { - var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallOptions()); - try - { - Calls.BlockingUnaryCall(callDetails, "ABC"); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode); - } + var nonexistentMethod = new Method<string, string>( + MethodType.Unary, + MockServiceHelper.ServiceName, + "NonExistentMethod", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + var callDetails = new CallInvocationDetails<string, string>(channel, nonexistentMethod, new CallOptions()); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(callDetails, "abc")); + Assert.AreEqual(StatusCode.Unimplemented, ex.Status.StatusCode); } [Test] public void UserAgentStringPresent() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT"); + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value; + }); + + string userAgent = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"); Assert.IsTrue(userAgent.StartsWith("grpc-csharp/")); } [Test] public void PeerInfoPresent() { - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER"); + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return context.Peer; + }); + + string peer = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"); Assert.IsTrue(peer.Contains(Host)); } [Test] public async Task Channel_WaitForStateChangedAsync() { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + return request; + }); + Assert.Throws(typeof(TaskCanceledException), async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10))); var stateChangedTask = channel.WaitForStateChangedAsync(channel.State); - var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions()); - await Calls.AsyncUnaryCall(callDetails, "abc"); + await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"); await stateChangedTask; Assert.AreEqual(ChannelState.Ready, channel.State); @@ -300,62 +291,9 @@ namespace Grpc.Core.Tests { await channel.ConnectAsync(); Assert.AreEqual(ChannelState.Ready, channel.State); + await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(1000)); Assert.AreEqual(ChannelState.Ready, channel.State); } - - private static async Task<string> EchoHandler(string request, ServerCallContext context) - { - foreach (Metadata.Entry metadataEntry in context.RequestHeaders) - { - if (metadataEntry.Key != "user-agent") - { - context.ResponseTrailers.Add(metadataEntry); - } - } - - if (request == "RETURN-USER-AGENT") - { - return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value; - } - - if (request == "RETURN-PEER") - { - return context.Peer; - } - - if (request == "THROW") - { - throw new Exception("This was thrown on purpose by a test"); - } - - if (request == "THROW_UNAUTHENTICATED") - { - throw new RpcException(new Status(StatusCode.Unauthenticated, "")); - } - - if (request == "SET_UNAUTHENTICATED") - { - context.Status = new Status(StatusCode.Unauthenticated, ""); - } - - return request; - } - - private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context) - { - string result = ""; - await requestStream.ForEach(async (request) => - { - if (request == "THROW") - { - throw new Exception("This was thrown on purpose by a test"); - } - result += request; - }); - // simulate processing takes some time. - await Task.Delay(250); - return result; - } } } diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs new file mode 100644 index 0000000000..ac0c3d6b5f --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs @@ -0,0 +1,128 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class CompressionTest + { + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(); + + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.Dispose(); + server.ShutdownAsync().Wait(); + } + + [TestFixtureTearDown] + public void CleanupClass() + { + GrpcEnvironment.Shutdown(); + } + + [Test] + public void WriteOptions_Unary() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + context.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + return request; + }); + + var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress)); + Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc"); + } + + [Test] + public async Task WriteOptions_DuplexStreaming() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + await requestStream.ToList(); + + context.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + + await context.WriteResponseHeadersAsync(new Metadata { { "ascii-header", "abcdefg" } }); + + await responseStream.WriteAsync("X"); + + responseStream.WriteOptions = null; + await responseStream.WriteAsync("Y"); + + responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + await responseStream.WriteAsync("Z"); + }); + + var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress)); + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions)); + + // check that write options from call options are propagated to request stream. + Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0); + + call.RequestStream.WriteOptions = new WriteOptions(); + await call.RequestStream.WriteAsync("A"); + + call.RequestStream.WriteOptions = null; + await call.RequestStream.WriteAsync("B"); + + call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress); + await call.RequestStream.WriteAsync("C"); + + await call.RequestStream.CompleteAsync(); + + await call.ResponseStream.ToList(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs new file mode 100644 index 0000000000..a7f5075874 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -0,0 +1,122 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ContextPropagationTest + { + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(); + + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.Dispose(); + server.ShutdownAsync().Wait(); + } + + [TestFixtureTearDown] + public void CleanupClass() + { + GrpcEnvironment.Shutdown(); + } + + [Test] + public async Task PropagateCancellation() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + // check that we didn't obtain the default cancellation token. + Assert.IsTrue(context.CancellationToken.CanBeCanceled); + return "PASS"; + }); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var propagationToken = context.CreatePropagationToken(); + Assert.IsNotNull(propagationToken.ParentCall); + + var callOptions = new CallOptions(propagationToken: propagationToken); + return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz"); + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + await call.RequestStream.CompleteAsync(); + Assert.AreEqual("PASS", await call); + } + + [Test] + public async Task PropagateDeadline() + { + var deadline = DateTime.UtcNow.AddDays(7); + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + Assert.IsTrue(context.Deadline < deadline.AddMinutes(1)); + Assert.IsTrue(context.Deadline > deadline.AddMinutes(-1)); + return "PASS"; + }); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken()); + return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz"); + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: deadline))); + await call.RequestStream.CompleteAsync(); + Assert.AreEqual("PASS", await call); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index f2bf459dc5..97ee0454bb 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -77,6 +77,10 @@ <Compile Include="TimeoutsTest.cs" /> <Compile Include="NUnitVersionTest.cs" /> <Compile Include="ChannelTest.cs" /> + <Compile Include="MockServiceHelper.cs" /> + <Compile Include="ResponseHeadersTest.cs" /> + <Compile Include="CompressionTest.cs" /> + <Compile Include="ContextPropagationTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 9ae12776f3..4ed93c7eca 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -69,5 +69,13 @@ namespace Grpc.Core.Tests Assert.IsFalse(object.ReferenceEquals(env1, env2)); } + + [Test] + public void GetCoreVersionString() + { + var coreVersion = GrpcEnvironment.GetCoreVersionString(); + var parts = coreVersion.Split('.'); + Assert.AreEqual(4, parts.Length); + } } } diff --git a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs index 46469113c5..33534fdd3c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs @@ -53,8 +53,8 @@ namespace Grpc.Core.Internal.Tests { var metadata = new Metadata { - new Metadata.Entry("host", "somehost"), - new Metadata.Entry("header2", "header value"), + { "host", "somehost" }, + { "header2", "header value" }, }; var nativeMetadata = MetadataArraySafeHandle.Create(metadata); nativeMetadata.Dispose(); @@ -65,8 +65,8 @@ namespace Grpc.Core.Internal.Tests { var metadata = new Metadata { - new Metadata.Entry("host", "somehost"), - new Metadata.Entry("header2", "header value"), + { "host", "somehost" }, + { "header2", "header value" } }; var nativeMetadata = MetadataArraySafeHandle.Create(metadata); diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs new file mode 100644 index 0000000000..b642286b11 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs @@ -0,0 +1,248 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + /// <summary> + /// Allows setting up a mock service in the client-server tests easily. + /// </summary> + public class MockServiceHelper + { + public const string ServiceName = "tests.Test"; + + public static readonly Method<string, string> UnaryMethod = new Method<string, string>( + MethodType.Unary, + ServiceName, + "Unary", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>( + MethodType.ClientStreaming, + ServiceName, + "ClientStreaming", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>( + MethodType.ServerStreaming, + ServiceName, + "ServerStreaming", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>( + MethodType.DuplexStreaming, + ServiceName, + "DuplexStreaming", + Marshallers.StringMarshaller, + Marshallers.StringMarshaller); + + readonly string host; + readonly ServerServiceDefinition serviceDefinition; + + UnaryServerMethod<string, string> unaryHandler; + ClientStreamingServerMethod<string, string> clientStreamingHandler; + ServerStreamingServerMethod<string, string> serverStreamingHandler; + DuplexStreamingServerMethod<string, string> duplexStreamingHandler; + + Server server; + Channel channel; + + public MockServiceHelper(string host = null) + { + this.host = host ?? "localhost"; + + serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) + .AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context)) + .AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) + .AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) + .AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context)) + .Build(); + + var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own."); + + unaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + context.Status = defaultStatus; + return ""; + }); + + clientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + context.Status = defaultStatus; + return ""; + }); + + serverStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + context.Status = defaultStatus; + }); + + duplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + context.Status = defaultStatus; + }); + } + + /// <summary> + /// Returns the default server for this service and creates one if not yet created. + /// </summary> + public Server GetServer() + { + if (server == null) + { + server = new Server + { + Services = { serviceDefinition }, + Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } + }; + } + return server; + } + + /// <summary> + /// Returns the default channel for this service and creates one if not yet created. + /// </summary> + public Channel GetChannel() + { + if (channel == null) + { + channel = new Channel(Host, GetServer().Ports.Single().BoundPort, Credentials.Insecure); + } + return channel; + } + + public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null) + { + options = options ?? new CallOptions(); + return new CallInvocationDetails<string, string>(channel, UnaryMethod, options); + } + + public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null) + { + options = options ?? new CallOptions(); + return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options); + } + + public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null) + { + options = options ?? new CallOptions(); + return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options); + } + + public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null) + { + options = options ?? new CallOptions(); + return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options); + } + + public string Host + { + get + { + return this.host; + } + } + + public ServerServiceDefinition ServiceDefinition + { + get + { + return this.serviceDefinition; + } + } + + public UnaryServerMethod<string, string> UnaryHandler + { + get + { + return this.unaryHandler; + } + + set + { + unaryHandler = value; + } + } + + public ClientStreamingServerMethod<string, string> ClientStreamingHandler + { + get + { + return this.clientStreamingHandler; + } + + set + { + clientStreamingHandler = value; + } + } + + public ServerStreamingServerMethod<string, string> ServerStreamingHandler + { + get + { + return this.serverStreamingHandler; + } + + set + { + serverStreamingHandler = value; + } + } + + public DuplexStreamingServerMethod<string, string> DuplexStreamingHandler + { + get + { + return this.duplexStreamingHandler; + } + + set + { + duplexStreamingHandler = value; + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs new file mode 100644 index 0000000000..8925041ba4 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -0,0 +1,136 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + /// <summary> + /// Tests for response headers support. + /// </summary> + public class ResponseHeadersTest + { + MockServiceHelper helper; + Server server; + Channel channel; + + Metadata headers; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(); + + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + + headers = new Metadata { { "ascii-header", "abcdefg" } }; + } + + [TearDown] + public void Cleanup() + { + channel.Dispose(); + server.ShutdownAsync().Wait(); + } + + [TestFixtureTearDown] + public void CleanupClass() + { + GrpcEnvironment.Shutdown(); + } + + [Test] + public void WriteResponseHeaders_NullNotAllowed() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + Assert.Throws(typeof(NullReferenceException), async () => await context.WriteResponseHeadersAsync(null)); + return "PASS"; + }); + + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + } + + [Test] + public void WriteResponseHeaders_AllowedOnlyOnce() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + await context.WriteResponseHeadersAsync(headers); + try + { + await context.WriteResponseHeadersAsync(headers); + Assert.Fail(); + } + catch (InvalidOperationException expected) + { + } + return "PASS"; + }); + + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "")); + } + + [Test] + public async Task WriteResponseHeaders_NotAllowedAfterWrite() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await responseStream.WriteAsync("A"); + try + { + await context.WriteResponseHeadersAsync(headers); + Assert.Fail(); + } + catch (InvalidOperationException expected) + { + } + await responseStream.WriteAsync("B"); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + var responses = await call.ResponseStream.ToList(); + CollectionAssert.AreEqual(new[] { "A", "B" }, responses); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index fc395b0acd..d875d601b9 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -48,38 +48,18 @@ namespace Grpc.Core.Tests /// </summary> public class TimeoutsTest { - const string Host = "localhost"; - const string ServiceName = "tests.Test"; - - static readonly Method<string, string> TestMethod = new Method<string, string>( - MethodType.Unary, - ServiceName, - "Test", - Marshallers.StringMarshaller, - Marshallers.StringMarshaller); - - static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) - .AddMethod(TestMethod, TestMethodHandler) - .Build(); - - // provides a way how to retrieve an out-of-band result value from server handler - static TaskCompletionSource<string> stringFromServerHandlerTcs; - + MockServiceHelper helper; Server server; Channel channel; [SetUp] public void Init() { - server = new Server - { - Services = { ServiceDefinition }, - Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } } - }; - server.Start(); - channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure); + helper = new MockServiceHelper(); - stringFromServerHandlerTcs = new TaskCompletionSource<string>(); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); } [TearDown] @@ -98,115 +78,83 @@ namespace Grpc.Core.Tests [Test] public void InfiniteDeadline() { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + Assert.AreEqual(DateTime.MaxValue, context.Deadline); + return "PASS"; + }); + // no deadline specified, check server sees infinite deadline - var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions()); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE")); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); // DateTime.MaxValue deadline specified, check server sees infinite deadline - var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions()); - Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE")); + Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MaxValue)), "abc")); } [Test] public void DeadlineTransferredToServer() { - var remainingTimeClient = TimeSpan.FromDays(7); - var deadline = DateTime.UtcNow + remainingTimeClient; - Thread.Sleep(1000); - var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline)); - - var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"); - var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc); - - // A fairly relaxed check that the deadline set by client and deadline seen by server - // are in agreement. C core takes care of the work with transferring deadline over the wire, - // so we don't need an exact check here. - Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000); + var clientDeadline = DateTime.UtcNow + TimeSpan.FromDays(7); + + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + // A fairly relaxed check that the deadline set by client and deadline seen by server + // are in agreement. C core takes care of the work with transferring deadline over the wire, + // so we don't need an exact check here. + Assert.IsTrue(Math.Abs((clientDeadline - context.Deadline).TotalMilliseconds) < 5000); + return "PASS"; + }); + Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: clientDeadline)), "abc"); } [Test] public void DeadlineInThePast() { - var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: DateTime.MinValue)); - - try + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => { - Calls.BlockingUnaryCall(callDetails, "TIMEOUT"); - Assert.Fail(); - } - catch (RpcException e) - { - // We can't guarantee the status code always DeadlineExceeded. See issue #2685. - Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); - } + await Task.Delay(60000); + return "FAIL"; + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MinValue)), "abc")); + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); } [Test] public void DeadlineExceededStatusOnTimeout() { - var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline)); - - try + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => { - Calls.BlockingUnaryCall(callDetails, "TIMEOUT"); - Assert.Fail(); - } - catch (RpcException e) - { - // We can't guarantee the status code always DeadlineExceeded. See issue #2685. - Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); - } + await Task.Delay(60000); + return "FAIL"; + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc")); + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); } [Test] - public void ServerReceivesCancellationOnTimeout() + public async Task ServerReceivesCancellationOnTimeout() { - var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)); - var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline)); + var serverReceivedCancellationTcs = new TaskCompletionSource<bool>(); - try - { - Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED"); - Assert.Fail(); - } - catch (RpcException e) - { - // We can't guarantee the status code is always DeadlineExceeded. See issue #2685. - Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); - } - Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result); - } - - private static async Task<string> TestMethodHandler(string request, ServerCallContext context) - { - if (request == "TIMEOUT") - { - await Task.Delay(60000); - return ""; - } - - if (request == "RETURN_DEADLINE") - { - if (context.Deadline == DateTime.MaxValue) - { - return "DATETIME_MAXVALUE"; - } - - return context.Deadline.Ticks.ToString(); - } - - if (request == "CHECK_CANCELLATION_RECEIVED") + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => { // wait until cancellation token is fired. var tcs = new TaskCompletionSource<object>(); context.CancellationToken.Register(() => { tcs.SetResult(null); }); await tcs.Task; - stringFromServerHandlerTcs.SetResult("CANCELLED"); + serverReceivedCancellationTcs.SetResult(true); return ""; - } + }); + + var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc")); + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); - return ""; + Assert.IsTrue(await serverReceivedCancellationTcs.Task); } } } diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs index 8e9739335f..0d82b5a28e 100644 --- a/src/csharp/Grpc.Core/CallOptions.cs +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -47,6 +47,8 @@ namespace Grpc.Core readonly Metadata headers; readonly DateTime deadline; readonly CancellationToken cancellationToken; + readonly WriteOptions writeOptions; + readonly ContextPropagationToken propagationToken; /// <summary> /// Creates a new instance of <c>CallOptions</c>. @@ -54,12 +56,17 @@ namespace Grpc.Core /// <param name="headers">Headers to be sent with the call.</param> /// <param name="deadline">Deadline for the call to finish. null means no deadline.</param> /// <param name="cancellationToken">Can be used to request cancellation of the call.</param> - public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + /// <param name="writeOptions">Write options that will be used for this call.</param> + /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param> + public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null, + WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null) { // TODO(jtattermusch): consider only creating metadata object once it's really needed. - this.headers = headers != null ? headers : new Metadata(); - this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue; - this.cancellationToken = cancellationToken; + this.headers = headers ?? new Metadata(); + this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue); + this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None); + this.writeOptions = writeOptions; + this.propagationToken = propagationToken; } /// <summary> @@ -85,5 +92,27 @@ namespace Grpc.Core { get { return cancellationToken; } } + + /// <summary> + /// Write options that will be used for this call. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + } + + /// <summary> + /// Token for propagating parent call context. + /// </summary> + public ContextPropagationToken PropagationToken + { + get + { + return this.propagationToken; + } + } } } diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs new file mode 100644 index 0000000000..399652b85e --- /dev/null +++ b/src/csharp/Grpc.Core/CompressionLevel.cs @@ -0,0 +1,63 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Compression level based on grpc_compression_level from grpc/compression.h + /// </summary> + public enum CompressionLevel + { + /// <summary> + /// No compression. + /// </summary> + None = 0, + + /// <summary> + /// Low compression. + /// </summary> + Low, + + /// <summary> + /// Medium compression. + /// </summary> + Medium, + + /// <summary> + /// High compression. + /// </summary> + High, + } +} diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs new file mode 100644 index 0000000000..b6ea5115a4 --- /dev/null +++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs @@ -0,0 +1,139 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// <summary> + /// Token for propagating context of server side handlers to child calls. + /// In situations when a backend is making calls to another backend, + /// it makes sense to propagate properties like deadline and cancellation + /// token of the server call to the child call. + /// C core provides some other contexts (like tracing context) that + /// are not accessible to C# layer, but this token still allows propagating them. + /// </summary> + public class ContextPropagationToken + { + /// <summary> + /// Default propagation mask used by C core. + /// </summary> + const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff; + + /// <summary> + /// Default propagation mask used by C# - we want to propagate deadline + /// and cancellation token by our own means. + /// </summary> + internal const ContextPropagationFlags DefaultMask = DefaultCoreMask + & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation; + + readonly CallSafeHandle parentCall; + readonly DateTime deadline; + readonly CancellationToken cancellationToken; + readonly ContextPropagationOptions options; + + internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options) + { + this.parentCall = Preconditions.CheckNotNull(parentCall); + this.deadline = deadline; + this.cancellationToken = cancellationToken; + this.options = options ?? ContextPropagationOptions.Default; + } + + internal CallSafeHandle ParentCall + { + get + { + return this.parentCall; + } + } + + internal DateTime Deadline + { + get + { + return this.deadline; + } + } + + internal CancellationToken CancellationToken + { + get + { + return this.cancellationToken; + } + } + + internal ContextPropagationOptions Options + { + get + { + return this.options; + } + } + + internal bool IsPropagateDeadline + { + get { return false; } + } + + internal bool IsPropagateCancellation + { + get { return false; } + } + } + + /// <summary> + /// Options for <see cref="ContextPropagationToken"/>. + /// </summary> + public class ContextPropagationOptions + { + public static readonly ContextPropagationOptions Default = new ContextPropagationOptions(); + } + + /// <summary> + /// Context propagation flags from grpc/grpc.h. + /// </summary> + [Flags] + internal enum ContextPropagationFlags + { + Deadline = 1, + CensusStatsContext = 2, + CensusTracingContext = 4, + Cancellation = 8 + } +} diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 52defd1965..e535c47f55 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -115,6 +115,9 @@ <Compile Include="ChannelState.cs" /> <Compile Include="CallInvocationDetails.cs" /> <Compile Include="CallOptions.cs" /> + <Compile Include="CompressionLevel.cs" /> + <Compile Include="WriteOptions.cs" /> + <Compile Include="ContextPropagationToken.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 034a66be3c..1bb83c9962 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -53,6 +53,9 @@ namespace Grpc.Core [DllImport("grpc_csharp_ext.dll")] static extern void grpcsharp_shutdown(); + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_version_string(); // returns not-owned const char* + static object staticLock = new object(); static GrpcEnvironment instance; @@ -164,6 +167,15 @@ namespace Grpc.Core } /// <summary> + /// Gets version of gRPC C core. + /// </summary> + internal static string GetCoreVersionString() + { + var ptr = grpcsharp_version_string(); // the pointer is not owned + return Marshal.PtrToStringAnsi(ptr); + } + + /// <summary> /// Shuts down this environment. /// </summary> private void Close() diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 371fbf27ce..c0a0674e50 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// A stream of messages to be read. /// </summary> /// <typeparam name="T"></typeparam> - public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse> + public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> { // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs index 2000210252..4e2acb9c71 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs @@ -50,5 +50,13 @@ namespace Grpc.Core /// </summary> /// <param name="message">the message to be written. Cannot be null.</param> Task WriteAsync(T message); + + /// <summary> + /// Write options that will be used for the next write. + /// If null, default options will be used. + /// Once set, this property maintains its value across subsequent + /// writes. + /// <value>The write options.</value> + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 414b5c4282..0db9d2a515 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -50,7 +50,7 @@ namespace Grpc.Core.Internal { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); - readonly CallInvocationDetails<TRequest, TResponse> callDetails; + readonly CallInvocationDetails<TRequest, TResponse> details; // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -63,7 +63,8 @@ namespace Grpc.Core.Internal public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { - this.callDetails = callDetails; + this.details = callDetails; + this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. } // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but @@ -89,11 +90,11 @@ namespace Grpc.Core.Internal readingDone = true; } - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(payload, ctx, metadataArray); + call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall()); var ev = cq.Pluck(ctx.Handle); bool success = (ev.success != 0); @@ -130,7 +131,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(details.Channel.Environment.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -138,9 +139,9 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(payload, HandleUnaryResponse, metadataArray); + call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall()); } return unaryResponseTcs.Task; } @@ -157,12 +158,12 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(details.Channel.Environment.CompletionQueue); readingDone = true; unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartClientStreaming(HandleUnaryResponse, metadataArray); } @@ -181,16 +182,16 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(details.Channel.Environment.CompletionQueue); halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartServerStreaming(payload, HandleFinished, metadataArray); + call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } } } @@ -206,9 +207,9 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(callDetails.Channel.Environment.CompletionQueue); + Initialize(details.Channel.Environment.CompletionQueue); - using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers)) + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } @@ -219,9 +220,9 @@ namespace Grpc.Core.Internal /// Sends a streaming request. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -278,6 +279,14 @@ namespace Grpc.Core.Internal } } + public CallInvocationDetails<TRequest, TResponse> Details + { + get + { + return this.details; + } + } + /// <summary> /// On client-side, we only fire readCompletionDelegate once all messages have been read /// and status has been received. @@ -310,14 +319,18 @@ namespace Grpc.Core.Internal protected override void OnReleaseResources() { - callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); } private void Initialize(CompletionQueueSafeHandle cq) { - var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq, - callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline)); - callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); + var propagationToken = details.Options.PropagationToken; + var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance; + + var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, + parentCall, ContextPropagationToken.DefaultMask, cq, + details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline)); + details.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); InitializeInternal(call); RegisterCancellationCallback(); } @@ -325,7 +338,7 @@ namespace Grpc.Core.Internal // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. private void RegisterCancellationCallback() { - var token = callDetails.Options.CancellationToken; + var token = details.Options.CancellationToken; if (token.CanBeCanceled) { token.Register(() => this.Cancel()); @@ -333,6 +346,15 @@ namespace Grpc.Core.Internal } /// <summary> + /// Gets WriteFlags set in callDetails.Options.WriteOptions + /// </summary> + private WriteFlags GetWriteFlagsForCall() + { + var writeOptions = details.Options.WriteOptions; + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + + /// <summary> /// Handler for unary response completion. /// </summary> private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 38f2a5baeb..9fa0baca87 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -71,6 +71,9 @@ namespace Grpc.Core.Internal protected bool halfclosed; protected bool finished; // True if close has been received from the peer. + protected bool initialMetadataSent; + protected long streamingWritesCounter; + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = Preconditions.CheckNotNull(serializer); @@ -123,7 +126,7 @@ namespace Grpc.Core.Internal /// Initiates sending a message. Only one send operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate) + protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { byte[] payload = UnsafeSerialize(msg); @@ -132,8 +135,11 @@ namespace Grpc.Core.Internal Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendMessage(payload, HandleSendFinished); + call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); + sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + streamingWritesCounter++; } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 513902ee36..3710a65d6b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -83,9 +83,9 @@ namespace Grpc.Core.Internal /// Sends a streaming response. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) { - StartSendMessageInternal(msg, completionDelegate); + StartSendMessageInternal(msg, writeFlags, completionDelegate); } /// <summary> @@ -98,6 +98,35 @@ namespace Grpc.Core.Internal } /// <summary> + /// Initiates sending a initial metadata. + /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation + /// to make things simpler. + /// completionDelegate is invoked upon completion. + /// </summary> + public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(headers, "metadata"); + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); + Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); + CheckSendingAllowed(); + + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + + using (var metadataArray = MetadataArraySafeHandle.Create(headers)) + { + call.StartSendInitialMetadata(HandleSendFinished, metadataArray); + } + + this.initialMetadataSent = true; + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> /// Sends call result status, also indicating server is done with streaming responses. /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. @@ -111,7 +140,7 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray); + call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent); } halfcloseRequested = true; readingDone = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 714749b171..3cb01e29bd 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -42,6 +42,8 @@ namespace Grpc.Core.Internal /// </summary> internal class CallSafeHandle : SafeHandleZeroIsInvalid { + public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); + const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionRegistry completionRegistry; @@ -53,7 +55,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -62,7 +64,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, - MetadataArraySafeHandle metadataArray); + MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, @@ -70,7 +72,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len); + BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -78,7 +80,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -89,6 +91,10 @@ namespace Grpc.Core.Internal BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); + + [DllImport("grpc_csharp_ext.dll")] static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] @@ -103,17 +109,17 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } - public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray) + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray) + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -124,11 +130,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk(); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) @@ -138,11 +144,11 @@ namespace Grpc.Core.Internal grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback) + public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk(); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } public void StartSendCloseFromClient(BatchCompletionDelegate callback) @@ -152,11 +158,11 @@ namespace Grpc.Core.Internal grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) @@ -173,6 +179,13 @@ namespace Grpc.Core.Internal grpcsharp_call_start_serverside(this, ctx).CheckOk(); } + public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, callback); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + } + public void Cancel() { grpcsharp_call_cancel(this).CheckOk(); diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 7324ebdf57..7f03bf4ea5 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -47,7 +47,7 @@ namespace Grpc.Core.Internal static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs); [DllImport("grpc_csharp_ext.dll")] - static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); [DllImport("grpc_csharp_ext.dll")] static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect); @@ -76,9 +76,9 @@ namespace Grpc.Core.Internal return grpcsharp_secure_channel_create(credentials, target, channelArgs); } - public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) { - var result = grpcsharp_channel_create_call(this, cq, method, host, deadline); + var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline); result.SetCompletionRegistry(registry); return result; } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 58f493463b..013f00ff6f 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -40,16 +40,18 @@ namespace Grpc.Core.Internal internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest> { readonly AsyncCall<TRequest, TResponse> call; + WriteOptions writeOptions; public ClientRequestStream(AsyncCall<TRequest, TResponse> call) { this.call = call; + this.writeOptions = call.Details.Options.WriteOptions; } public Task WriteAsync(TRequest message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -59,5 +61,24 @@ namespace Grpc.Core.Internal call.StartSendCloseFromClient(taskSource.CompletionDelegate); return taskSource.Task; } + + public WriteOptions WriteOptions + { + get + { + return this.writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 19f0e3c57f..688f9f6fec 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,7 +75,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -131,7 +131,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { Preconditions.CheckArgument(await requestStream.MoveNext()); @@ -187,7 +187,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { var result = await handler(requestStream, context); @@ -247,7 +247,7 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; - var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken); + var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { await handler(requestStream, responseStream, context); @@ -304,13 +304,14 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken) + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) + where TRequest : class + where TResponse : class { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); - return new ServerCallContext( - newRpc.Method, newRpc.Host, peer, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken); + return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline, + newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 756dcee87f..03e39efc02 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -38,11 +38,12 @@ namespace Grpc.Core.Internal /// <summary> /// Writes responses asynchronously to an underlying AsyncCallServer object. /// </summary> - internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse> + internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions where TRequest : class where TResponse : class { readonly AsyncCallServer<TRequest, TResponse> call; + WriteOptions writeOptions; public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call) { @@ -52,7 +53,7 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, taskSource.CompletionDelegate); + call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); return taskSource.Task; } @@ -62,5 +63,31 @@ namespace Grpc.Core.Internal call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + var taskSource = new AsyncCompletionTaskSource<object>(); + call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); + return taskSource.Task; + } + + public WriteOptions WriteOptions + { + get + { + return writeOptions; + } + + set + { + writeOptions = value; + } + } + + private WriteFlags GetWriteFlags() + { + var options = writeOptions; + return options != null ? options.Flags : default(WriteFlags); + } } } diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 6fd0a7109d..a58dbdbc93 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -114,6 +114,16 @@ namespace Grpc.Core entries.Add(item); } + public void Add(string key, string value) + { + Add(new Entry(key, value)); + } + + public void Add(string key, byte[] valueBytes) + { + Add(new Entry(key, valueBytes)); + } + public void Clear() { CheckWriteable(); diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 032b1390db..75d81c64f3 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -36,15 +36,16 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Grpc.Core.Internal; + namespace Grpc.Core { /// <summary> /// Context for a server-side call. /// </summary> - public sealed class ServerCallContext + public class ServerCallContext { - // TODO(jtattermusch): expose method to send initial metadata back to client - + private readonly CallSafeHandle callHandle; private readonly string method; private readonly string host; private readonly string peer; @@ -54,15 +55,34 @@ namespace Grpc.Core private readonly Metadata responseTrailers = new Metadata(); private Status status = Status.DefaultSuccess; + private Func<Metadata, Task> writeHeadersFunc; + private IHasWriteOptions writeOptionsHolder; - public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken) + internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder) { + this.callHandle = callHandle; this.method = method; this.host = host; this.peer = peer; this.deadline = deadline; this.requestHeaders = requestHeaders; this.cancellationToken = cancellationToken; + this.writeHeadersFunc = writeHeadersFunc; + this.writeOptionsHolder = writeOptionsHolder; + } + + public Task WriteResponseHeadersAsync(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } + + /// <summary> + /// Creates a propagation token to be used to propagate call context to a child call. + /// </summary> + public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) + { + return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); } /// <summary>Name of method called in this RPC.</summary> @@ -110,7 +130,7 @@ namespace Grpc.Core } } - ///<summary>Cancellation token signals when call is cancelled.</summary> + /// <summary>Cancellation token signals when call is cancelled.</summary> public CancellationToken CancellationToken { get @@ -141,5 +161,31 @@ namespace Grpc.Core status = value; } } + + /// <summary> + /// Allows setting write options for the following write. + /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience. + /// Both properties are backed by the same underlying value. + /// </summary> + public WriteOptions WriteOptions + { + get + { + return writeOptionsHolder.WriteOptions; + } + + set + { + writeOptionsHolder.WriteOptions = value; + } + } + } + + /// <summary> + /// Allows sharing write options between ServerCallContext and other objects. + /// </summary> + public interface IHasWriteOptions + { + WriteOptions WriteOptions { get; set; } } } diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs index b5cb652945..d2a029fbb4 100644 --- a/src/csharp/Grpc.Core/Version.cs +++ b/src/csharp/Grpc.Core/Version.cs @@ -2,4 +2,4 @@ using System.Reflection; using System.Runtime.CompilerServices; // The current version of gRPC C#. -[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")] +[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")] diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 656a3d47bb..939372e237 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -8,6 +8,6 @@ namespace Grpc.Core /// <summary> /// Current version of gRPC /// </summary> - public const string CurrentVersion = "0.6.0"; + public const string CurrentVersion = "0.6.1"; } } diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs new file mode 100644 index 0000000000..7ef3189d76 --- /dev/null +++ b/src/csharp/Grpc.Core/WriteOptions.cs @@ -0,0 +1,82 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Flags for write operations. + /// </summary> + [Flags] + public enum WriteFlags + { + /// <summary> + /// Hint that the write may be buffered and need not go out on the wire immediately. + /// gRPC is free to buffer the message until the next non-buffered + /// write, or until write stream completion, but it need not buffer completely or at all. + /// </summary> + BufferHint = 0x1, + + /// <summary> + /// Force compression to be disabled for a particular write. + /// </summary> + NoCompress = 0x2 + } + + /// <summary> + /// Options for write operations. + /// </summary> + public class WriteOptions + { + /// <summary> + /// Default write options. + /// </summary> + public static readonly WriteOptions Default = new WriteOptions(); + + private WriteFlags flags; + + public WriteOptions(WriteFlags flags = default(WriteFlags)) + { + this.flags = flags; + } + + public WriteFlags Flags + { + get + { + return this.flags; + } + } + } +} diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 08aece7ef2..73d2a1ca9b 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -92,15 +92,8 @@ namespace math.Tests [Test] public void DivByZero() { - try - { - DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - } + var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build())); + Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode); } [Test] @@ -158,15 +151,10 @@ namespace math.Tests using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(), deadline: DateTime.UtcNow.AddMilliseconds(500))) { - try - { - await call.ResponseStream.ToList(); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode); - } + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToList()); + + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 7411d91d5a..6802de489d 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -404,15 +404,8 @@ namespace Grpc.IntegrationTesting await Task.Delay(1000); cts.Cancel(); - try - { - var response = await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); - } + var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync); + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); } Console.WriteLine("Passed!"); } @@ -435,15 +428,8 @@ namespace Grpc.IntegrationTesting cts.Cancel(); - try - { - await call.ResponseStream.MoveNext(); - Assert.Fail(); - } - catch (RpcException e) - { - Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode); - } + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); } Console.WriteLine("Passed!"); } diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index 9e1253bf0b..8a11d01430 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -1,8 +1,8 @@ @rem Builds gRPC NuGet packages @rem Current package versions -set VERSION=0.6.0 -set CORE_VERSION=0.10.0 +set VERSION=0.6.1 +set CORE_VERSION=0.10.1 @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/doc/README.md b/src/csharp/doc/README.md new file mode 100644 index 0000000000..585500b5ca --- /dev/null +++ b/src/csharp/doc/README.md @@ -0,0 +1,2 @@ + +SandCastle project files to generate HTML reference documentation.
\ No newline at end of file diff --git a/src/csharp/doc/grpc_csharp_public.shfbproj b/src/csharp/doc/grpc_csharp_public.shfbproj new file mode 100644 index 0000000000..05c93f4a13 --- /dev/null +++ b/src/csharp/doc/grpc_csharp_public.shfbproj @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <PropertyGroup> + <!-- The configuration and platform will be used to determine which assemblies to include from solution and + project documentation sources --> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <SchemaVersion>2.0</SchemaVersion> + <ProjectGuid>{77e3da09-fc92-486f-a90a-99ca788e8b59}</ProjectGuid> + <SHFBSchemaVersion>2015.6.5.0</SHFBSchemaVersion> + <!-- AssemblyName, Name, and RootNamespace are not used by SHFB but Visual Studio adds them anyway --> + <AssemblyName>Documentation</AssemblyName> + <RootNamespace>Documentation</RootNamespace> + <Name>Documentation</Name> + <!-- SHFB properties --> + <FrameworkVersion>.NET Framework 4.5</FrameworkVersion> + <OutputPath>..\..\..\doc\ref\csharp\html</OutputPath> + <Language>en-US</Language> + <DocumentationSources> + <DocumentationSource sourceFile="..\Grpc.Auth\Grpc.Auth.csproj" /> +<DocumentationSource sourceFile="..\Grpc.Core\Grpc.Core.csproj" /></DocumentationSources> + <BuildAssemblerVerbosity>OnlyWarningsAndErrors</BuildAssemblerVerbosity> + <HelpFileFormat>Website</HelpFileFormat> + <IndentHtml>False</IndentHtml> + <KeepLogFile>True</KeepLogFile> + <DisableCodeBlockComponent>False</DisableCodeBlockComponent> + <CleanIntermediates>True</CleanIntermediates> + <HelpFileVersion>1.0.0.0</HelpFileVersion> + <MaximumGroupParts>2</MaximumGroupParts> + <NamespaceGrouping>False</NamespaceGrouping> + <SyntaxFilters>Standard</SyntaxFilters> + <SdkLinkTarget>Blank</SdkLinkTarget> + <RootNamespaceContainer>True</RootNamespaceContainer> + <PresentationStyle>VS2013</PresentationStyle> + <Preliminary>False</Preliminary> + <NamingMethod>MemberName</NamingMethod> + <HelpTitle>gRPC C#</HelpTitle> + <ContentPlacement>AboveNamespaces</ContentPlacement> + <HtmlHelpName>Documentation</HtmlHelpName> + </PropertyGroup> + <!-- There are no properties for these groups. AnyCPU needs to appear in order for Visual Studio to perform + the build. The others are optional common platform types that may appear. --> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|Win32' "> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|Win32' "> + </PropertyGroup> + <!-- Import the SHFB build targets --> + <Import Project="$(SHFBROOT)\SandcastleHelpFileBuilder.targets" /> + <!-- The pre-build and post-build event properties must appear *after* the targets file import in order to be + evaluated correctly. --> + <PropertyGroup> + <PreBuildEvent> + </PreBuildEvent> + <PostBuildEvent> + </PostBuildEvent> + <RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent> + </PropertyGroup> +</Project>
\ No newline at end of file diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index d28ce1b383..88f092573e 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -377,10 +377,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) { } GPR_EXPORT grpc_call *GPR_CALLTYPE -grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq, +grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 propagation_mask, + grpc_completion_queue *cq, const char *method, const char *host, gpr_timespec deadline) { - return grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, + return grpc_channel_create_call(channel, parent_call, propagation_mask, cq, method, host, deadline, NULL); } @@ -498,7 +500,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, - grpc_metadata_array *initial_metadata) { + grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { /* TODO: don't use magic number */ grpc_op ops[6]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -512,7 +514,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; - ops[1].flags = 0; + ops[1].flags = write_flags; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; ops[2].flags = 0; @@ -581,7 +583,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, - size_t send_buffer_len, grpc_metadata_array *initial_metadata) { + size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { /* TODO: don't use magic number */ grpc_op ops[5]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; @@ -595,7 +597,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[1].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[1].data.send_message = ctx->send_message; - ops[1].flags = 0; + ops[1].flags = write_flags; ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; ops[2].flags = 0; @@ -656,16 +658,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, - const char *send_buffer, size_t send_buffer_len) { + const char *send_buffer, size_t send_buffer_len, + gpr_uint32 write_flags, + gpr_int32 send_empty_initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[1]; + grpc_op ops[2]; + size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ops[0].data.send_message = ctx->send_message; - ops[0].flags = 0; + ops[0].flags = write_flags; + ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].data.send_initial_metadata.count = 0; + ops[1].data.send_initial_metadata.metadata = NULL; + ops[1].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpc_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -682,9 +690,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, - const char *status_details, grpc_metadata_array *trailing_metadata) { + const char *status_details, grpc_metadata_array *trailing_metadata, + gpr_int32 send_empty_initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[1]; + grpc_op ops[2]; + size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = @@ -696,9 +706,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ops[0].data.send_status_from_server.trailing_metadata = ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = 0; + ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[1].data.send_initial_metadata.count = 0; + ops[1].data.send_initial_metadata.metadata = NULL; + ops[1].flags = 0; - return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, - NULL); + return grpc_call_start_batch(call, ops, nops, ctx, NULL); } GPR_EXPORT grpc_call_error GPR_CALLTYPE @@ -715,16 +728,28 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) { GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) { /* TODO: don't use magic number */ - grpc_op ops[2]; - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[0].data.send_initial_metadata.count = 0; - ops[0].data.send_initial_metadata.metadata = NULL; + grpc_op ops[1]; + ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops[0].data.recv_close_on_server.cancelled = + (&ctx->recv_close_on_server_cancelled); ops[0].flags = 0; - ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops[1].data.recv_close_on_server.cancelled = - (&ctx->recv_close_on_server_cancelled); - ops[1].flags = 0; + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); +} + +GPR_EXPORT grpc_call_error GPR_CALLTYPE +grpcsharp_call_send_initial_metadata(grpc_call *call, + grpcsharp_batch_context *ctx, + grpc_metadata_array *initial_metadata) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), + initial_metadata); + ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count; + ops[0].data.send_initial_metadata.metadata = + ctx->send_initial_metadata.metadata; + ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); @@ -859,6 +884,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) { typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success); +/* Version info */ +GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_version_string() { + return grpc_version_string(); +} + /* For testing */ GPR_EXPORT void GPR_CALLTYPE grpcsharp_test_callback(test_callback_funcptr callback) { diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 236b36616c..221d69e246 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -69,9 +69,6 @@ function zeroBuffer(size) { function emptyUnary(client, done) { var call = client.emptyCall({}, function(err, resp) { assert.ifError(err); - }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); if (done) { done(); } @@ -96,9 +93,6 @@ function largeUnary(client, done) { assert.ifError(err); assert.strictEqual(resp.payload.type, 'COMPRESSABLE'); assert.strictEqual(resp.payload.body.length, 314159); - }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); if (done) { done(); } @@ -115,9 +109,6 @@ function clientStreaming(client, done) { var call = client.streamingInputCall(function(err, resp) { assert.ifError(err); assert.strictEqual(resp.aggregated_payload_size, 74922); - }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); if (done) { done(); } @@ -308,9 +299,6 @@ function authTest(expected_user, scope, client, done) { assert.strictEqual(resp.payload.body.length, 314159); assert.strictEqual(resp.username, expected_user); assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE); - }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); if (done) { done(); } @@ -344,9 +332,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { assert.ifError(err); assert.strictEqual(resp.username, expected_user); assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE); - }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); if (done) { done(); } @@ -358,7 +343,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { client.updateMetadata = updateMetadata; makeTestCall(null, {}); } - }); }); } diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5f7d74bca8..0f4c811ce4 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // all. This wrapper over our actual writeable ensures thread-safety and // correct ordering. GRXConcurrentWriteable *_responseWriteable; + + // The network thread wants the requestWriter to resume (when the server is ready for more input), + // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop + // it. Because a writer isn't thread-safe, we'll synchronize those operations on it. + // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or + // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to + // pause the writer immediately on writeValue:, so we need our locking to be recursive. GRXWriter *_requestWriter; // To create a retain cycle when a call is started, up until it finishes. See - // |startWithWriteable:| and |finishWithError:|. - GRPCCall *_self; + // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a + // reference to the call object if all they're interested in is the handler being executed when + // the response arrives. + GRPCCall *_retainSelf; NSMutableDictionary *_requestMetadata; NSMutableDictionary *_responseMetadata; @@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; - (void)finishWithError:(NSError *)errorOrNil { // If the call isn't retained anywhere else, it can be deallocated now. - _self = nil; + _retainSelf = nil; // If there were still request messages coming, stop them. - _requestWriter.state = GRXWriterStateFinished; - _requestWriter = nil; + @synchronized(_requestWriter) { + _requestWriter.state = GRXWriterStateFinished; + } if (errorOrNil) { [_responseWriteable cancelWithError:errorOrNil]; @@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { - strongSelf->_requestWriter.state = GRXWriterStateStarted; + @synchronized(strongSelf->_requestWriter) { + strongSelf->_requestWriter.state = GRXWriterStateStarted; + } } }; - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] - initWithMessage:message - handler:resumingHandler]] errorHandler:errorHandler]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]] + errorHandler:errorHandler]; } - (void)writeValue:(id)value { @@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // Pause the input and only resume it when the C layer notifies us that writes // can proceed. - _requestWriter.state = GRXWriterStatePaused; + @synchronized(_requestWriter) { + _requestWriter.state = GRXWriterStatePaused; + } __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ @@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; } - (void)writesFinishedWithError:(NSError *)errorOrNil { - _requestWriter = nil; if (errorOrNil) { [self cancel]; } else { @@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; } }]; // Now that the RPC has been initiated, request writes can start. - [_requestWriter startWithWriteable:self]; + @synchronized(_requestWriter) { + [_requestWriter startWithWriteable:self]; + } } #pragma mark GRXWriter implementation @@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // before being autoreleased). // Care is taken not to retain self strongly in any of the blocks used in this implementation, so // that the life of the instance is determined by this retain cycle. - _self = self; + _retainSelf = self; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; [self sendHeaders:_requestMetadata]; diff --git a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m index 9b4b6768f8..0a54804bb2 100644 --- a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m @@ -38,15 +38,18 @@ // Returns NULL if the file at path couldn't be read. In that case, if errorPtr isn't NULL, // *errorPtr will be an object describing what went wrong. static grpc_credentials *CertificatesAtPath(NSString *path, NSError **errorPtr) { - NSString *certsContent = [NSString stringWithContentsOfFile:path - encoding:NSASCIIStringEncoding + // Files in PEM format can have non-ASCII characters in their comments (e.g. for the name of the + // issuer). Load them as UTF8 and produce an ASCII equivalent. + NSString *contentInUTF8 = [NSString stringWithContentsOfFile:path + encoding:NSUTF8StringEncoding error:errorPtr]; - if (!certsContent) { + NSData *contentInASCII = [contentInUTF8 dataUsingEncoding:NSASCIIStringEncoding + allowLossyConversion:YES]; + if (!contentInASCII.bytes) { // Passing NULL to grpc_ssl_credentials_create produces behavior we don't want, so return. return NULL; } - const char * asCString = [certsContent cStringUsingEncoding:NSASCIIStringEncoding]; - return grpc_ssl_credentials_create(asCString, NULL); + return grpc_ssl_credentials_create(contentInASCII.bytes, NULL); } @implementation GRPCSecureChannel diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index b6296e1ed7..ca94ce275f 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -36,13 +36,11 @@ #import "GRXWriteable.h" #import "GRXWriter.h" -// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed -// to -startWithWriteable:). +// A buffered pipe is a Writer that also acts as a Writeable. // Once it is started, whatever values are written into it (via -writeValue:) will be propagated // immediately, unless flow control prevents it. // If it is throttled and keeps receiving values, as well as if it receives values before being -// started, it will buffer them and propagate them in order as soon as its state becomes -// GRXWriterStateStarted. +// started, it will buffer them and propagate them in order as soon as its state becomes Started. // If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and // propagate the error immediately. // @@ -51,6 +49,9 @@ // pipe will keep buffering all data written to it, your application could run out of memory and // crash. If you want to react to flow control signals to prevent that, instead of using this class // you can implement an object that conforms to GRXWriter. +// +// Thread-safety: +// The methods of an object of this class should not be called concurrently from different threads. @interface GRXBufferedPipe : GRXWriter<GRXWriteable> // Convenience constructor. diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h index d004333d2b..f310832284 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.h +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h @@ -33,11 +33,17 @@ #import "GRXWriter.h" -// A "proxy" class that simply forwards values, completion, and errors from its -// input writer to its writeable. +// A "proxy" class that simply forwards values, completion, and errors from its input writer to its +// writeable. // It is useful as a superclass for pipes that act as a transformation of their // input writer, and for classes that represent objects with input and // output sequences of values, like an RPC. +// +// Thread-safety: +// All messages sent to this object need to be serialized. When it is started, the writer it wraps +// is started in the same thread. Manual state changes are propagated to the wrapped writer in the +// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be +// serialized with any message sent to this object. @interface GRXForwardingWriter : GRXWriter - (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER; @end diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 2342f51ab3..a72be9ace2 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -48,7 +48,11 @@ // Designated initializer - (instancetype)initWithWriter:(GRXWriter *)writer { if (!writer) { - [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."]; + return nil; + } + if (writer.state != GRXWriterStateNotStarted) { + [NSException raise:NSInvalidArgumentException + format:@"The writer argument must not have already started."]; } if ((self = [super init])) { _writer = writer; diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h index b171f0c760..3fcc259434 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h @@ -36,10 +36,17 @@ #import "GRXWriter.h" // Utility to construct GRXWriter instances from values that are immediately available when -// required. The returned writers all support pausing and early termination. +// required. // -// Unless the writeable callback pauses them or stops them early, these writers will do all their -// interactions with the writeable before the start method returns. +// Thread-safety: +// +// An object of this class shouldn't be messaged concurrently by more than one thread. It will start +// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the +// only place where the writer can be paused or stopped prematurely. +// +// If a paused writer of this class is resumed, it will start messaging the writeable, in the same +// thread, before |setState:| returns. Because the object can't be legally accessed concurrently, +// that's the only place where it can be paused again (or stopped). @interface GRXImmediateWriter : GRXWriter // Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index 5d6e1a472a..b1c994aa38 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -35,84 +35,73 @@ #import "GRXWriteable.h" +// States of a writer. typedef NS_ENUM(NSInteger, GRXWriterState) { - // The writer has not yet been given a writeable to which it can push its - // values. To have an writer transition to the Started state, send it a - // startWithWriteable: message. + // The writer has not yet been given a writeable to which it can push its values. To have a writer + // transition to the Started state, send it a startWithWriteable: message. // - // An writer's state cannot be manually set to this value. + // A writer's state cannot be manually set to this value. GRXWriterStateNotStarted, // The writer might push values to the writeable at any moment. GRXWriterStateStarted, - // The writer is temporarily paused, and won't send any more values to the - // writeable unless its state is set back to Started. The writer might still - // transition to the Finished state at any moment, and is allowed to send - // writesFinishedWithError: to its writeable. - // - // Not all implementations of writer have to support pausing, and thus - // trying to set an writer's state to this value might have no effect. + // The writer is temporarily paused, and won't send any more values to the writeable unless its + // state is set back to Started. The writer might still transition to the Finished state at any + // moment, and is allowed to send writesFinishedWithError: to its writeable. GRXWriterStatePaused, // The writer has released its writeable and won't interact with it anymore. // - // One seldomly wants to set an writer's state to this value, as its - // writeable isn't notified with a writesFinishedWithError: message. Instead, sending - // finishWithError: to the writer will make it notify the writeable and then - // transition to this state. + // One seldomly wants to set a writer's state to this value, as its writeable isn't notified with + // a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make + // it notify the writeable and then transition to this state. GRXWriterStateFinished }; -// An object that conforms to this protocol can produce, on demand, a sequence -// of values. The sequence may be produced asynchronously, and it may consist of -// any number of elements, including none or an infinite number. +// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced +// asynchronously, and it may consist of any number of elements, including none or an infinite +// number. +// +// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the +// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it, +// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and +// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to +// represent a sequence of future values, as well as collections with internal iteration. // -// GRXWriter is the active dual of NSEnumerator. The difference between them -// is thus whether the object plays an active or passive role during usage: A -// user of NSEnumerator pulls values off it, and passes the values to a writeable. -// A user of GRXWriter, though, just gives it a writeable, and the -// GRXWriter instance pushes values to the writeable. This makes this protocol -// suitable to represent a sequence of future values, as well as collections -// with internal iteration. +// An instance of GRXWriter can start producing values after a writeable is passed to it. It can +// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be +// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination. // -// An instance of GRXWriter can start producing values after a writeable is -// passed to it. It can also be commanded to finish the sequence immediately -// (with an optional error). Finally, it can be asked to pause, but the -// conforming instance is not required to oblige. +// Thread-safety: // -// Unless otherwise indicated by a conforming class, no messages should be sent -// concurrently to a GRXWriter. I.e., conforming classes aren't required to -// be thread-safe. +// State transitions take immediate effect if the object is used from a single thread. Subclasses +// might offer stronger guarantees. +// +// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a +// GRXWriter. I.e., conforming classes aren't required to be thread-safe. @interface GRXWriter : NSObject -// This property can be used to query the current state of the writer, which -// determines how it might currently use its writeable. Some state transitions can -// be triggered by setting this property to the corresponding value, and that's -// useful for advanced use cases like pausing an writer. For more details, -// see the documentation of the enum. +// This property can be used to query the current state of the writer, which determines how it might +// currently use its writeable. Some state transitions can be triggered by setting this property to +// the corresponding value, and that's useful for advanced use cases like pausing an writer. For +// more details, see the documentation of the enum further down. @property(nonatomic) GRXWriterState state; -// Start sending messages to the writeable. Messages may be sent before the method -// returns, or they may be sent later in the future. See GRXWriteable.h for the -// different messages a writeable can receive. +// Transition to the Started state, and start sending messages to the writeable (a reference to it +// is retained). Messages to the writeable may be sent before the method returns, or they may be +// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive. // -// If this writer draws its values from an external source (e.g. from the -// filesystem or from a server), calling this method will commonly trigger side -// effects (like network connections). +// If this writer draws its values from an external source (e.g. from the filesystem or from a +// server), calling this method will commonly trigger side effects (like network connections). // // This method might only be called on writers in the NotStarted state. - (void)startWithWriteable:(id<GRXWriteable>)writeable; -// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send -// any more messages to it. -// -// This method might only be called on writers in the Started or Paused -// state. +// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and +// transition to the Finished state. // -// TODO(jcanizales): Consider adding some guarantee about the immediacy of that -// stopping. I know I've relied on it in part of the code that uses this, but -// can't remember the details in the presence of concurrency. +// This method might only be called on writers in the Started or Paused state. - (void)finishWithError:(NSError *)errorOrNil; @end diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index e85dd6e65c..f23102988b 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -114,7 +114,7 @@ static ProtoMethod *kUnaryCallMethod; [call startWithWriteable:responsesWriteable]; - [self waitForExpectationsWithTimeout:4 handler:nil]; + [self waitForExpectationsWithTimeout:8 handler:nil]; } - (void)testSimpleProtoRPC { @@ -146,7 +146,7 @@ static ProtoMethod *kUnaryCallMethod; [call startWithWriteable:responsesWriteable]; - [self waitForExpectationsWithTimeout:4 handler:nil]; + [self waitForExpectationsWithTimeout:8 handler:nil]; } - (void)testMetadata { diff --git a/src/objective-c/tests/InteropTests.h b/src/objective-c/tests/InteropTests.h index 4eb97e9e06..1045c3d124 100644 --- a/src/objective-c/tests/InteropTests.h +++ b/src/objective-c/tests/InteropTests.h @@ -37,8 +37,7 @@ // https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md @interface InteropTests : XCTestCase -// Returns @"localhost:5050". +// Returns @"grpc-test.sandbox.google.com". // Override in a subclass to perform the same tests against a different address. -// For interop tests, use @"grpc-test.sandbox.google.com". + (NSString *)host; @end diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m index b61d567464..1b63fe2059 100644 --- a/src/objective-c/tests/InteropTests.m +++ b/src/objective-c/tests/InteropTests.m @@ -78,20 +78,17 @@ #pragma mark Tests -static NSString * const kLocalCleartextHost = @"localhost:5050"; +static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.google.com"; @implementation InteropTests { RMTTestService *_service; } + (NSString *)host { - return kLocalCleartextHost; + return kRemoteSSLHost; } - (void)setUp { - // Register test server as non-SSL. - [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost]; - _service = [[RMTTestService alloc] initWithHost:self.class.host]; } @@ -131,7 +128,7 @@ static NSString * const kLocalCleartextHost = @"localhost:5050"; [expectation fulfill]; }]; - [self waitForExpectationsWithTimeout:8 handler:nil]; + [self waitForExpectationsWithTimeout:16 handler:nil]; } - (void)testClientStreamingRPC { diff --git a/src/objective-c/tests/InteropTestsLocalCleartext.m b/src/objective-c/tests/InteropTestsLocalCleartext.m new file mode 100644 index 0000000000..2d7d3c4b2c --- /dev/null +++ b/src/objective-c/tests/InteropTestsLocalCleartext.m @@ -0,0 +1,59 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Repeat of the tests in InteropTests.m, but sending the RPCs to a local cleartext server instead +// of the remote SSL one. + +#import <GRPCClient/GRPCCall+Tests.h> + +#import "InteropTests.h" + +static NSString * const kLocalCleartextHost = @"localhost:5050"; + +@interface InteropTestsLocalCleartext : InteropTests +@end + +@implementation InteropTestsLocalCleartext + ++ (NSString *)host { + return kLocalCleartextHost; +} + +- (void)setUp { + // Register test server as non-SSL. + [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost]; + + [super setUp]; +} + +@end diff --git a/src/objective-c/tests/InteropTestsLocalSSL.m b/src/objective-c/tests/InteropTestsLocalSSL.m index 227ca79659..f69f806dcf 100644 --- a/src/objective-c/tests/InteropTestsLocalSSL.m +++ b/src/objective-c/tests/InteropTestsLocalSSL.m @@ -31,8 +31,8 @@ * */ -// Repeat of the tests in InteropTests.m, but using SSL to communicate with the local server instead -// of cleartext. +// Repeat of the tests in InteropTests.m, but sending the RPCs to a local SSL server instead of the +// remote one. #import <GRPCClient/GRPCCall+Tests.h> diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index af98aba9c0..3a1c3d940a 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -13,6 +13,7 @@ 63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 63423F501B151B77006CF63C /* RxLibraryUnitTests.m */; }; 635697CD1B14FC11007A7283 /* Tests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635697CC1B14FC11007A7283 /* Tests.m */; }; 635ED2EC1B1A3BC400FDE5C3 /* InteropTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */; }; + 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */ = {isa = PBXBuildFile; fileRef = 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */; }; 63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */ = {isa = PBXBuildFile; fileRef = 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */; }; 63E240D01B6C63DC005F3B0E /* TestCertificates.bundle in Resources */ = {isa = PBXBuildFile; fileRef = 63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */; }; 7D8A186224D39101F90230F6 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 35F2B6BF3BAE8F0DC4AFD76E /* libPods.a */; }; @@ -51,6 +52,7 @@ 635697CC1B14FC11007A7283 /* Tests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = Tests.m; sourceTree = "<group>"; }; 635697D81B14FC11007A7283 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; }; 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTests.m; sourceTree = "<group>"; }; + 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalCleartext.m; sourceTree = "<group>"; }; 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = InteropTests.h; sourceTree = "<group>"; }; 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalSSL.m; sourceTree = "<group>"; }; 63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.plug-in"; path = TestCertificates.bundle; sourceTree = "<group>"; }; @@ -117,14 +119,15 @@ 635697C91B14FC11007A7283 /* Tests */ = { isa = PBXGroup; children = ( - 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */, 6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */, - 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */, + 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */, 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */, + 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */, + 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */, 63423F501B151B77006CF63C /* RxLibraryUnitTests.m */, + 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */, 635697CC1B14FC11007A7283 /* Tests.m */, 635697D71B14FC11007A7283 /* Supporting Files */, - 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */, ); name = Tests; sourceTree = SOURCE_ROOT; @@ -261,6 +264,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */, 63175DFF1B1B9FAF00027841 /* LocalClearTextTests.m in Sources */, 63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */, 63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */, diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c index 1f3aedd9d8..9b93b051f6 100644 --- a/src/python/grpcio/grpc/_adapter/_c/module.c +++ b/src/python/grpcio/grpc/_adapter/_c/module.c @@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) { return; } + if (PyModule_AddStringConstant( + module, "PRIMARY_USER_AGENT_KEY", + GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) { + return; + } + /* GRPC maintains an internal counter of how many times it has been initialized and handles multiple pairs of grpc_init()/grpc_shutdown() invocations accordingly. */ diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index dcf67dbc11..239aac81b2 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -27,9 +27,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from grpc import _grpcio_metadata from grpc._adapter import _c from grpc._adapter import _types +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) + ClientCredentials = _c.ClientCredentials ServerCredentials = _c.ServerCredentials @@ -76,6 +79,7 @@ class Call(_types.Call): class Channel(_types.Channel): def __init__(self, target, args, creds=None): + args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)] if creds is None: self.channel = _c.Channel(target, args) else: diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py index ed8f7ef009..b6d06b300d 100644 --- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py +++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py @@ -59,3 +59,6 @@ class InteropTestCase(object): def testCancelAfterFirstResponse(self): methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None) + + def testTimeoutOnSleepingServer(self): + methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None) diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py index f4c94685ee..7a831f3cbd 100644 --- a/src/python/grpcio_test/grpc_interop/methods.py +++ b/src/python/grpcio_test/grpc_interop/methods.py @@ -33,10 +33,12 @@ import enum import json import os import threading +import time from oauth2client import client as oauth2client_client from grpc.framework.alpha import utilities +from grpc.framework.alpha import exceptions from grpc_interop import empty_pb2 from grpc_interop import messages_pb2 @@ -318,6 +320,24 @@ def _cancel_after_first_response(stub): raise ValueError('expected call to be cancelled') +def _timeout_on_sleeping_server(stub): + request_payload_size = 27182 + with stub, _Pipe() as pipe: + response_iterator = stub.FullDuplexCall(pipe, 0.001) + + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + payload=messages_pb2.Payload(body=b'\x00' * request_payload_size)) + pipe.add(request) + time.sleep(0.1) + try: + next(response_iterator) + except exceptions.ExpirationError: + pass + else: + raise ValueError('expected call to exceed deadline') + + def _compute_engine_creds(stub, args): response = _large_unary_common_behavior(stub, True, True) if args.default_service_account != response.username: @@ -351,6 +371,7 @@ class TestCase(enum.Enum): CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response' COMPUTE_ENGINE_CREDS = 'compute_engine_creds' SERVICE_ACCOUNT_CREDS = 'service_account_creds' + TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server' def test_interoperability(self, stub, args): if self is TestCase.EMPTY_UNARY: @@ -367,6 +388,8 @@ class TestCase(enum.Enum): _cancel_after_begin(stub) elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE: _cancel_after_first_response(stub) + elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER: + _timeout_on_sleeping_server(stub) elif self is TestCase.COMPUTE_ENGINE_CREDS: _compute_engine_creds(stub, args) elif self is TestCase.SERVICE_ACCOUNT_CREDS: diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py new file mode 100644 index 0000000000..b200d129a9 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py @@ -0,0 +1,541 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse +import contextlib +import distutils.spawn +import errno +import itertools +import os +import pkg_resources +import shutil +import subprocess +import sys +import tempfile +import threading +import time +import unittest + +from grpc.framework.alpha import exceptions +from grpc.framework.foundation import future + +# Identifiers of entities we expect to find in the generated module. +SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer' +SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer' +STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub' +SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server' +STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub' + +# The timeout used in tests of RPCs that are supposed to expire. +SHORT_TIMEOUT = 2 +# The timeout used in tests of RPCs that are not supposed to expire. The +# absurdly large value doesn't matter since no passing execution of this test +# module will ever wait the duration. +LONG_TIMEOUT = 600 +NO_DELAY = 0 + + +class _ServicerMethods(object): + + def __init__(self, test_pb2, delay): + self._condition = threading.Condition() + self._delay = delay + self._paused = False + self._fail = False + self._test_pb2 = test_pb2 + + @contextlib.contextmanager + def pause(self): # pylint: disable=invalid-name + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): # pylint: disable=invalid-name + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False + + def _control(self): # pylint: disable=invalid-name + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + time.sleep(self._delay) + + def UnaryCall(self, request, unused_rpc_context): + response = self._test_pb2.SimpleResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * request.response_size + self._control() + return response + + def StreamingOutputCall(self, request, unused_rpc_context): + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._test_pb2.StreamingInputCallResponse() + aggregated_payload_size = 0 + for request in request_iter: + aggregated_payload_size += len(request.payload.payload_compressable) + response.aggregated_payload_size = aggregated_payload_size + self._control() + return response + + def FullDuplexCall(self, request_iter, unused_rpc_context): + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def HalfDuplexCall(self, request_iter, unused_rpc_context): + responses = [] + for request in request_iter: + for parameter in request.response_parameters: + response = self._test_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._test_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + responses.append(response) + for response in responses: + yield response + + +@contextlib.contextmanager +def _CreateService(test_pb2, delay): + """Provides a servicer backend and a stub. + + The servicer is just the implementation + of the actual servicer passed to the face player of the python RPC + implementation; the two are detached. + + Non-zero delay puts a delay on each call to the servicer, representative of + communication latency. Timeout is the default timeout for the stub while + waiting for the service. + + Args: + test_pb2: The test_pb2 module generated by this test. + delay: Delay in seconds per response from the servicer. + + Yields: + A (servicer_methods, servicer, stub) three-tuple where servicer_methods is + the back-end of the service bound to the stub and the server and stub + are both activated and ready for use. + """ + servicer_methods = _ServicerMethods(test_pb2, delay) + + class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)): + + def UnaryCall(self, request, context): + return servicer_methods.UnaryCall(request, context) + + def StreamingOutputCall(self, request, context): + return servicer_methods.StreamingOutputCall(request, context) + + def StreamingInputCall(self, request_iter, context): + return servicer_methods.StreamingInputCall(request_iter, context) + + def FullDuplexCall(self, request_iter, context): + return servicer_methods.FullDuplexCall(request_iter, context) + + def HalfDuplexCall(self, request_iter, context): + return servicer_methods.HalfDuplexCall(request_iter, context) + + servicer = Servicer() + server = getattr( + test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0) + with server: + port = server.port() + stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port) + with stub: + yield servicer_methods, stub, server + + +def _streaming_input_request_iterator(test_pb2): + for _ in range(3): + request = test_pb2.StreamingInputCallRequest() + request.payload.payload_type = test_pb2.COMPRESSABLE + request.payload.payload_compressable = 'a' + yield request + + +def _streaming_output_request(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + sizes = [1, 2, 3] + request.response_parameters.add(size=sizes[0], interval_us=0) + request.response_parameters.add(size=sizes[1], interval_us=0) + request.response_parameters.add(size=sizes[2], interval_us=0) + return request + + +def _full_duplex_request_iterator(test_pb2): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + +class PythonPluginTest(unittest.TestCase): + """Test case for the gRPC Python protoc-plugin. + + While reading these tests, remember that the futures API + (`stub.method.async()`) only gives futures for the *non-streaming* responses, + else it behaves like its blocking cousin. + """ + + def setUp(self): + # Assume that the appropriate protoc and grpc_python_plugins are on the + # path. + protoc_command = 'protoc' + protoc_plugin_filename = distutils.spawn.find_executable( + 'grpc_python_plugin') + test_proto_filename = pkg_resources.resource_filename( + 'grpc_protoc_plugin', 'test.proto') + if not os.path.isfile(protoc_command): + # Assume that if we haven't built protoc that it's on the system. + protoc_command = 'protoc' + + # Ensure that the output directory exists. + self.outdir = tempfile.mkdtemp() + + # Invoke protoc with the plugin. + cmd = [ + protoc_command, + '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, + '-I .', + '--python_out=%s' % self.outdir, + '--python-grpc_out=%s' % self.outdir, + os.path.basename(test_proto_filename), + ] + subprocess.check_call(' '.join(cmd), shell=True, env=os.environ, + cwd=os.path.dirname(test_proto_filename)) + sys.path.append(self.outdir) + + def tearDown(self): + try: + shutil.rmtree(self.outdir) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise + + # TODO(atash): Figure out which of these tests is hanging flakily with small + # probability. + + def testImportAttributes(self): + # check that we can access the generated module and its members. + import test_pb2 # pylint: disable=g-import-not-at-top + self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None)) + self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None)) + + def testUpDown(self): + import test_pb2 + with _CreateService( + test_pb2, NO_DELAY) as (servicer, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + + def testUnaryCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + request = test_pb2.SimpleRequest(response_size=13) + response = stub.UnaryCall(request, timeout) + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + # Check that the call does not block waiting for the server to respond. + with methods.pause(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.UnaryCall(request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + request = test_pb2.SimpleRequest(response_size=13) + with methods.pause(): + response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testUnaryCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.UnaryCall.async(request, 1) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + + def testUnaryCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = test_pb2.SimpleRequest(response_size=13) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.UnaryCall.async(request, LONG_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testStreamingOutputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, LONG_TIMEOUT) + expected_responses = methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingOutputCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + unused_methods, stub, unused_server): + responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this times out ' + 'instead of raising the proper error.') + def testStreamingOutputCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request = _streaming_output_request(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.StreamingOutputCall(request, 1) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testStreamingInputCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + response = stub.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsync(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT) + response = response_future.result() + expected_response = methods.StreamingInputCall( + _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallAsyncExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + + def testStreamingInputCallAsyncCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods. + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), timeout) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + + def testStreamingInputCallAsyncFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + response_future = stub.StreamingInputCall.async( + _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT) + self.assertIsNotNone(response_future.exception()) + + def testFullDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT) + expected_responses = methods.FullDuplexCall( + _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!') + for expected_response, response in itertools.izip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallExpired(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.pause(): + responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testFullDuplexCallCancelled(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + request_iterator = _full_duplex_request_iterator(test_pb2) + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + next(responses) + responses.cancel() + with self.assertRaises(future.CancelledError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever ' + 'and fix.') + def testFullDuplexCallFailed(self): + import test_pb2 # pylint: disable=g-import-not-at-top + request_iterator = _full_duplex_request_iterator(test_pb2) + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + with methods.fail(): + responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT) + self.assertIsNotNone(responses) + with self.assertRaises(exceptions.ServicerError): + next(responses) + + @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs ' + 'forever and fix.') + def testHalfDuplexCall(self): + import test_pb2 # pylint: disable=g-import-not-at-top + with _CreateService(test_pb2, NO_DELAY) as ( + methods, stub, unused_server): + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), LONG_TIMEOUT) + expected_responses = methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') + for check in itertools.izip_longest(expected_responses, responses): + expected_response, response = check + self.assertEqual(expected_response, response) + + def testHalfDuplexCallWedged(self): + import test_pb2 # pylint: disable=g-import-not-at-top + condition = threading.Condition() + wait_cell = [False] + @contextlib.contextmanager + def wait(): # pylint: disable=invalid-name + # Where's Python 3's 'nonlocal' statement when you need it? + with condition: + wait_cell[0] = True + yield + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): + request = test_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + with condition: + while wait_cell[0]: + condition.wait() + with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server): + with wait(): + responses = stub.HalfDuplexCall( + half_duplex_request_iterator(), SHORT_TIMEOUT) + # half-duplex waits for the client to send all info + with self.assertRaises(exceptions.ExpirationError): + next(responses) + + +if __name__ == '__main__': + os.chdir(os.path.dirname(sys.argv[0])) + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto new file mode 100644 index 0000000000..ed7c6a7b79 --- /dev/null +++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto @@ -0,0 +1,139 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +// This file is duplicated around the code base. See GitHub issue #526. +syntax = "proto2"; + +package grpc.testing; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message Payload { + required PayloadType payload_type = 1; + oneof payload_body { + string payload_compressable = 2; + bytes payload_uncompressable = 3; + } +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py index 9a8edfad0c..b6583662f3 100644 --- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py +++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py @@ -31,11 +31,12 @@ import threading import time import unittest +from grpc import _grpcio_metadata from grpc._adapter import _types from grpc._adapter import _low -def WaitForEvents(completion_queues, deadline): +def wait_for_events(completion_queues, deadline): """ Args: completion_queues: list of completion queues to wait for events on @@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline): thread.join() return results + class InsecureServerInsecureClient(unittest.TestCase): def setUp(self): @@ -123,16 +125,21 @@ class InsecureServerInsecureClient(unittest.TestCase): ], client_call_tag) self.assertEquals(_types.CallError.OK, client_start_batch_result) - client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2) + client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2) self.assertEquals(client_no_event, None) self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type) self.assertIsInstance(request_event.call, _low.Call) self.assertIs(server_request_tag, request_event.tag) self.assertEquals(1, len(request_event.results)) - got_initial_metadata = dict(request_event.results[0].initial_metadata) + received_initial_metadata = dict(request_event.results[0].initial_metadata) + # Check that our metadata were transmitted self.assertEquals( dict(client_initial_metadata), - dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0])) + # Check that Python's user agent string is a part of the full user agent + # string + self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__), + received_initial_metadata['user-agent']) self.assertEquals(METHOD, request_event.call_details.method) self.assertEquals(HOST, request_event.call_details.host) self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) @@ -150,7 +157,7 @@ class InsecureServerInsecureClient(unittest.TestCase): ], server_call_tag) self.assertEquals(_types.CallError.OK, server_start_batch_result) - client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1) + client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1) self.assertEquals(6, len(client_event.results)) found_client_op_types = set() diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py index 925c32720f..a6203cae2d 100644 --- a/src/python/grpcio_test/setup.py +++ b/src/python/grpcio_test/setup.py @@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = { _PACKAGE_DATA = { 'grpc_interop': [ - 'credentials/ca.pem', 'credentials/server1.key', - 'credentials/server1.pem',] + 'credentials/ca.pem', + 'credentials/server1.key', + 'credentials/server1.pem', + ], + 'grpc_protoc_plugin': [ + 'test.proto', + ], } _SETUP_REQUIRES = ( @@ -75,5 +80,5 @@ setuptools.setup( package_data=_PACKAGE_DATA, install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES, setup_requires=_SETUP_REQUIRES, - cmdclass=_COMMAND_CLASS + cmdclass=_COMMAND_CLASS, ) diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index ec6cb912f1..70f0795f29 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -179,6 +179,19 @@ static VALUE grpc_rb_call_cancel(VALUE self) { return Qnil; } +/* Called to obtain the peer that this call is connected to. */ +static VALUE grpc_rb_call_get_peer(VALUE self) { + VALUE res = Qnil; + grpc_call *call = NULL; + char *peer = NULL; + TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + peer = grpc_call_get_peer(call); + res = rb_str_new2(peer); + gpr_free(peer); + + return res; +} + /* call-seq: status = call.status @@ -720,6 +733,7 @@ void Init_grpc_call() { /* Add ruby analogues of the Call methods. */ rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4); rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0); + rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0); rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0); rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1); rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0); diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index a80e484fe1..01762980fc 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -37,6 +37,7 @@ #include <grpc/grpc.h> #include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> #include "rb_grpc.h" #include "rb_call.h" #include "rb_channel_args.h" @@ -250,6 +251,21 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { return Qnil; } + +/* Called to obtain the target that this channel accesses. */ +static VALUE grpc_rb_channel_get_target(VALUE self) { + grpc_rb_channel *wrapper = NULL; + VALUE res = Qnil; + char* target = NULL; + + TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); + target = grpc_channel_get_target(wrapper->wrapped); + res = rb_str_new2(target); + gpr_free(target); + + return res; +} + void Init_grpc_channel() { grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); grpc_rb_cChannel = @@ -266,6 +282,7 @@ void Init_grpc_channel() { /* Add ruby analogues of the Channel methods. */ rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call, 4); + rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0); rb_define_alias(grpc_rb_cChannel, "close", "destroy"); diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index dd4e27df51..45f31329e9 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -16,12 +16,15 @@ Gem::Specification.new do |s| s.required_ruby_version = '>= 2.0.0' s.requirements << 'libgrpc ~> 0.10.0 needs to be installed' - s.files = `git ls-files`.split("\n") - s.test_files = `git ls-files -- spec/*`.split("\n") - s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f| - File.basename(f) + s.files = %w( Rakefile ) + s.files += Dir.glob('lib/**/*') + s.files += Dir.glob('ext/**/*') + s.files += Dir.glob('bin/**/*') + s.test_files = Dir.glob('spec/**/*') + %w(math noproto).each do |b| + s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ] end - s.require_paths = ['lib'] + s.require_paths = %w( bin lib ) s.platform = Gem::Platform::RUBY s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index 7b2c04aa22..745eab437e 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -28,6 +28,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc/generic/active_call' +require 'grpc/version' # GRPC contains the General RPC module. module GRPC @@ -36,8 +37,8 @@ module GRPC include Core::StatusCodes include Core::TimeConsts - # Default timeout is 5 seconds. - DEFAULT_TIMEOUT = 5 + # Default timeout is infinity. + DEFAULT_TIMEOUT = INFINITE_FUTURE # setup_channel is used by #initialize to constuct a channel from its # arguments. @@ -46,6 +47,7 @@ module GRPC fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel) return alt_chan end + kw['grpc.primary_user_agent'] = "grpc-ruby/#{VERSION}" return Core::Channel.new(host, kw) if creds.nil? fail(TypeError, '!Credentials') unless creds.is_a?(Core::Credentials) Core::Channel.new(host, kw, creds) diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 0e85441209..ed8032517b 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -69,6 +69,23 @@ shared_examples 'basic GRPC message delivery is OK' do include GRPC::Core include_context 'setup: tags' + context 'the test channel' do + it 'should have a target' do + expect(@ch.target).to be_a(String) + end + end + + context 'a client call' do + it 'should have a peer' do + expect(new_client_call.peer).to be_a(String) + end + end + + it 'calls have peer info' do + call = new_client_call + expect(call.peer).to be_a(String) + end + it 'servers receive requests from clients and can respond' do call = new_client_call server_call = nil |