aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-12 01:14:17 +0200
committerGravatar Nicolas "Pixel" Noble <pixel@nobis-crew.org>2015-08-12 01:14:17 +0200
commitb457cd831ad519d5ec023005ba6ccfb52ff4c9cb (patch)
tree75575c3da2adaf0e44a382558428b0eb47f5ab43 /src
parentf0b417dc84d52ebd3890751b0ee331f9eee49e37 (diff)
parent73578f7f62df4fd03035ad8e04c43b492649a064 (diff)
Merge remote-tracking branch 'google/master' into the-ultimate-showdown
Conflicts: src/csharp/ext/grpc_csharp_ext.c
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/client_channel.c1
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/surface/call.c14
-rw-r--r--src/core/surface/secure_channel_create.c4
-rw-r--r--src/core/surface/version.c2
-rw-r--r--src/core/tsi/transport_security_interface.h2
-rw-r--r--src/cpp/client/secure_credentials.cc9
-rw-r--r--src/cpp/common/auth_property_iterator.cc2
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs296
-rw-r--r--src/csharp/Grpc.Core.Tests/CompressionTest.cs128
-rw-r--r--src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs122
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/MockServiceHelper.cs248
-rw-r--r--src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs136
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs152
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs37
-rw-r--r--src/csharp/Grpc.Core/CompressionLevel.cs63
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs139
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj3
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs12
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs2
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs64
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs31
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs10
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs56
-rw-r--r--src/csharp/Grpc.Core/Version.cs2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/csharp/Grpc.Core/WriteOptions.cs82
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs24
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs22
-rw-r--r--src/csharp/build_packages.bat4
-rw-r--r--src/csharp/doc/README.md2
-rw-r--r--src/csharp/doc/grpc_csharp_public.shfbproj70
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c76
-rw-r--r--src/node/interop/interop_client.js16
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m41
-rw-r--r--src/objective-c/GRPCClient/private/GRPCSecureChannel.m13
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h9
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h10
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m6
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.h13
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h91
-rw-r--r--src/objective-c/tests/GRPCClientTests.m4
-rw-r--r--src/objective-c/tests/InteropTests.h3
-rw-r--r--src/objective-c/tests/InteropTests.m9
-rw-r--r--src/objective-c/tests/InteropTestsLocalCleartext.m59
-rw-r--r--src/objective-c/tests/InteropTestsLocalSSL.m4
-rw-r--r--src/objective-c/tests/Tests.xcodeproj/project.pbxproj10
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/module.c6
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py4
-rw-r--r--src/python/grpcio_test/grpc_interop/_interop_test_case.py3
-rw-r--r--src/python/grpcio_test/grpc_interop/methods.py23
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py541
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/test.proto139
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py17
-rw-r--r--src/python/grpcio_test/setup.py11
-rw-r--r--src/ruby/ext/grpc/rb_call.c14
-rw-r--r--src/ruby/ext/grpc/rb_channel.c17
-rwxr-xr-xsrc/ruby/grpc.gemspec13
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb6
-rw-r--r--src/ruby/spec/client_server_spec.rb17
71 files changed, 2569 insertions, 543 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a293c93ec6..6c2e6b38a8 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -527,6 +527,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (old_lb_policy != NULL) {
+ grpc_lb_policy_shutdown(old_lb_policy);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index f368819597..d1f228665f 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -84,6 +84,8 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
static int is_stack_running_on_compute_engine(void) {
compute_engine_detector detector;
grpc_httpcli_request request;
@@ -114,12 +116,12 @@ static int is_stack_running_on_compute_engine(void) {
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_pollset_destroy(&detector.pollset);
+ grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset);
return detector.success;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e1a1f38a12..6a1a6cbf30 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1544,6 +1544,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
@@ -1558,6 +1559,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_INVALID_MESSAGE;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
req->flags = op->flags;
@@ -1569,6 +1571,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
req->flags = op->flags;
break;
@@ -1579,6 +1582,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
req->flags = op->flags;
req->data.send_metadata.count =
@@ -1586,6 +1590,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->data.send_metadata.metadata =
op->data.send_status_from_server.trailing_metadata;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
@@ -1595,6 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.send_status_from_server.status_details, 0)
: NULL;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1604,6 +1610,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
req->data.recv_metadata->count = 0;
@@ -1613,6 +1620,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
req->flags = op->flags;
@@ -1624,22 +1632,26 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
req->data.recv_status_details.details =
op->data.recv_status_on_client.status_details;
req->data.recv_status_details.details_capacity =
op->data.recv_status_on_client.status_details_capacity;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
req->data.recv_metadata->count = 0;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
@@ -1647,12 +1659,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 1f89353025..c3150250b8 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -88,8 +88,8 @@ static void on_secure_transport_setup_done(void *arg,
c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
- c->result->filters[0] = &grpc_client_auth_filter;
- c->result->filters[1] = &grpc_http_client_filter;
+ c->result->filters[0] = &grpc_http_client_filter;
+ c->result->filters[1] = &grpc_client_auth_filter;
c->result->num_filters = 2;
}
notify = c->notify;
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index 4f5d648371..d7aaba3868 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -37,5 +37,5 @@
#include <grpc/grpc.h>
const char *grpc_version_string(void) {
- return "0.10.0.0";
+ return "0.10.1.0";
}
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index 936b0c25b0..e27e6b9fc9 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -158,6 +158,8 @@ tsi_result tsi_frame_protector_protect_flush(
value is expected to be at most max_protected_frame_size minus overhead
which means that max_protected_frame_size is a safe bet. The output value
is the number of bytes actually written.
+ If *unprotected_bytes_size is unchanged, there may be more data remaining
+ to unprotect, and the caller should call this function again.
- This method returns TSI_OK in case of success. Success includes cases where
there is not enough data to output a frame in which case
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 2d6114e06b..6cd6b77fcf 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -34,6 +34,7 @@
#include <grpc/support/log.h>
#include <grpc++/channel_arguments.h>
+#include <grpc++/impl/grpc_library.h>
#include "src/cpp/client/channel.h"
#include "src/cpp/client/secure_credentials.h"
@@ -61,12 +62,14 @@ std::shared_ptr<Credentials> WrapCredentials(grpc_credentials* creds) {
} // namespace
std::shared_ptr<Credentials> GoogleDefaultCredentials() {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_google_default_credentials_create());
}
// Builds SSL Credentials given SSL specific options
std::shared_ptr<Credentials> SslCredentials(
const SslCredentialsOptions& options) {
+ GrpcLibrary init; // To call grpc_init().
grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {
options.pem_private_key.c_str(), options.pem_cert_chain.c_str()};
@@ -78,6 +81,7 @@ std::shared_ptr<Credentials> SslCredentials(
// Builds credentials for use when running in GCE
std::shared_ptr<Credentials> ComputeEngineCredentials() {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_compute_engine_credentials_create());
}
@@ -85,6 +89,7 @@ std::shared_ptr<Credentials> ComputeEngineCredentials() {
std::shared_ptr<Credentials> ServiceAccountCredentials(
const grpc::string& json_key, const grpc::string& scope,
long token_lifetime_seconds) {
+ GrpcLibrary init; // To call grpc_init().
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create ServiceAccountCredentials "
@@ -100,6 +105,7 @@ std::shared_ptr<Credentials> ServiceAccountCredentials(
// Builds JWT credentials.
std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials(
const grpc::string& json_key, long token_lifetime_seconds) {
+ GrpcLibrary init; // To call grpc_init().
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create JWTCredentials with non-positive lifetime");
@@ -114,6 +120,7 @@ std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials(
// Builds refresh token credentials.
std::shared_ptr<Credentials> RefreshTokenCredentials(
const grpc::string& json_refresh_token) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(
grpc_refresh_token_credentials_create(json_refresh_token.c_str()));
}
@@ -121,6 +128,7 @@ std::shared_ptr<Credentials> RefreshTokenCredentials(
// Builds access token credentials.
std::shared_ptr<Credentials> AccessTokenCredentials(
const grpc::string& access_token) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(
grpc_access_token_credentials_create(access_token.c_str()));
}
@@ -129,6 +137,7 @@ std::shared_ptr<Credentials> AccessTokenCredentials(
std::shared_ptr<Credentials> IAMCredentials(
const grpc::string& authorization_token,
const grpc::string& authority_selector) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_iam_credentials_create(
authorization_token.c_str(), authority_selector.c_str()));
}
diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc
index e706c6c921..ba88983515 100644
--- a/src/cpp/common/auth_property_iterator.cc
+++ b/src/cpp/common/auth_property_iterator.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/auth_property_iterator.h>
+#include <grpc++/auth_context.h>
#include <grpc/grpc_security.h>
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 64ea21800f..c5fc85b3fe 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -46,47 +46,18 @@ namespace Grpc.Core.Tests
public class ClientServerTest
{
const string Host = "127.0.0.1";
- const string ServiceName = "tests.Test";
-
- static readonly Method<string, string> EchoMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "Echo",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly Method<string, string> ConcatAndEchoMethod = new Method<string, string>(
- MethodType.ClientStreaming,
- ServiceName,
- "ConcatAndEcho",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly Method<string, string> NonexistentMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "NonexistentMethod",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
- .AddMethod(EchoMethod, EchoHandler)
- .AddMethod(ConcatAndEchoMethod, ConcatAndEchoHandler)
- .Build();
+ MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
- server = new Server
- {
- Services = { ServiceDefinition },
- Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
- };
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
server.Start();
- channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
+ channel = helper.GetChannel();
}
[TearDown]
@@ -103,86 +74,79 @@ namespace Grpc.Core.Tests
}
[Test]
- public void UnaryCall()
+ public async Task UnaryCall()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC"));
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
+
+ Assert.AreEqual("ABC", await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCall_ServerHandlerThrows()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "THROW");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ throw new Exception("This was thrown on purpose by a test");
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unknown, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerThrowsRpcException()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
- {
- Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED");
- Assert.Fail();
- }
- catch (RpcException e)
+ helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
- Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
- }
+ throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerSetsStatus()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
- }
- }
+ context.Status = new Status(StatusCode.Unauthenticated, "");
+ return "";
+ });
- [Test]
- public async Task AsyncUnaryCall()
- {
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- var result = await Calls.AsyncUnaryCall(callDetails, "ABC");
- Assert.AreEqual("ABC", result);
- }
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
- [Test]
- public async Task AsyncUnaryCall_ServerHandlerThrows()
- {
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
- {
- await Calls.AsyncUnaryCall(callDetails, "THROW");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public async Task ClientStreamingCall()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions());
- var call = Calls.AsyncClientStreamingCall(callDetails);
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ string result = "";
+ await requestStream.ForEach(async (request) =>
+ {
+ result += request;
+ });
+ await Task.Delay(100);
+ return result;
+ });
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync);
}
@@ -190,36 +154,47 @@ namespace Grpc.Core.Tests
[Test]
public async Task ClientStreamingCall_CancelAfterBegin()
{
+ var barrier = new TaskCompletionSource<object>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ barrier.SetResult(null);
+ await requestStream.ToList();
+ return "";
+ });
+
var cts = new CancellationTokenSource();
- var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions(cancellationToken: cts.Token));
- var call = Calls.AsyncClientStreamingCall(callDetails);
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
- // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
- await Task.Delay(1000);
+ await barrier.Task; // make sure the handler has started.
cts.Cancel();
- try
- {
- await call.ResponseAsync;
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
[Test]
- public void AsyncUnaryCall_EchoMetadata()
+ public async Task AsyncUnaryCall_EchoMetadata()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
+ {
+ if (metadataEntry.Key != "user-agent")
+ {
+ context.ResponseTrailers.Add(metadataEntry);
+ }
+ }
+ return "";
+ });
+
var headers = new Metadata
{
- new Metadata.Entry("ascii-header", "abcdefg"),
- new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }),
+ { "ascii-header", "abcdefg" },
+ { "binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff } }
};
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions(headers: headers));
- var call = Calls.AsyncUnaryCall(callDetails, "ABC");
-
- Assert.AreEqual("ABC", call.ResponseAsync.Result);
+ var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(new CallOptions(headers: headers)), "ABC");
+ await call;
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
@@ -236,15 +211,18 @@ namespace Grpc.Core.Tests
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
-
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC"));
+ Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCallPerformance()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ var callDetails = helper.CreateUnaryCall();
BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(callDetails, "ABC"); });
}
@@ -252,44 +230,57 @@ namespace Grpc.Core.Tests
[Test]
public void UnknownMethodHandler()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallOptions());
- try
- {
- Calls.BlockingUnaryCall(callDetails, "ABC");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode);
- }
+ var nonexistentMethod = new Method<string, string>(
+ MethodType.Unary,
+ MockServiceHelper.ServiceName,
+ "NonExistentMethod",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ var callDetails = new CallInvocationDetails<string, string>(channel, nonexistentMethod, new CallOptions());
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(callDetails, "abc"));
+ Assert.AreEqual(StatusCode.Unimplemented, ex.Status.StatusCode);
}
[Test]
public void UserAgentStringPresent()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT");
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
+ });
+
+ string userAgent = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(userAgent.StartsWith("grpc-csharp/"));
}
[Test]
public void PeerInfoPresent()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER");
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return context.Peer;
+ });
+
+ string peer = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(peer.Contains(Host));
}
[Test]
public async Task Channel_WaitForStateChangedAsync()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
Assert.Throws(typeof(TaskCanceledException),
async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10)));
var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- await Calls.AsyncUnaryCall(callDetails, "abc");
+ await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc");
await stateChangedTask;
Assert.AreEqual(ChannelState.Ready, channel.State);
@@ -300,62 +291,9 @@ namespace Grpc.Core.Tests
{
await channel.ConnectAsync();
Assert.AreEqual(ChannelState.Ready, channel.State);
+
await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(1000));
Assert.AreEqual(ChannelState.Ready, channel.State);
}
-
- private static async Task<string> EchoHandler(string request, ServerCallContext context)
- {
- foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
- {
- if (metadataEntry.Key != "user-agent")
- {
- context.ResponseTrailers.Add(metadataEntry);
- }
- }
-
- if (request == "RETURN-USER-AGENT")
- {
- return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
- }
-
- if (request == "RETURN-PEER")
- {
- return context.Peer;
- }
-
- if (request == "THROW")
- {
- throw new Exception("This was thrown on purpose by a test");
- }
-
- if (request == "THROW_UNAUTHENTICATED")
- {
- throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
- }
-
- if (request == "SET_UNAUTHENTICATED")
- {
- context.Status = new Status(StatusCode.Unauthenticated, "");
- }
-
- return request;
- }
-
- private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context)
- {
- string result = "";
- await requestStream.ForEach(async (request) =>
- {
- if (request == "THROW")
- {
- throw new Exception("This was thrown on purpose by a test");
- }
- result += request;
- });
- // simulate processing takes some time.
- await Task.Delay(250);
- return result;
- }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
new file mode 100644
index 0000000000..ac0c3d6b5f
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
@@ -0,0 +1,128 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class CompressionTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void WriteOptions_Unary()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ return request;
+ });
+
+ var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
+ Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc");
+ }
+
+ [Test]
+ public async Task WriteOptions_DuplexStreaming()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await requestStream.ToList();
+
+ context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+
+ await context.WriteResponseHeadersAsync(new Metadata { { "ascii-header", "abcdefg" } });
+
+ await responseStream.WriteAsync("X");
+
+ responseStream.WriteOptions = null;
+ await responseStream.WriteAsync("Y");
+
+ responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ await responseStream.WriteAsync("Z");
+ });
+
+ var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions));
+
+ // check that write options from call options are propagated to request stream.
+ Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0);
+
+ call.RequestStream.WriteOptions = new WriteOptions();
+ await call.RequestStream.WriteAsync("A");
+
+ call.RequestStream.WriteOptions = null;
+ await call.RequestStream.WriteAsync("B");
+
+ call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ await call.RequestStream.WriteAsync("C");
+
+ await call.RequestStream.CompleteAsync();
+
+ await call.ResponseStream.ToList();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
new file mode 100644
index 0000000000..a7f5075874
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -0,0 +1,122 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ContextPropagationTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public async Task PropagateCancellation()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ // check that we didn't obtain the default cancellation token.
+ Assert.IsTrue(context.CancellationToken.CanBeCanceled);
+ return "PASS";
+ });
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ var propagationToken = context.CreatePropagationToken();
+ Assert.IsNotNull(propagationToken.ParentCall);
+
+ var callOptions = new CallOptions(propagationToken: propagationToken);
+ return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+ await call.RequestStream.CompleteAsync();
+ Assert.AreEqual("PASS", await call);
+ }
+
+ [Test]
+ public async Task PropagateDeadline()
+ {
+ var deadline = DateTime.UtcNow.AddDays(7);
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.IsTrue(context.Deadline < deadline.AddMinutes(1));
+ Assert.IsTrue(context.Deadline > deadline.AddMinutes(-1));
+ return "PASS";
+ });
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
+ return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: deadline)));
+ await call.RequestStream.CompleteAsync();
+ Assert.AreEqual("PASS", await call);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index f2bf459dc5..97ee0454bb 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -77,6 +77,10 @@
<Compile Include="TimeoutsTest.cs" />
<Compile Include="NUnitVersionTest.cs" />
<Compile Include="ChannelTest.cs" />
+ <Compile Include="MockServiceHelper.cs" />
+ <Compile Include="ResponseHeadersTest.cs" />
+ <Compile Include="CompressionTest.cs" />
+ <Compile Include="ContextPropagationTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 9ae12776f3..4ed93c7eca 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -69,5 +69,13 @@ namespace Grpc.Core.Tests
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
+
+ [Test]
+ public void GetCoreVersionString()
+ {
+ var coreVersion = GrpcEnvironment.GetCoreVersionString();
+ var parts = coreVersion.Split('.');
+ Assert.AreEqual(4, parts.Length);
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
index 46469113c5..33534fdd3c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
@@ -53,8 +53,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
- new Metadata.Entry("host", "somehost"),
- new Metadata.Entry("header2", "header value"),
+ { "host", "somehost" },
+ { "header2", "header value" },
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
@@ -65,8 +65,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
- new Metadata.Entry("host", "somehost"),
- new Metadata.Entry("header2", "header value"),
+ { "host", "somehost" },
+ { "header2", "header value" }
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
new file mode 100644
index 0000000000..b642286b11
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
@@ -0,0 +1,248 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ /// <summary>
+ /// Allows setting up a mock service in the client-server tests easily.
+ /// </summary>
+ public class MockServiceHelper
+ {
+ public const string ServiceName = "tests.Test";
+
+ public static readonly Method<string, string> UnaryMethod = new Method<string, string>(
+ MethodType.Unary,
+ ServiceName,
+ "Unary",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>(
+ MethodType.ClientStreaming,
+ ServiceName,
+ "ClientStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>(
+ MethodType.ServerStreaming,
+ ServiceName,
+ "ServerStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>(
+ MethodType.DuplexStreaming,
+ ServiceName,
+ "DuplexStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ readonly string host;
+ readonly ServerServiceDefinition serviceDefinition;
+
+ UnaryServerMethod<string, string> unaryHandler;
+ ClientStreamingServerMethod<string, string> clientStreamingHandler;
+ ServerStreamingServerMethod<string, string> serverStreamingHandler;
+ DuplexStreamingServerMethod<string, string> duplexStreamingHandler;
+
+ Server server;
+ Channel channel;
+
+ public MockServiceHelper(string host = null)
+ {
+ this.host = host ?? "localhost";
+
+ serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
+ .AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context))
+ .AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
+ .AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
+ .AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
+ .Build();
+
+ var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
+
+ unaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ context.Status = defaultStatus;
+ return "";
+ });
+
+ clientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ context.Status = defaultStatus;
+ return "";
+ });
+
+ serverStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ context.Status = defaultStatus;
+ });
+
+ duplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ context.Status = defaultStatus;
+ });
+ }
+
+ /// <summary>
+ /// Returns the default server for this service and creates one if not yet created.
+ /// </summary>
+ public Server GetServer()
+ {
+ if (server == null)
+ {
+ server = new Server
+ {
+ Services = { serviceDefinition },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
+ };
+ }
+ return server;
+ }
+
+ /// <summary>
+ /// Returns the default channel for this service and creates one if not yet created.
+ /// </summary>
+ public Channel GetChannel()
+ {
+ if (channel == null)
+ {
+ channel = new Channel(Host, GetServer().Ports.Single().BoundPort, Credentials.Insecure);
+ }
+ return channel;
+ }
+
+ public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
+ }
+
+ public string Host
+ {
+ get
+ {
+ return this.host;
+ }
+ }
+
+ public ServerServiceDefinition ServiceDefinition
+ {
+ get
+ {
+ return this.serviceDefinition;
+ }
+ }
+
+ public UnaryServerMethod<string, string> UnaryHandler
+ {
+ get
+ {
+ return this.unaryHandler;
+ }
+
+ set
+ {
+ unaryHandler = value;
+ }
+ }
+
+ public ClientStreamingServerMethod<string, string> ClientStreamingHandler
+ {
+ get
+ {
+ return this.clientStreamingHandler;
+ }
+
+ set
+ {
+ clientStreamingHandler = value;
+ }
+ }
+
+ public ServerStreamingServerMethod<string, string> ServerStreamingHandler
+ {
+ get
+ {
+ return this.serverStreamingHandler;
+ }
+
+ set
+ {
+ serverStreamingHandler = value;
+ }
+ }
+
+ public DuplexStreamingServerMethod<string, string> DuplexStreamingHandler
+ {
+ get
+ {
+ return this.duplexStreamingHandler;
+ }
+
+ set
+ {
+ duplexStreamingHandler = value;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
new file mode 100644
index 0000000000..8925041ba4
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -0,0 +1,136 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ /// <summary>
+ /// Tests for response headers support.
+ /// </summary>
+ public class ResponseHeadersTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ Metadata headers;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+
+ headers = new Metadata { { "ascii-header", "abcdefg" } };
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void WriteResponseHeaders_NullNotAllowed()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.Throws(typeof(NullReferenceException), async () => await context.WriteResponseHeadersAsync(null));
+ return "PASS";
+ });
+
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
+ }
+
+ [Test]
+ public void WriteResponseHeaders_AllowedOnlyOnce()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ try
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ Assert.Fail();
+ }
+ catch (InvalidOperationException expected)
+ {
+ }
+ return "PASS";
+ });
+
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
+ }
+
+ [Test]
+ public async Task WriteResponseHeaders_NotAllowedAfterWrite()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ await responseStream.WriteAsync("A");
+ try
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ Assert.Fail();
+ }
+ catch (InvalidOperationException expected)
+ {
+ }
+ await responseStream.WriteAsync("B");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+ var responses = await call.ResponseStream.ToList();
+ CollectionAssert.AreEqual(new[] { "A", "B" }, responses);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index fc395b0acd..d875d601b9 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -48,38 +48,18 @@ namespace Grpc.Core.Tests
/// </summary>
public class TimeoutsTest
{
- const string Host = "localhost";
- const string ServiceName = "tests.Test";
-
- static readonly Method<string, string> TestMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "Test",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
- .AddMethod(TestMethod, TestMethodHandler)
- .Build();
-
- // provides a way how to retrieve an out-of-band result value from server handler
- static TaskCompletionSource<string> stringFromServerHandlerTcs;
-
+ MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
- server = new Server
- {
- Services = { ServiceDefinition },
- Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
- };
- server.Start();
- channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
+ helper = new MockServiceHelper();
- stringFromServerHandlerTcs = new TaskCompletionSource<string>();
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
}
[TearDown]
@@ -98,115 +78,83 @@ namespace Grpc.Core.Tests
[Test]
public void InfiniteDeadline()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.AreEqual(DateTime.MaxValue, context.Deadline);
+ return "PASS";
+ });
+
// no deadline specified, check server sees infinite deadline
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
- Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"));
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
// DateTime.MaxValue deadline specified, check server sees infinite deadline
- var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
- Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE"));
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MaxValue)), "abc"));
}
[Test]
public void DeadlineTransferredToServer()
{
- var remainingTimeClient = TimeSpan.FromDays(7);
- var deadline = DateTime.UtcNow + remainingTimeClient;
- Thread.Sleep(1000);
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
-
- var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE");
- var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc);
-
- // A fairly relaxed check that the deadline set by client and deadline seen by server
- // are in agreement. C core takes care of the work with transferring deadline over the wire,
- // so we don't need an exact check here.
- Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000);
+ var clientDeadline = DateTime.UtcNow + TimeSpan.FromDays(7);
+
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ // A fairly relaxed check that the deadline set by client and deadline seen by server
+ // are in agreement. C core takes care of the work with transferring deadline over the wire,
+ // so we don't need an exact check here.
+ Assert.IsTrue(Math.Abs((clientDeadline - context.Deadline).TotalMilliseconds) < 5000);
+ return "PASS";
+ });
+ Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: clientDeadline)), "abc");
}
[Test]
public void DeadlineInThePast()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: DateTime.MinValue));
-
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
+ await Task.Delay(60000);
+ return "FAIL";
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MinValue)), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
public void DeadlineExceededStatusOnTimeout()
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
-
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
+ await Task.Delay(60000);
+ return "FAIL";
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
- public void ServerReceivesCancellationOnTimeout()
+ public async Task ServerReceivesCancellationOnTimeout()
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
+ var serverReceivedCancellationTcs = new TaskCompletionSource<bool>();
- try
- {
- Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code is always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
- Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result);
- }
-
- private static async Task<string> TestMethodHandler(string request, ServerCallContext context)
- {
- if (request == "TIMEOUT")
- {
- await Task.Delay(60000);
- return "";
- }
-
- if (request == "RETURN_DEADLINE")
- {
- if (context.Deadline == DateTime.MaxValue)
- {
- return "DATETIME_MAXVALUE";
- }
-
- return context.Deadline.Ticks.ToString();
- }
-
- if (request == "CHECK_CANCELLATION_RECEIVED")
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// wait until cancellation token is fired.
var tcs = new TaskCompletionSource<object>();
context.CancellationToken.Register(() => { tcs.SetResult(null); });
await tcs.Task;
- stringFromServerHandlerTcs.SetResult("CANCELLED");
+ serverReceivedCancellationTcs.SetResult(true);
return "";
- }
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- return "";
+ Assert.IsTrue(await serverReceivedCancellationTcs.Task);
}
}
}
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 8e9739335f..0d82b5a28e 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -47,6 +47,8 @@ namespace Grpc.Core
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
+ readonly WriteOptions writeOptions;
+ readonly ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
@@ -54,12 +56,17 @@ namespace Grpc.Core
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
- public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
+ /// <param name="writeOptions">Write options that will be used for this call.</param>
+ /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
+ public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
+ WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
- this.headers = headers != null ? headers : new Metadata();
- this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
- this.cancellationToken = cancellationToken;
+ this.headers = headers ?? new Metadata();
+ this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
+ this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
+ this.writeOptions = writeOptions;
+ this.propagationToken = propagationToken;
}
/// <summary>
@@ -85,5 +92,27 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
+
+ /// <summary>
+ /// Write options that will be used for this call.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ }
+
+ /// <summary>
+ /// Token for propagating parent call context.
+ /// </summary>
+ public ContextPropagationToken PropagationToken
+ {
+ get
+ {
+ return this.propagationToken;
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs
new file mode 100644
index 0000000000..399652b85e
--- /dev/null
+++ b/src/csharp/Grpc.Core/CompressionLevel.cs
@@ -0,0 +1,63 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Compression level based on grpc_compression_level from grpc/compression.h
+ /// </summary>
+ public enum CompressionLevel
+ {
+ /// <summary>
+ /// No compression.
+ /// </summary>
+ None = 0,
+
+ /// <summary>
+ /// Low compression.
+ /// </summary>
+ Low,
+
+ /// <summary>
+ /// Medium compression.
+ /// </summary>
+ Medium,
+
+ /// <summary>
+ /// High compression.
+ /// </summary>
+ High,
+ }
+}
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
new file mode 100644
index 0000000000..b6ea5115a4
--- /dev/null
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -0,0 +1,139 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Token for propagating context of server side handlers to child calls.
+ /// In situations when a backend is making calls to another backend,
+ /// it makes sense to propagate properties like deadline and cancellation
+ /// token of the server call to the child call.
+ /// C core provides some other contexts (like tracing context) that
+ /// are not accessible to C# layer, but this token still allows propagating them.
+ /// </summary>
+ public class ContextPropagationToken
+ {
+ /// <summary>
+ /// Default propagation mask used by C core.
+ /// </summary>
+ const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
+
+ /// <summary>
+ /// Default propagation mask used by C# - we want to propagate deadline
+ /// and cancellation token by our own means.
+ /// </summary>
+ internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
+ & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
+
+ readonly CallSafeHandle parentCall;
+ readonly DateTime deadline;
+ readonly CancellationToken cancellationToken;
+ readonly ContextPropagationOptions options;
+
+ internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
+ {
+ this.parentCall = Preconditions.CheckNotNull(parentCall);
+ this.deadline = deadline;
+ this.cancellationToken = cancellationToken;
+ this.options = options ?? ContextPropagationOptions.Default;
+ }
+
+ internal CallSafeHandle ParentCall
+ {
+ get
+ {
+ return this.parentCall;
+ }
+ }
+
+ internal DateTime Deadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
+ internal CancellationToken CancellationToken
+ {
+ get
+ {
+ return this.cancellationToken;
+ }
+ }
+
+ internal ContextPropagationOptions Options
+ {
+ get
+ {
+ return this.options;
+ }
+ }
+
+ internal bool IsPropagateDeadline
+ {
+ get { return false; }
+ }
+
+ internal bool IsPropagateCancellation
+ {
+ get { return false; }
+ }
+ }
+
+ /// <summary>
+ /// Options for <see cref="ContextPropagationToken"/>.
+ /// </summary>
+ public class ContextPropagationOptions
+ {
+ public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
+ }
+
+ /// <summary>
+ /// Context propagation flags from grpc/grpc.h.
+ /// </summary>
+ [Flags]
+ internal enum ContextPropagationFlags
+ {
+ Deadline = 1,
+ CensusStatsContext = 2,
+ CensusTracingContext = 4,
+ Cancellation = 8
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 52defd1965..e535c47f55 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -115,6 +115,9 @@
<Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
<Compile Include="CallOptions.cs" />
+ <Compile Include="CompressionLevel.cs" />
+ <Compile Include="WriteOptions.cs" />
+ <Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 034a66be3c..1bb83c9962 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -53,6 +53,9 @@ namespace Grpc.Core
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_shutdown();
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_version_string(); // returns not-owned const char*
+
static object staticLock = new object();
static GrpcEnvironment instance;
@@ -164,6 +167,15 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets version of gRPC C core.
+ /// </summary>
+ internal static string GetCoreVersionString()
+ {
+ var ptr = grpcsharp_version_string(); // the pointer is not owned
+ return Marshal.PtrToStringAnsi(ptr);
+ }
+
+ /// <summary>
/// Shuts down this environment.
/// </summary>
private void Close()
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 371fbf27ce..c0a0674e50 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,7 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
+ public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
{
// TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 2000210252..4e2acb9c71 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -50,5 +50,13 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
+
+ /// <summary>
+ /// Write options that will be used for the next write.
+ /// If null, default options will be used.
+ /// Once set, this property maintains its value across subsequent
+ /// writes.
+ /// <value>The write options.</value>
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 414b5c4282..0db9d2a515 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- readonly CallInvocationDetails<TRequest, TResponse> callDetails;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -63,7 +63,8 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- this.callDetails = callDetails;
+ this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@@ -89,11 +90,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -130,7 +131,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -138,9 +139,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -157,12 +158,12 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -181,16 +182,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -206,9 +207,9 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -219,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -278,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -310,14 +319,18 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
- callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ var propagationToken = details.Options.PropagationToken;
+ var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@@ -325,7 +338,7 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
- var token = callDetails.Options.CancellationToken;
+ var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
@@ -333,6 +346,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
+ /// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baeb..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee36..3710a65d6b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -98,6 +98,35 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(headers, "metadata");
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171..3cb01e29bd 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
+ public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +80,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -89,6 +91,10 @@ namespace Grpc.Core.Internal
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -103,17 +109,17 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -124,11 +130,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
@@ -138,11 +144,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
@@ -152,11 +158,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -173,6 +179,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 7324ebdf57..7f03bf4ea5 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,7 +47,7 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
@@ -76,9 +76,9 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463b..013f00ff6f 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f..688f9f6fec 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
- return new ServerCallContext(
- newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f..03e39efc02 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
- internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
+ internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 6fd0a7109d..a58dbdbc93 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -114,6 +114,16 @@ namespace Grpc.Core
entries.Add(item);
}
+ public void Add(string key, string value)
+ {
+ Add(new Entry(key, value));
+ }
+
+ public void Add(string key, byte[] valueBytes)
+ {
+ Add(new Entry(key, valueBytes));
+ }
+
public void Clear()
{
CheckWriteable();
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 032b1390db..75d81c64f3 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -36,15 +36,16 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
- public sealed class ServerCallContext
+ public class ServerCallContext
{
- // TODO(jtattermusch): expose method to send initial metadata back to client
-
+ private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@@ -54,15 +55,34 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private Func<Metadata, Task> writeHeadersFunc;
+ private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
+ internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
+ Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
+ this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeHeadersFunc = writeHeadersFunc;
+ this.writeOptionsHolder = writeOptionsHolder;
+ }
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ return writeHeadersFunc(responseHeaders);
+ }
+
+ /// <summary>
+ /// Creates a propagation token to be used to propagate call context to a child call.
+ /// </summary>
+ public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
+ {
+ return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
/// <summary>Name of method called in this RPC.</summary>
@@ -110,7 +130,7 @@ namespace Grpc.Core
}
}
- ///<summary>Cancellation token signals when call is cancelled.</summary>
+ /// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@@ -141,5 +161,31 @@ namespace Grpc.Core
status = value;
}
}
+
+ /// <summary>
+ /// Allows setting write options for the following write.
+ /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
+ /// Both properties are backed by the same underlying value.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptionsHolder.WriteOptions;
+ }
+
+ set
+ {
+ writeOptionsHolder.WriteOptions = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Allows sharing write options between ServerCallContext and other objects.
+ /// </summary>
+ public interface IHasWriteOptions
+ {
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs
index b5cb652945..d2a029fbb4 100644
--- a/src/csharp/Grpc.Core/Version.cs
+++ b/src/csharp/Grpc.Core/Version.cs
@@ -2,4 +2,4 @@ using System.Reflection;
using System.Runtime.CompilerServices;
// The current version of gRPC C#.
-[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")]
+[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")]
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 656a3d47bb..939372e237 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -8,6 +8,6 @@ namespace Grpc.Core
/// <summary>
/// Current version of gRPC
/// </summary>
- public const string CurrentVersion = "0.6.0";
+ public const string CurrentVersion = "0.6.1";
}
}
diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs
new file mode 100644
index 0000000000..7ef3189d76
--- /dev/null
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -0,0 +1,82 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Flags for write operations.
+ /// </summary>
+ [Flags]
+ public enum WriteFlags
+ {
+ /// <summary>
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ /// gRPC is free to buffer the message until the next non-buffered
+ /// write, or until write stream completion, but it need not buffer completely or at all.
+ /// </summary>
+ BufferHint = 0x1,
+
+ /// <summary>
+ /// Force compression to be disabled for a particular write.
+ /// </summary>
+ NoCompress = 0x2
+ }
+
+ /// <summary>
+ /// Options for write operations.
+ /// </summary>
+ public class WriteOptions
+ {
+ /// <summary>
+ /// Default write options.
+ /// </summary>
+ public static readonly WriteOptions Default = new WriteOptions();
+
+ private WriteFlags flags;
+
+ public WriteOptions(WriteFlags flags = default(WriteFlags))
+ {
+ this.flags = flags;
+ }
+
+ public WriteFlags Flags
+ {
+ get
+ {
+ return this.flags;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 08aece7ef2..73d2a1ca9b 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -92,15 +92,8 @@ namespace math.Tests
[Test]
public void DivByZero()
{
- try
- {
- DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()));
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
}
[Test]
@@ -158,15 +151,10 @@ namespace math.Tests
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
deadline: DateTime.UtcNow.AddMilliseconds(500)))
{
- try
- {
- await call.ResponseStream.ToList();
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToList());
+
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 7411d91d5a..6802de489d 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -404,15 +404,8 @@ namespace Grpc.IntegrationTesting
await Task.Delay(1000);
cts.Cancel();
- try
- {
- var response = await call.ResponseAsync;
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
@@ -435,15 +428,8 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
- try
- {
- await call.ResponseStream.MoveNext();
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 9e1253bf0b..8a11d01430 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,8 +1,8 @@
@rem Builds gRPC NuGet packages
@rem Current package versions
-set VERSION=0.6.0
-set CORE_VERSION=0.10.0
+set VERSION=0.6.1
+set CORE_VERSION=0.10.1
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
diff --git a/src/csharp/doc/README.md b/src/csharp/doc/README.md
new file mode 100644
index 0000000000..585500b5ca
--- /dev/null
+++ b/src/csharp/doc/README.md
@@ -0,0 +1,2 @@
+
+SandCastle project files to generate HTML reference documentation. \ No newline at end of file
diff --git a/src/csharp/doc/grpc_csharp_public.shfbproj b/src/csharp/doc/grpc_csharp_public.shfbproj
new file mode 100644
index 0000000000..05c93f4a13
--- /dev/null
+++ b/src/csharp/doc/grpc_csharp_public.shfbproj
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <!-- The configuration and platform will be used to determine which assemblies to include from solution and
+ project documentation sources -->
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{77e3da09-fc92-486f-a90a-99ca788e8b59}</ProjectGuid>
+ <SHFBSchemaVersion>2015.6.5.0</SHFBSchemaVersion>
+ <!-- AssemblyName, Name, and RootNamespace are not used by SHFB but Visual Studio adds them anyway -->
+ <AssemblyName>Documentation</AssemblyName>
+ <RootNamespace>Documentation</RootNamespace>
+ <Name>Documentation</Name>
+ <!-- SHFB properties -->
+ <FrameworkVersion>.NET Framework 4.5</FrameworkVersion>
+ <OutputPath>..\..\..\doc\ref\csharp\html</OutputPath>
+ <Language>en-US</Language>
+ <DocumentationSources>
+ <DocumentationSource sourceFile="..\Grpc.Auth\Grpc.Auth.csproj" />
+<DocumentationSource sourceFile="..\Grpc.Core\Grpc.Core.csproj" /></DocumentationSources>
+ <BuildAssemblerVerbosity>OnlyWarningsAndErrors</BuildAssemblerVerbosity>
+ <HelpFileFormat>Website</HelpFileFormat>
+ <IndentHtml>False</IndentHtml>
+ <KeepLogFile>True</KeepLogFile>
+ <DisableCodeBlockComponent>False</DisableCodeBlockComponent>
+ <CleanIntermediates>True</CleanIntermediates>
+ <HelpFileVersion>1.0.0.0</HelpFileVersion>
+ <MaximumGroupParts>2</MaximumGroupParts>
+ <NamespaceGrouping>False</NamespaceGrouping>
+ <SyntaxFilters>Standard</SyntaxFilters>
+ <SdkLinkTarget>Blank</SdkLinkTarget>
+ <RootNamespaceContainer>True</RootNamespaceContainer>
+ <PresentationStyle>VS2013</PresentationStyle>
+ <Preliminary>False</Preliminary>
+ <NamingMethod>MemberName</NamingMethod>
+ <HelpTitle>gRPC C#</HelpTitle>
+ <ContentPlacement>AboveNamespaces</ContentPlacement>
+ <HtmlHelpName>Documentation</HtmlHelpName>
+ </PropertyGroup>
+ <!-- There are no properties for these groups. AnyCPU needs to appear in order for Visual Studio to perform
+ the build. The others are optional common platform types that may appear. -->
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|Win32' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|Win32' ">
+ </PropertyGroup>
+ <!-- Import the SHFB build targets -->
+ <Import Project="$(SHFBROOT)\SandcastleHelpFileBuilder.targets" />
+ <!-- The pre-build and post-build event properties must appear *after* the targets file import in order to be
+ evaluated correctly. -->
+ <PropertyGroup>
+ <PreBuildEvent>
+ </PreBuildEvent>
+ <PostBuildEvent>
+ </PostBuildEvent>
+ <RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>
+ </PropertyGroup>
+</Project> \ No newline at end of file
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index d28ce1b383..88f092573e 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -377,10 +377,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
-grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
+grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
- return grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
+ return grpc_channel_create_call(channel, parent_call, propagation_mask, cq,
method, host, deadline, NULL);
}
@@ -498,7 +500,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
- grpc_metadata_array *initial_metadata) {
+ grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[6];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -512,7 +514,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@@ -581,7 +583,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
- size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
+ size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -595,7 +597,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@@ -656,16 +658,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
- const char *send_buffer, size_t send_buffer_len) {
+ const char *send_buffer, size_t send_buffer_len,
+ gpr_uint32 write_flags,
+ gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -682,9 +690,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
- const char *status_details, grpc_metadata_array *trailing_metadata) {
+ const char *status_details, grpc_metadata_array *trailing_metadata,
+ gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -696,9 +706,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -715,16 +728,28 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[0].data.send_initial_metadata.count = 0;
- ops[0].data.send_initial_metadata.metadata = NULL;
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops[0].data.recv_close_on_server.cancelled =
+ (&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
- ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[1].data.recv_close_on_server.cancelled =
- (&ctx->recv_close_on_server_cancelled);
- ops[1].flags = 0;
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+}
+
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_call_send_initial_metadata(grpc_call *call,
+ grpcsharp_batch_context *ctx,
+ grpc_metadata_array *initial_metadata) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
+ initial_metadata);
+ ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
+ ops[0].data.send_initial_metadata.metadata =
+ ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@@ -859,6 +884,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) {
typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success);
+/* Version info */
+GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_version_string() {
+ return grpc_version_string();
+}
+
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(test_callback_funcptr callback) {
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 236b36616c..221d69e246 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -69,9 +69,6 @@ function zeroBuffer(size) {
function emptyUnary(client, done) {
var call = client.emptyCall({}, function(err, resp) {
assert.ifError(err);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -96,9 +93,6 @@ function largeUnary(client, done) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -115,9 +109,6 @@ function clientStreaming(client, done) {
var call = client.streamingInputCall(function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.aggregated_payload_size, 74922);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -308,9 +299,6 @@ function authTest(expected_user, scope, client, done) {
assert.strictEqual(resp.payload.body.length, 314159);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -344,9 +332,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
assert.ifError(err);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -358,7 +343,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
client.updateMetadata = updateMetadata;
makeTestCall(null, {});
}
-
});
});
}
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 5f7d74bca8..0f4c811ce4 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
+
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
- // |startWithWriteable:| and |finishWithError:|.
- GRPCCall *_self;
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
+ // reference to the call object if all they're interested in is the handler being executed when
+ // the response arrives.
+ GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
- _self = nil;
+ _retainSelf = nil;
// If there were still request messages coming, stop them.
- _requestWriter.state = GRXWriterStateFinished;
- _requestWriter = nil;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStateFinished;
+ }
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ @synchronized(strongSelf->_requestWriter) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
- initWithMessage:message
- handler:resumingHandler]] errorHandler:errorHandler];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- _requestWriter.state = GRXWriterStatePaused;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStatePaused;
+ }
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
- [_requestWriter startWithWriteable:self];
+ @synchronized(_requestWriter) {
+ [_requestWriter startWithWriteable:self];
+ }
}
#pragma mark GRXWriter implementation
@@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
- _self = self;
+ _retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];
diff --git a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
index 9b4b6768f8..0a54804bb2 100644
--- a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
@@ -38,15 +38,18 @@
// Returns NULL if the file at path couldn't be read. In that case, if errorPtr isn't NULL,
// *errorPtr will be an object describing what went wrong.
static grpc_credentials *CertificatesAtPath(NSString *path, NSError **errorPtr) {
- NSString *certsContent = [NSString stringWithContentsOfFile:path
- encoding:NSASCIIStringEncoding
+ // Files in PEM format can have non-ASCII characters in their comments (e.g. for the name of the
+ // issuer). Load them as UTF8 and produce an ASCII equivalent.
+ NSString *contentInUTF8 = [NSString stringWithContentsOfFile:path
+ encoding:NSUTF8StringEncoding
error:errorPtr];
- if (!certsContent) {
+ NSData *contentInASCII = [contentInUTF8 dataUsingEncoding:NSASCIIStringEncoding
+ allowLossyConversion:YES];
+ if (!contentInASCII.bytes) {
// Passing NULL to grpc_ssl_credentials_create produces behavior we don't want, so return.
return NULL;
}
- const char * asCString = [certsContent cStringUsingEncoding:NSASCIIStringEncoding];
- return grpc_ssl_credentials_create(asCString, NULL);
+ return grpc_ssl_credentials_create(contentInASCII.bytes, NULL);
}
@implementation GRPCSecureChannel
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index b6296e1ed7..ca94ce275f 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,13 +36,11 @@
#import "GRXWriteable.h"
#import "GRXWriter.h"
-// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
-// to -startWithWriteable:).
+// A buffered pipe is a Writer that also acts as a Writeable.
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
-// started, it will buffer them and propagate them in order as soon as its state becomes
-// GRXWriterStateStarted.
+// started, it will buffer them and propagate them in order as soon as its state becomes Started.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
@@ -51,6 +49,9 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
+//
+// Thread-safety:
+// The methods of an object of this class should not be called concurrently from different threads.
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index d004333d2b..f310832284 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -33,11 +33,17 @@
#import "GRXWriter.h"
-// A "proxy" class that simply forwards values, completion, and errors from its
-// input writer to its writeable.
+// A "proxy" class that simply forwards values, completion, and errors from its input writer to its
+// writeable.
// It is useful as a superclass for pipes that act as a transformation of their
// input writer, and for classes that represent objects with input and
// output sequences of values, like an RPC.
+//
+// Thread-safety:
+// All messages sent to this object need to be serialized. When it is started, the writer it wraps
+// is started in the same thread. Manual state changes are propagated to the wrapped writer in the
+// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
+// serialized with any message sent to this object.
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 2342f51ab3..a72be9ace2 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -48,7 +48,11 @@
// Designated initializer
- (instancetype)initWithWriter:(GRXWriter *)writer {
if (!writer) {
- [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
+ return nil;
+ }
+ if (writer.state != GRXWriterStateNotStarted) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"The writer argument must not have already started."];
}
if ((self = [super init])) {
_writer = writer;
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h
index b171f0c760..3fcc259434 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h
@@ -36,10 +36,17 @@
#import "GRXWriter.h"
// Utility to construct GRXWriter instances from values that are immediately available when
-// required. The returned writers all support pausing and early termination.
+// required.
//
-// Unless the writeable callback pauses them or stops them early, these writers will do all their
-// interactions with the writeable before the start method returns.
+// Thread-safety:
+//
+// An object of this class shouldn't be messaged concurrently by more than one thread. It will start
+// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the
+// only place where the writer can be paused or stopped prematurely.
+//
+// If a paused writer of this class is resumed, it will start messaging the writeable, in the same
+// thread, before |setState:| returns. Because the object can't be legally accessed concurrently,
+// that's the only place where it can be paused again (or stopped).
@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d6e1a472a..b1c994aa38 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -35,84 +35,73 @@
#import "GRXWriteable.h"
+// States of a writer.
typedef NS_ENUM(NSInteger, GRXWriterState) {
- // The writer has not yet been given a writeable to which it can push its
- // values. To have an writer transition to the Started state, send it a
- // startWithWriteable: message.
+ // The writer has not yet been given a writeable to which it can push its values. To have a writer
+ // transition to the Started state, send it a startWithWriteable: message.
//
- // An writer's state cannot be manually set to this value.
+ // A writer's state cannot be manually set to this value.
GRXWriterStateNotStarted,
// The writer might push values to the writeable at any moment.
GRXWriterStateStarted,
- // The writer is temporarily paused, and won't send any more values to the
- // writeable unless its state is set back to Started. The writer might still
- // transition to the Finished state at any moment, and is allowed to send
- // writesFinishedWithError: to its writeable.
- //
- // Not all implementations of writer have to support pausing, and thus
- // trying to set an writer's state to this value might have no effect.
+ // The writer is temporarily paused, and won't send any more values to the writeable unless its
+ // state is set back to Started. The writer might still transition to the Finished state at any
+ // moment, and is allowed to send writesFinishedWithError: to its writeable.
GRXWriterStatePaused,
// The writer has released its writeable and won't interact with it anymore.
//
- // One seldomly wants to set an writer's state to this value, as its
- // writeable isn't notified with a writesFinishedWithError: message. Instead, sending
- // finishWithError: to the writer will make it notify the writeable and then
- // transition to this state.
+ // One seldomly wants to set a writer's state to this value, as its writeable isn't notified with
+ // a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make
+ // it notify the writeable and then transition to this state.
GRXWriterStateFinished
};
-// An object that conforms to this protocol can produce, on demand, a sequence
-// of values. The sequence may be produced asynchronously, and it may consist of
-// any number of elements, including none or an infinite number.
+// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced
+// asynchronously, and it may consist of any number of elements, including none or an infinite
+// number.
+//
+// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the
+// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it,
+// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and
+// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to
+// represent a sequence of future values, as well as collections with internal iteration.
//
-// GRXWriter is the active dual of NSEnumerator. The difference between them
-// is thus whether the object plays an active or passive role during usage: A
-// user of NSEnumerator pulls values off it, and passes the values to a writeable.
-// A user of GRXWriter, though, just gives it a writeable, and the
-// GRXWriter instance pushes values to the writeable. This makes this protocol
-// suitable to represent a sequence of future values, as well as collections
-// with internal iteration.
+// An instance of GRXWriter can start producing values after a writeable is passed to it. It can
+// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be
+// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination.
//
-// An instance of GRXWriter can start producing values after a writeable is
-// passed to it. It can also be commanded to finish the sequence immediately
-// (with an optional error). Finally, it can be asked to pause, but the
-// conforming instance is not required to oblige.
+// Thread-safety:
//
-// Unless otherwise indicated by a conforming class, no messages should be sent
-// concurrently to a GRXWriter. I.e., conforming classes aren't required to
-// be thread-safe.
+// State transitions take immediate effect if the object is used from a single thread. Subclasses
+// might offer stronger guarantees.
+//
+// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a
+// GRXWriter. I.e., conforming classes aren't required to be thread-safe.
@interface GRXWriter : NSObject
-// This property can be used to query the current state of the writer, which
-// determines how it might currently use its writeable. Some state transitions can
-// be triggered by setting this property to the corresponding value, and that's
-// useful for advanced use cases like pausing an writer. For more details,
-// see the documentation of the enum.
+// This property can be used to query the current state of the writer, which determines how it might
+// currently use its writeable. Some state transitions can be triggered by setting this property to
+// the corresponding value, and that's useful for advanced use cases like pausing an writer. For
+// more details, see the documentation of the enum further down.
@property(nonatomic) GRXWriterState state;
-// Start sending messages to the writeable. Messages may be sent before the method
-// returns, or they may be sent later in the future. See GRXWriteable.h for the
-// different messages a writeable can receive.
+// Transition to the Started state, and start sending messages to the writeable (a reference to it
+// is retained). Messages to the writeable may be sent before the method returns, or they may be
+// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive.
//
-// If this writer draws its values from an external source (e.g. from the
-// filesystem or from a server), calling this method will commonly trigger side
-// effects (like network connections).
+// If this writer draws its values from an external source (e.g. from the filesystem or from a
+// server), calling this method will commonly trigger side effects (like network connections).
//
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
-// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
-// any more messages to it.
-//
-// This method might only be called on writers in the Started or Paused
-// state.
+// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and
+// transition to the Finished state.
//
-// TODO(jcanizales): Consider adding some guarantee about the immediacy of that
-// stopping. I know I've relied on it in part of the code that uses this, but
-// can't remember the details in the presence of concurrency.
+// This method might only be called on writers in the Started or Paused state.
- (void)finishWithError:(NSError *)errorOrNil;
@end
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index e85dd6e65c..f23102988b 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -114,7 +114,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testSimpleProtoRPC {
@@ -146,7 +146,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testMetadata {
diff --git a/src/objective-c/tests/InteropTests.h b/src/objective-c/tests/InteropTests.h
index 4eb97e9e06..1045c3d124 100644
--- a/src/objective-c/tests/InteropTests.h
+++ b/src/objective-c/tests/InteropTests.h
@@ -37,8 +37,7 @@
// https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
@interface InteropTests : XCTestCase
-// Returns @"localhost:5050".
+// Returns @"grpc-test.sandbox.google.com".
// Override in a subclass to perform the same tests against a different address.
-// For interop tests, use @"grpc-test.sandbox.google.com".
+ (NSString *)host;
@end
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index b61d567464..1b63fe2059 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -78,20 +78,17 @@
#pragma mark Tests
-static NSString * const kLocalCleartextHost = @"localhost:5050";
+static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.google.com";
@implementation InteropTests {
RMTTestService *_service;
}
+ (NSString *)host {
- return kLocalCleartextHost;
+ return kRemoteSSLHost;
}
- (void)setUp {
- // Register test server as non-SSL.
- [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
-
_service = [[RMTTestService alloc] initWithHost:self.class.host];
}
@@ -131,7 +128,7 @@ static NSString * const kLocalCleartextHost = @"localhost:5050";
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:8 handler:nil];
+ [self waitForExpectationsWithTimeout:16 handler:nil];
}
- (void)testClientStreamingRPC {
diff --git a/src/objective-c/tests/InteropTestsLocalCleartext.m b/src/objective-c/tests/InteropTestsLocalCleartext.m
new file mode 100644
index 0000000000..2d7d3c4b2c
--- /dev/null
+++ b/src/objective-c/tests/InteropTestsLocalCleartext.m
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Repeat of the tests in InteropTests.m, but sending the RPCs to a local cleartext server instead
+// of the remote SSL one.
+
+#import <GRPCClient/GRPCCall+Tests.h>
+
+#import "InteropTests.h"
+
+static NSString * const kLocalCleartextHost = @"localhost:5050";
+
+@interface InteropTestsLocalCleartext : InteropTests
+@end
+
+@implementation InteropTestsLocalCleartext
+
++ (NSString *)host {
+ return kLocalCleartextHost;
+}
+
+- (void)setUp {
+ // Register test server as non-SSL.
+ [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
+
+ [super setUp];
+}
+
+@end
diff --git a/src/objective-c/tests/InteropTestsLocalSSL.m b/src/objective-c/tests/InteropTestsLocalSSL.m
index 227ca79659..f69f806dcf 100644
--- a/src/objective-c/tests/InteropTestsLocalSSL.m
+++ b/src/objective-c/tests/InteropTestsLocalSSL.m
@@ -31,8 +31,8 @@
*
*/
-// Repeat of the tests in InteropTests.m, but using SSL to communicate with the local server instead
-// of cleartext.
+// Repeat of the tests in InteropTests.m, but sending the RPCs to a local SSL server instead of the
+// remote one.
#import <GRPCClient/GRPCCall+Tests.h>
diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
index af98aba9c0..3a1c3d940a 100644
--- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
+++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
@@ -13,6 +13,7 @@
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 63423F501B151B77006CF63C /* RxLibraryUnitTests.m */; };
635697CD1B14FC11007A7283 /* Tests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635697CC1B14FC11007A7283 /* Tests.m */; };
635ED2EC1B1A3BC400FDE5C3 /* InteropTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */; };
+ 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */ = {isa = PBXBuildFile; fileRef = 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */; };
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */ = {isa = PBXBuildFile; fileRef = 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */; };
63E240D01B6C63DC005F3B0E /* TestCertificates.bundle in Resources */ = {isa = PBXBuildFile; fileRef = 63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */; };
7D8A186224D39101F90230F6 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 35F2B6BF3BAE8F0DC4AFD76E /* libPods.a */; };
@@ -51,6 +52,7 @@
635697CC1B14FC11007A7283 /* Tests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = Tests.m; sourceTree = "<group>"; };
635697D81B14FC11007A7283 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTests.m; sourceTree = "<group>"; };
+ 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalCleartext.m; sourceTree = "<group>"; };
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = InteropTests.h; sourceTree = "<group>"; };
63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalSSL.m; sourceTree = "<group>"; };
63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.plug-in"; path = TestCertificates.bundle; sourceTree = "<group>"; };
@@ -117,14 +119,15 @@
635697C91B14FC11007A7283 /* Tests */ = {
isa = PBXGroup;
children = (
- 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */,
- 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
+ 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */,
+ 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
+ 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */,
63423F501B151B77006CF63C /* RxLibraryUnitTests.m */,
+ 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
635697CC1B14FC11007A7283 /* Tests.m */,
635697D71B14FC11007A7283 /* Supporting Files */,
- 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
);
name = Tests;
sourceTree = SOURCE_ROOT;
@@ -261,6 +264,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
+ 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */,
63175DFF1B1B9FAF00027841 /* LocalClearTextTests.m in Sources */,
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */,
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */,
diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c
index 1f3aedd9d8..9b93b051f6 100644
--- a/src/python/grpcio/grpc/_adapter/_c/module.c
+++ b/src/python/grpcio/grpc/_adapter/_c/module.c
@@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) {
return;
}
+ if (PyModule_AddStringConstant(
+ module, "PRIMARY_USER_AGENT_KEY",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) {
+ return;
+ }
+
/* GRPC maintains an internal counter of how many times it has been
initialized and handles multiple pairs of grpc_init()/grpc_shutdown()
invocations accordingly. */
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index dcf67dbc11..239aac81b2 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -27,9 +27,12 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from grpc import _grpcio_metadata
from grpc._adapter import _c
from grpc._adapter import _types
+_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
+
ClientCredentials = _c.ClientCredentials
ServerCredentials = _c.ServerCredentials
@@ -76,6 +79,7 @@ class Call(_types.Call):
class Channel(_types.Channel):
def __init__(self, target, args, creds=None):
+ args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)]
if creds is None:
self.channel = _c.Channel(target, args)
else:
diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
index ed8f7ef009..b6d06b300d 100644
--- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py
+++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
@@ -59,3 +59,6 @@ class InteropTestCase(object):
def testCancelAfterFirstResponse(self):
methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None)
+
+ def testTimeoutOnSleepingServer(self):
+ methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None)
diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py
index f4c94685ee..7a831f3cbd 100644
--- a/src/python/grpcio_test/grpc_interop/methods.py
+++ b/src/python/grpcio_test/grpc_interop/methods.py
@@ -33,10 +33,12 @@ import enum
import json
import os
import threading
+import time
from oauth2client import client as oauth2client_client
from grpc.framework.alpha import utilities
+from grpc.framework.alpha import exceptions
from grpc_interop import empty_pb2
from grpc_interop import messages_pb2
@@ -318,6 +320,24 @@ def _cancel_after_first_response(stub):
raise ValueError('expected call to be cancelled')
+def _timeout_on_sleeping_server(stub):
+ request_payload_size = 27182
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, 0.001)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
+ pipe.add(request)
+ time.sleep(0.1)
+ try:
+ next(response_iterator)
+ except exceptions.ExpirationError:
+ pass
+ else:
+ raise ValueError('expected call to exceed deadline')
+
+
def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True)
if args.default_service_account != response.username:
@@ -351,6 +371,7 @@ class TestCase(enum.Enum):
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
SERVICE_ACCOUNT_CREDS = 'service_account_creds'
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
def test_interoperability(self, stub, args):
if self is TestCase.EMPTY_UNARY:
@@ -367,6 +388,8 @@ class TestCase(enum.Enum):
_cancel_after_begin(stub)
elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
_cancel_after_first_response(stub)
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
+ _timeout_on_sleeping_server(stub)
elif self is TestCase.COMPUTE_ENGINE_CREDS:
_compute_engine_creds(stub, args)
elif self is TestCase.SERVICE_ACCOUNT_CREDS:
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
new file mode 100644
index 0000000000..b200d129a9
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
@@ -0,0 +1,541 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import contextlib
+import distutils.spawn
+import errno
+import itertools
+import os
+import pkg_resources
+import shutil
+import subprocess
+import sys
+import tempfile
+import threading
+import time
+import unittest
+
+from grpc.framework.alpha import exceptions
+from grpc.framework.foundation import future
+
+# Identifiers of entities we expect to find in the generated module.
+SERVICER_IDENTIFIER = 'EarlyAdopterTestServiceServicer'
+SERVER_IDENTIFIER = 'EarlyAdopterTestServiceServer'
+STUB_IDENTIFIER = 'EarlyAdopterTestServiceStub'
+SERVER_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_server'
+STUB_FACTORY_IDENTIFIER = 'early_adopter_create_TestService_stub'
+
+# The timeout used in tests of RPCs that are supposed to expire.
+SHORT_TIMEOUT = 2
+# The timeout used in tests of RPCs that are not supposed to expire. The
+# absurdly large value doesn't matter since no passing execution of this test
+# module will ever wait the duration.
+LONG_TIMEOUT = 600
+NO_DELAY = 0
+
+
+class _ServicerMethods(object):
+
+ def __init__(self, test_pb2, delay):
+ self._condition = threading.Condition()
+ self._delay = delay
+ self._paused = False
+ self._fail = False
+ self._test_pb2 = test_pb2
+
+ @contextlib.contextmanager
+ def pause(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ @contextlib.contextmanager
+ def fail(self): # pylint: disable=invalid-name
+ with self._condition:
+ self._fail = True
+ yield
+ with self._condition:
+ self._fail = False
+
+ def _control(self): # pylint: disable=invalid-name
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+ while self._paused:
+ self._condition.wait()
+ time.sleep(self._delay)
+
+ def UnaryCall(self, request, unused_rpc_context):
+ response = self._test_pb2.SimpleResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * request.response_size
+ self._control()
+ return response
+
+ def StreamingOutputCall(self, request, unused_rpc_context):
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def StreamingInputCall(self, request_iter, unused_rpc_context):
+ response = self._test_pb2.StreamingInputCallResponse()
+ aggregated_payload_size = 0
+ for request in request_iter:
+ aggregated_payload_size += len(request.payload.payload_compressable)
+ response.aggregated_payload_size = aggregated_payload_size
+ self._control()
+ return response
+
+ def FullDuplexCall(self, request_iter, unused_rpc_context):
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ yield response
+
+ def HalfDuplexCall(self, request_iter, unused_rpc_context):
+ responses = []
+ for request in request_iter:
+ for parameter in request.response_parameters:
+ response = self._test_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response.payload.payload_compressable = 'a' * parameter.size
+ self._control()
+ responses.append(response)
+ for response in responses:
+ yield response
+
+
+@contextlib.contextmanager
+def _CreateService(test_pb2, delay):
+ """Provides a servicer backend and a stub.
+
+ The servicer is just the implementation
+ of the actual servicer passed to the face player of the python RPC
+ implementation; the two are detached.
+
+ Non-zero delay puts a delay on each call to the servicer, representative of
+ communication latency. Timeout is the default timeout for the stub while
+ waiting for the service.
+
+ Args:
+ test_pb2: The test_pb2 module generated by this test.
+ delay: Delay in seconds per response from the servicer.
+
+ Yields:
+ A (servicer_methods, servicer, stub) three-tuple where servicer_methods is
+ the back-end of the service bound to the stub and the server and stub
+ are both activated and ready for use.
+ """
+ servicer_methods = _ServicerMethods(test_pb2, delay)
+
+ class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+
+ def UnaryCall(self, request, context):
+ return servicer_methods.UnaryCall(request, context)
+
+ def StreamingOutputCall(self, request, context):
+ return servicer_methods.StreamingOutputCall(request, context)
+
+ def StreamingInputCall(self, request_iter, context):
+ return servicer_methods.StreamingInputCall(request_iter, context)
+
+ def FullDuplexCall(self, request_iter, context):
+ return servicer_methods.FullDuplexCall(request_iter, context)
+
+ def HalfDuplexCall(self, request_iter, context):
+ return servicer_methods.HalfDuplexCall(request_iter, context)
+
+ servicer = Servicer()
+ server = getattr(
+ test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer, 0)
+ with server:
+ port = server.port()
+ stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)('localhost', port)
+ with stub:
+ yield servicer_methods, stub, server
+
+
+def _streaming_input_request_iterator(test_pb2):
+ for _ in range(3):
+ request = test_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = test_pb2.COMPRESSABLE
+ request.payload.payload_compressable = 'a'
+ yield request
+
+
+def _streaming_output_request(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ sizes = [1, 2, 3]
+ request.response_parameters.add(size=sizes[0], interval_us=0)
+ request.response_parameters.add(size=sizes[1], interval_us=0)
+ request.response_parameters.add(size=sizes[2], interval_us=0)
+ return request
+
+
+def _full_duplex_request_iterator(test_pb2):
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+
+
+class PythonPluginTest(unittest.TestCase):
+ """Test case for the gRPC Python protoc-plugin.
+
+ While reading these tests, remember that the futures API
+ (`stub.method.async()`) only gives futures for the *non-streaming* responses,
+ else it behaves like its blocking cousin.
+ """
+
+ def setUp(self):
+ # Assume that the appropriate protoc and grpc_python_plugins are on the
+ # path.
+ protoc_command = 'protoc'
+ protoc_plugin_filename = distutils.spawn.find_executable(
+ 'grpc_python_plugin')
+ test_proto_filename = pkg_resources.resource_filename(
+ 'grpc_protoc_plugin', 'test.proto')
+ if not os.path.isfile(protoc_command):
+ # Assume that if we haven't built protoc that it's on the system.
+ protoc_command = 'protoc'
+
+ # Ensure that the output directory exists.
+ self.outdir = tempfile.mkdtemp()
+
+ # Invoke protoc with the plugin.
+ cmd = [
+ protoc_command,
+ '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
+ '-I .',
+ '--python_out=%s' % self.outdir,
+ '--python-grpc_out=%s' % self.outdir,
+ os.path.basename(test_proto_filename),
+ ]
+ subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
+ cwd=os.path.dirname(test_proto_filename))
+ sys.path.append(self.outdir)
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.outdir)
+ except OSError as exc:
+ if exc.errno != errno.ENOENT:
+ raise
+
+ # TODO(atash): Figure out which of these tests is hanging flakily with small
+ # probability.
+
+ def testImportAttributes(self):
+ # check that we can access the generated module and its members.
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+
+ def testUpDown(self):
+ import test_pb2
+ with _CreateService(
+ test_pb2, NO_DELAY) as (servicer, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+
+ def testUnaryCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ request = test_pb2.SimpleRequest(response_size=13)
+ response = stub.UnaryCall(request, timeout)
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ # Check that the call does not block waiting for the server to respond.
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.UnaryCall(request, 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testUnaryCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ request = test_pb2.SimpleRequest(response_size=13)
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testUnaryCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.UnaryCall.async(request, 1)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+
+ def testUnaryCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.UnaryCall.async(request, LONG_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testStreamingOutputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, LONG_TIMEOUT)
+ expected_responses = methods.StreamingOutputCall(
+ request, 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingOutputCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ unused_methods, stub, unused_server):
+ responses = stub.StreamingOutputCall(request, SHORT_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this times out '
+ 'instead of raising the proper error.')
+ def testStreamingOutputCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request = _streaming_output_request(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.StreamingOutputCall(request, 1)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testStreamingInputCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ response = stub.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsync(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), LONG_TIMEOUT)
+ response = response_future.result()
+ expected_response = methods.StreamingInputCall(
+ _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ self.assertEqual(expected_response, response)
+
+ def testStreamingInputCallAsyncExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ response_future.result()
+ self.assertIsInstance(
+ response_future.exception(), exceptions.ExpirationError)
+
+ def testStreamingInputCallAsyncCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ timeout = 6 # TODO(issue 2039): LONG_TIMEOUT like the other methods.
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), timeout)
+ response_future.cancel()
+ self.assertTrue(response_future.cancelled())
+ with self.assertRaises(future.CancelledError):
+ response_future.result()
+
+ def testStreamingInputCallAsyncFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ response_future = stub.StreamingInputCall.async(
+ _streaming_input_request_iterator(test_pb2), SHORT_TIMEOUT)
+ self.assertIsNotNone(response_future.exception())
+
+ def testFullDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ responses = stub.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), LONG_TIMEOUT)
+ expected_responses = methods.FullDuplexCall(
+ _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ for expected_response, response in itertools.izip_longest(
+ expected_responses, responses):
+ self.assertEqual(expected_response, response)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallExpired(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.pause():
+ responses = stub.FullDuplexCall(request_iterator, SHORT_TIMEOUT)
+ with self.assertRaises(exceptions.ExpirationError):
+ list(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testFullDuplexCallCancelled(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ next(responses)
+ responses.cancel()
+ with self.assertRaises(future.CancelledError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this hangs forever '
+ 'and fix.')
+ def testFullDuplexCallFailed(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ with methods.fail():
+ responses = stub.FullDuplexCall(request_iterator, LONG_TIMEOUT)
+ self.assertIsNotNone(responses)
+ with self.assertRaises(exceptions.ServicerError):
+ next(responses)
+
+ @unittest.skip('TODO(atash,nathaniel): figure out why this flakily hangs '
+ 'forever and fix.')
+ def testHalfDuplexCall(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ with _CreateService(test_pb2, NO_DELAY) as (
+ methods, stub, unused_server):
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=2, interval_us=0)
+ request.response_parameters.add(size=3, interval_us=0)
+ yield request
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), LONG_TIMEOUT)
+ expected_responses = methods.HalfDuplexCall(
+ half_duplex_request_iterator(), 'not a real RpcContext!')
+ for check in itertools.izip_longest(expected_responses, responses):
+ expected_response, response = check
+ self.assertEqual(expected_response, response)
+
+ def testHalfDuplexCallWedged(self):
+ import test_pb2 # pylint: disable=g-import-not-at-top
+ condition = threading.Condition()
+ wait_cell = [False]
+ @contextlib.contextmanager
+ def wait(): # pylint: disable=invalid-name
+ # Where's Python 3's 'nonlocal' statement when you need it?
+ with condition:
+ wait_cell[0] = True
+ yield
+ with condition:
+ wait_cell[0] = False
+ condition.notify_all()
+ def half_duplex_request_iterator():
+ request = test_pb2.StreamingOutputCallRequest()
+ request.response_parameters.add(size=1, interval_us=0)
+ yield request
+ with condition:
+ while wait_cell[0]:
+ condition.wait()
+ with _CreateService(test_pb2, NO_DELAY) as (methods, stub, unused_server):
+ with wait():
+ responses = stub.HalfDuplexCall(
+ half_duplex_request_iterator(), SHORT_TIMEOUT)
+ # half-duplex waits for the client to send all info
+ with self.assertRaises(exceptions.ExpirationError):
+ next(responses)
+
+
+if __name__ == '__main__':
+ os.chdir(os.path.dirname(sys.argv[0]))
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
new file mode 100644
index 0000000000..ed7c6a7b79
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
@@ -0,0 +1,139 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+// This file is duplicated around the code base. See GitHub issue #526.
+syntax = "proto2";
+
+package grpc.testing;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 1;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 2;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 3;
+}
+
+message Payload {
+ required PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ optional int32 response_size = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message SimpleResponse {
+ optional Payload payload = 1;
+}
+
+message StreamingInputCallRequest {
+ // Optional input payload sent along with the request.
+ optional Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ optional int32 aggregated_payload_size = 1;
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ required int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ required int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ optional PayloadType response_type = 1 [default=COMPRESSABLE];
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message StreamingOutputCallResponse {
+ optional Payload payload = 1;
+}
+
+service TestService {
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+ // One request followed by a sequence of responses (streamed download).
+ // The server returns the payload with client desired type and sizes.
+ rpc StreamingOutputCall(StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by one response (streamed upload).
+ // The server returns the aggregated size of client payload as the result.
+ rpc StreamingInputCall(stream StreamingInputCallRequest)
+ returns (StreamingInputCallResponse);
+
+ // A sequence of requests with each request served by the server immediately.
+ // As one request could lead to multiple responses, this interface
+ // demonstrates the idea of full duplexing.
+ rpc FullDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by a sequence of responses.
+ // The server buffers all the client requests and then serves them in order. A
+ // stream of responses are returned to the client when the server starts with
+ // first request.
+ rpc HalfDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+}
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
index 9a8edfad0c..b6583662f3 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -31,11 +31,12 @@ import threading
import time
import unittest
+from grpc import _grpcio_metadata
from grpc._adapter import _types
from grpc._adapter import _low
-def WaitForEvents(completion_queues, deadline):
+def wait_for_events(completion_queues, deadline):
"""
Args:
completion_queues: list of completion queues to wait for events on
@@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline):
thread.join()
return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -123,16 +125,21 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEquals(1, len(request_event.results))
- got_initial_metadata = dict(request_event.results[0].initial_metadata)
+ received_initial_metadata = dict(request_event.results[0].initial_metadata)
+ # Check that our metadata were transmitted
self.assertEquals(
dict(client_initial_metadata),
- dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ # Check that Python's user agent string is a part of the full user agent
+ # string
+ self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
+ received_initial_metadata['user-agent'])
self.assertEquals(METHOD, request_event.call_details.method)
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
@@ -150,7 +157,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+ client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()
diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py
index 925c32720f..a6203cae2d 100644
--- a/src/python/grpcio_test/setup.py
+++ b/src/python/grpcio_test/setup.py
@@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = {
_PACKAGE_DATA = {
'grpc_interop': [
- 'credentials/ca.pem', 'credentials/server1.key',
- 'credentials/server1.pem',]
+ 'credentials/ca.pem',
+ 'credentials/server1.key',
+ 'credentials/server1.pem',
+ ],
+ 'grpc_protoc_plugin': [
+ 'test.proto',
+ ],
}
_SETUP_REQUIRES = (
@@ -75,5 +80,5 @@ setuptools.setup(
package_data=_PACKAGE_DATA,
install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES,
setup_requires=_SETUP_REQUIRES,
- cmdclass=_COMMAND_CLASS
+ cmdclass=_COMMAND_CLASS,
)
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index ec6cb912f1..70f0795f29 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -179,6 +179,19 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
return Qnil;
}
+/* Called to obtain the peer that this call is connected to. */
+static VALUE grpc_rb_call_get_peer(VALUE self) {
+ VALUE res = Qnil;
+ grpc_call *call = NULL;
+ char *peer = NULL;
+ TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ peer = grpc_call_get_peer(call);
+ res = rb_str_new2(peer);
+ gpr_free(peer);
+
+ return res;
+}
+
/*
call-seq:
status = call.status
@@ -720,6 +733,7 @@ void Init_grpc_call() {
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
+ rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index a80e484fe1..01762980fc 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -37,6 +37,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
@@ -250,6 +251,21 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil;
}
+
+/* Called to obtain the target that this channel accesses. */
+static VALUE grpc_rb_channel_get_target(VALUE self) {
+ grpc_rb_channel *wrapper = NULL;
+ VALUE res = Qnil;
+ char* target = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
+ target = grpc_channel_get_target(wrapper->wrapped);
+ res = rb_str_new2(target);
+ gpr_free(target);
+
+ return res;
+}
+
void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel =
@@ -266,6 +282,7 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 4);
+ rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index dd4e27df51..45f31329e9 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -16,12 +16,15 @@ Gem::Specification.new do |s|
s.required_ruby_version = '>= 2.0.0'
s.requirements << 'libgrpc ~> 0.10.0 needs to be installed'
- s.files = `git ls-files`.split("\n")
- s.test_files = `git ls-files -- spec/*`.split("\n")
- s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f|
- File.basename(f)
+ s.files = %w( Rakefile )
+ s.files += Dir.glob('lib/**/*')
+ s.files += Dir.glob('ext/**/*')
+ s.files += Dir.glob('bin/**/*')
+ s.test_files = Dir.glob('spec/**/*')
+ %w(math noproto).each do |b|
+ s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ]
end
- s.require_paths = ['lib']
+ s.require_paths = %w( bin lib )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 7b2c04aa22..745eab437e 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -28,6 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/generic/active_call'
+require 'grpc/version'
# GRPC contains the General RPC module.
module GRPC
@@ -36,8 +37,8 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
- # Default timeout is 5 seconds.
- DEFAULT_TIMEOUT = 5
+ # Default timeout is infinity.
+ DEFAULT_TIMEOUT = INFINITE_FUTURE
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
@@ -46,6 +47,7 @@ module GRPC
fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
return alt_chan
end
+ kw['grpc.primary_user_agent'] = "grpc-ruby/#{VERSION}"
return Core::Channel.new(host, kw) if creds.nil?
fail(TypeError, '!Credentials') unless creds.is_a?(Core::Credentials)
Core::Channel.new(host, kw, creds)
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 0e85441209..ed8032517b 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -69,6 +69,23 @@ shared_examples 'basic GRPC message delivery is OK' do
include GRPC::Core
include_context 'setup: tags'
+ context 'the test channel' do
+ it 'should have a target' do
+ expect(@ch.target).to be_a(String)
+ end
+ end
+
+ context 'a client call' do
+ it 'should have a peer' do
+ expect(new_client_call.peer).to be_a(String)
+ end
+ end
+
+ it 'calls have peer info' do
+ call = new_client_call
+ expect(call.peer).to be_a(String)
+ end
+
it 'servers receive requests from clients and can respond' do
call = new_client_call
server_call = nil