aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD2
-rw-r--r--INSTALL1
-rw-r--r--Makefile4
-rw-r--r--build.json3
-rw-r--r--doc/health-checking.md70
-rw-r--r--include/grpc++/auth_context.h32
-rw-r--r--include/grpc/grpc.h39
-rw-r--r--src/core/channel/client_channel.c1
-rw-r--r--src/core/security/google_default_credentials.c6
-rw-r--r--src/core/surface/call.c14
-rw-r--r--src/core/surface/secure_channel_create.c4
-rw-r--r--src/core/surface/version.c2
-rw-r--r--src/core/tsi/transport_security_interface.h2
-rw-r--r--src/cpp/client/secure_credentials.cc9
-rw-r--r--src/cpp/common/auth_property_iterator.cc2
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs296
-rw-r--r--src/csharp/Grpc.Core.Tests/CompressionTest.cs128
-rw-r--r--src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs122
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/MockServiceHelper.cs248
-rw-r--r--src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs136
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs152
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs37
-rw-r--r--src/csharp/Grpc.Core/CompressionLevel.cs63
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs139
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj3
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs12
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs2
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs64
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs31
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs10
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs56
-rw-r--r--src/csharp/Grpc.Core/Version.cs2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/csharp/Grpc.Core/WriteOptions.cs82
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs24
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs22
-rw-r--r--src/csharp/build_packages.bat4
-rw-r--r--src/csharp/doc/README.md2
-rw-r--r--src/csharp/doc/grpc_csharp_public.shfbproj70
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c76
-rw-r--r--src/node/interop/interop_client.js16
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m41
-rw-r--r--src/objective-c/GRPCClient/private/GRPCSecureChannel.m13
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h9
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h10
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m6
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.h13
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h91
-rw-r--r--src/objective-c/tests/GRPCClientTests.m4
-rw-r--r--src/objective-c/tests/InteropTests.h3
-rw-r--r--src/objective-c/tests/InteropTests.m9
-rw-r--r--src/objective-c/tests/InteropTestsLocalCleartext.m (renamed from include/grpc++/auth_property_iterator.h)52
-rw-r--r--src/objective-c/tests/InteropTestsLocalSSL.m4
-rw-r--r--src/objective-c/tests/Tests.xcodeproj/project.pbxproj10
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/module.c6
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py4
-rw-r--r--src/python/grpcio_test/grpc_interop/_interop_test_case.py3
-rw-r--r--src/python/grpcio_test/grpc_interop/methods.py23
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py (renamed from test/compiler/python_plugin_test.py)20
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/test.proto (renamed from test/compiler/test.proto)0
-rw-r--r--src/python/grpcio_test/grpc_test/_adapter/_low_test.py17
-rw-r--r--src/python/grpcio_test/setup.py11
-rw-r--r--src/ruby/ext/grpc/rb_call.c14
-rw-r--r--src/ruby/ext/grpc/rb_channel.c17
-rwxr-xr-xsrc/ruby/grpc.gemspec13
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb6
-rw-r--r--src/ruby/spec/client_server_spec.rb17
-rw-r--r--tools/doxygen/Doxyfile.c++3
-rw-r--r--tools/doxygen/Doxyfile.c++.internal3
-rw-r--r--tools/doxygen/Doxyfile.core2
-rw-r--r--tools/doxygen/Doxyfile.core.internal2
-rwxr-xr-xtools/run_tests/run_interops.py18
-rwxr-xr-xtools/run_tests/run_interops_build.sh36
-rwxr-xr-xtools/run_tests/run_interops_test.sh11
-rwxr-xr-xtools/run_tests/run_python.sh1
-rw-r--r--tools/run_tests/sources_and_headers.json4
-rw-r--r--vsprojects/grpc++/grpc++.vcxproj1
-rw-r--r--vsprojects/grpc++/grpc++.vcxproj.filters3
-rw-r--r--vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj1
-rw-r--r--vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters3
91 files changed, 2045 insertions, 639 deletions
diff --git a/BUILD b/BUILD
index dcabd648e4..9eaabbbccd 100644
--- a/BUILD
+++ b/BUILD
@@ -690,7 +690,6 @@ cc_library(
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@@ -778,7 +777,6 @@ cc_library(
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
diff --git a/INSTALL b/INSTALL
index 8a0a98ad2e..808166dfed 100644
--- a/INSTALL
+++ b/INSTALL
@@ -132,6 +132,7 @@ We will also need to make openssl and install it appropriately
$ cd <git directory>
$ cd third_party/openssl
+ $ ./config
$ sudo make install
$ cd ../../
diff --git a/Makefile b/Makefile
index 181194f78f..46e23db5ea 100644
--- a/Makefile
+++ b/Makefile
@@ -313,7 +313,7 @@ E = @echo
Q = @
endif
-VERSION = 0.10.0.0
+VERSION = 0.10.1.0
CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES))
CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS)
@@ -4484,7 +4484,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/async_generic_service.h \
include/grpc++/async_unary_call.h \
include/grpc++/auth_context.h \
- include/grpc++/auth_property_iterator.h \
include/grpc++/byte_buffer.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
@@ -4728,7 +4727,6 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/async_generic_service.h \
include/grpc++/async_unary_call.h \
include/grpc++/auth_context.h \
- include/grpc++/auth_property_iterator.h \
include/grpc++/byte_buffer.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
diff --git a/build.json b/build.json
index 515cecdc5a..896cbee8c8 100644
--- a/build.json
+++ b/build.json
@@ -7,7 +7,7 @@
"version": {
"major": 0,
"minor": 10,
- "micro": 0,
+ "micro": 1,
"build": 0
}
},
@@ -33,7 +33,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
diff --git a/doc/health-checking.md b/doc/health-checking.md
new file mode 100644
index 0000000000..0b3f9c6a03
--- /dev/null
+++ b/doc/health-checking.md
@@ -0,0 +1,70 @@
+GRPC Health Checking Protocol
+================================
+
+Health checks are used to probe whether the server is able to handle rpcs. The
+client-to-server health checking can happen from point to point or via some
+control system. A server may choose to reply “unhealthy” because it
+is not ready to take requests, it is shutting down or some other reason.
+The client can act accordingly if the response is not received within some time
+window or the response says unhealthy in it.
+
+
+A GRPC service is used as the health checking mechanism for both simple
+client-to-server scenario and other control systems such as load-balancing.
+Being a high
+level service provides some benefits. Firstly, since it is a GRPC service
+itself, doing a health check is in the same format as a normal rpc. Secondly,
+it has rich semantics such as per-service health status. Thirdly, as a GRPC
+service, it is able reuse all the existing billing, quota infrastructure, etc,
+and thus the server has full control over the access of the health checking
+service.
+
+## Service Definition
+
+The server should export a service defined in the following proto:
+
+```
+syntax = "proto3";
+
+package grpc.health.v1alpha;
+
+message HealthCheckRequest {
+ string service = 1;
+}
+
+message HealthCheckResponse {
+ enum ServingStatus {
+ UNKNOWN = 0;
+ SERVING = 1;
+ NOT_SERVING = 2;
+ }
+ ServingStatus status = 1;
+}
+
+service Health {
+ rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+}
+```
+
+A client can query the server’s health status by calling the `Check` method, and
+a deadline should be set on the rpc. The client can optionally set the service
+name it wants to query for health status. The suggested format of service name
+is `package_names.ServiceName`, such as `grpc.health.v1alpha.Health`.
+
+The server should register all the services manually and set
+the individual status, including an empty service name and its status. For each
+request received, if the service name can be found in the registry,
+a response must be sent back with an `OK` status and the status field should be
+set to `SERVING` or `NOT_SERVING` accordingly. If the service name is not
+registered, the server returns a `NOT_FOUND` GRPC status.
+
+The server should use an empty string as the key for server’s
+overall health status, so that a client not interested in a specific service can
+query the server's status with an empty request. The server can just do exact
+matching of the service name without support of any kind of wildcard matching.
+However, the service owner has the freedom to implement more complicated
+matching semantics that both the client and server agree upon.
+
+A client can declare the server as unhealthy if the rpc is not finished after
+some amount of time. The client should be able to handle the case where server
+does not have the Health service.
diff --git a/include/grpc++/auth_context.h b/include/grpc++/auth_context.h
index c42105b927..f8ea8ad6f4 100644
--- a/include/grpc++/auth_context.h
+++ b/include/grpc++/auth_context.h
@@ -34,12 +34,42 @@
#ifndef GRPCXX_AUTH_CONTEXT_H
#define GRPCXX_AUTH_CONTEXT_H
+#include <iterator>
#include <vector>
-#include <grpc++/auth_property_iterator.h>
#include <grpc++/config.h>
+struct grpc_auth_context;
+struct grpc_auth_property;
+struct grpc_auth_property_iterator;
+
namespace grpc {
+class SecureAuthContext;
+
+typedef std::pair<grpc::string, grpc::string> AuthProperty;
+
+class AuthPropertyIterator
+ : public std::iterator<std::input_iterator_tag, const AuthProperty> {
+ public:
+ ~AuthPropertyIterator();
+ AuthPropertyIterator& operator++();
+ AuthPropertyIterator operator++(int);
+ bool operator==(const AuthPropertyIterator& rhs) const;
+ bool operator!=(const AuthPropertyIterator& rhs) const;
+ const AuthProperty operator*();
+
+ protected:
+ AuthPropertyIterator();
+ AuthPropertyIterator(const grpc_auth_property* property,
+ const grpc_auth_property_iterator* iter);
+ private:
+ friend class SecureAuthContext;
+ const grpc_auth_property* property_;
+ // The following items form a grpc_auth_property_iterator.
+ const grpc_auth_context* ctx_;
+ size_t index_;
+ const char* name_;
+};
class AuthContext {
public:
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 2471a0937e..f104648011 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -181,7 +181,9 @@ typedef enum grpc_call_error {
GRPC_CALL_ERROR_INVALID_MESSAGE,
/** completion queue for notification has not been registered with the
server */
- GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE
+ GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE,
+ /** this batch of operations leads to more operations than allowed */
+ GRPC_CALL_ERROR_BATCH_TOO_BIG
} grpc_call_error;
/* Write Flags: */
@@ -258,31 +260,44 @@ void grpc_call_details_destroy(grpc_call_details *details);
typedef enum {
/** Send initial metadata: one and only one instance MUST be sent for each
- call, unless the call was cancelled - in which case this can be skipped */
+ call, unless the call was cancelled - in which case this can be skipped.
+ This op completes after all bytes of metadata have been accepted by
+ outgoing flow control. */
GRPC_OP_SEND_INITIAL_METADATA = 0,
- /** Send a message: 0 or more of these operations can occur for each call */
+ /** Send a message: 0 or more of these operations can occur for each call.
+ This op completes after all bytes for the message have been accepted by
+ outgoing flow control. */
GRPC_OP_SEND_MESSAGE,
/** Send a close from the client: one and only one instance MUST be sent from
the client, unless the call was cancelled - in which case this can be
- skipped */
+ skipped.
+ This op completes after all bytes for the call (including the close)
+ have passed outgoing flow control. */
GRPC_OP_SEND_CLOSE_FROM_CLIENT,
/** Send status from the server: one and only one instance MUST be sent from
the server unless the call was cancelled - in which case this can be
- skipped */
+ skipped.
+ This op completes after all bytes for the call (including the status)
+ have passed outgoing flow control. */
GRPC_OP_SEND_STATUS_FROM_SERVER,
/** Receive initial metadata: one and only one MUST be made on the client,
- must not be made on the server */
+ must not be made on the server.
+ This op completes after all initial metadata has been read from the
+ peer. */
GRPC_OP_RECV_INITIAL_METADATA,
- /** Receive a message: 0 or more of these operations can occur for each call
- */
+ /** Receive a message: 0 or more of these operations can occur for each call.
+ This op completes after all bytes of the received message have been
+ read, or after a half-close has been received on this call. */
GRPC_OP_RECV_MESSAGE,
/** Receive status on the client: one and only one must be made on the client.
- This operation always succeeds, meaning ops paired with this operation
- will also appear to succeed, even though they may not have. In that case
- the status will indicate some failure. */
+ This operation always succeeds, meaning ops paired with this operation
+ will also appear to succeed, even though they may not have. In that case
+ the status will indicate some failure.
+ This op completes after all activity on the call has completed. */
GRPC_OP_RECV_STATUS_ON_CLIENT,
/** Receive close on the server: one and only one must be made on the
- server */
+ server.
+ This op completes after the close has been received by the server. */
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a293c93ec6..6c2e6b38a8 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -527,6 +527,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (old_lb_policy != NULL) {
+ grpc_lb_policy_shutdown(old_lb_policy);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index f368819597..d1f228665f 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -84,6 +84,8 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
static int is_stack_running_on_compute_engine(void) {
compute_engine_detector detector;
grpc_httpcli_request request;
@@ -114,12 +116,12 @@ static int is_stack_running_on_compute_engine(void) {
while (!detector.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&detector.pollset, &worker,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_pollset_destroy(&detector.pollset);
+ grpc_pollset_shutdown(&detector.pollset, destroy_pollset, &detector.pollset);
return detector.success;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index e1a1f38a12..6a1a6cbf30 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -1544,6 +1544,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
req->data.send_metadata.count = op->data.send_initial_metadata.count;
req->data.send_metadata.metadata =
@@ -1558,6 +1559,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_INVALID_MESSAGE;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_MESSAGE;
req->data.send_message = op->data.send_message;
req->flags = op->flags;
@@ -1569,6 +1571,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
req->flags = op->flags;
break;
@@ -1579,6 +1582,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_CLIENT;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
req->flags = op->flags;
req->data.send_metadata.count =
@@ -1586,6 +1590,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
req->data.send_metadata.metadata =
op->data.send_status_from_server.trailing_metadata;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_STATUS;
req->data.send_status.code = op->data.send_status_from_server.status;
req->data.send_status.details =
@@ -1595,6 +1600,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.send_status_from_server.status_details, 0)
: NULL;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_SEND_CLOSE;
break;
case GRPC_OP_RECV_INITIAL_METADATA:
@@ -1604,6 +1610,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req->data.recv_metadata = op->data.recv_initial_metadata;
req->data.recv_metadata->count = 0;
@@ -1613,6 +1620,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_MESSAGE;
req->data.recv_message = op->data.recv_message;
req->flags = op->flags;
@@ -1624,22 +1632,26 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return GRPC_CALL_ERROR_NOT_ON_SERVER;
}
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_status_value_directly;
req->data.recv_status.user_data = op->data.recv_status_on_client.status;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
req->data.recv_status_details.details =
op->data.recv_status_on_client.status_details;
req->data.recv_status_details.details_capacity =
op->data.recv_status_on_client.status_details_capacity;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
req->data.recv_metadata =
op->data.recv_status_on_client.trailing_metadata;
req->data.recv_metadata->count = 0;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
@@ -1647,12 +1659,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
/* Flag validation: currently allow no flags */
if (op->flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_STATUS;
req->flags = op->flags;
req->data.recv_status.set_value = set_cancelled_value;
req->data.recv_status.user_data =
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
+ if (out > GRPC_IOREQ_OP_COUNT) return GRPC_CALL_ERROR_BATCH_TOO_BIG;
req->op = GRPC_IOREQ_RECV_CLOSE;
finish_func = finish_batch_with_close;
break;
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 1f89353025..c3150250b8 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -88,8 +88,8 @@ static void on_secure_transport_setup_done(void *arg,
c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *) * 2);
- c->result->filters[0] = &grpc_client_auth_filter;
- c->result->filters[1] = &grpc_http_client_filter;
+ c->result->filters[0] = &grpc_http_client_filter;
+ c->result->filters[1] = &grpc_client_auth_filter;
c->result->num_filters = 2;
}
notify = c->notify;
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index 4f5d648371..d7aaba3868 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -37,5 +37,5 @@
#include <grpc/grpc.h>
const char *grpc_version_string(void) {
- return "0.10.0.0";
+ return "0.10.1.0";
}
diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h
index 936b0c25b0..e27e6b9fc9 100644
--- a/src/core/tsi/transport_security_interface.h
+++ b/src/core/tsi/transport_security_interface.h
@@ -158,6 +158,8 @@ tsi_result tsi_frame_protector_protect_flush(
value is expected to be at most max_protected_frame_size minus overhead
which means that max_protected_frame_size is a safe bet. The output value
is the number of bytes actually written.
+ If *unprotected_bytes_size is unchanged, there may be more data remaining
+ to unprotect, and the caller should call this function again.
- This method returns TSI_OK in case of success. Success includes cases where
there is not enough data to output a frame in which case
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index 2d6114e06b..6cd6b77fcf 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -34,6 +34,7 @@
#include <grpc/support/log.h>
#include <grpc++/channel_arguments.h>
+#include <grpc++/impl/grpc_library.h>
#include "src/cpp/client/channel.h"
#include "src/cpp/client/secure_credentials.h"
@@ -61,12 +62,14 @@ std::shared_ptr<Credentials> WrapCredentials(grpc_credentials* creds) {
} // namespace
std::shared_ptr<Credentials> GoogleDefaultCredentials() {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_google_default_credentials_create());
}
// Builds SSL Credentials given SSL specific options
std::shared_ptr<Credentials> SslCredentials(
const SslCredentialsOptions& options) {
+ GrpcLibrary init; // To call grpc_init().
grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {
options.pem_private_key.c_str(), options.pem_cert_chain.c_str()};
@@ -78,6 +81,7 @@ std::shared_ptr<Credentials> SslCredentials(
// Builds credentials for use when running in GCE
std::shared_ptr<Credentials> ComputeEngineCredentials() {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_compute_engine_credentials_create());
}
@@ -85,6 +89,7 @@ std::shared_ptr<Credentials> ComputeEngineCredentials() {
std::shared_ptr<Credentials> ServiceAccountCredentials(
const grpc::string& json_key, const grpc::string& scope,
long token_lifetime_seconds) {
+ GrpcLibrary init; // To call grpc_init().
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create ServiceAccountCredentials "
@@ -100,6 +105,7 @@ std::shared_ptr<Credentials> ServiceAccountCredentials(
// Builds JWT credentials.
std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials(
const grpc::string& json_key, long token_lifetime_seconds) {
+ GrpcLibrary init; // To call grpc_init().
if (token_lifetime_seconds <= 0) {
gpr_log(GPR_ERROR,
"Trying to create JWTCredentials with non-positive lifetime");
@@ -114,6 +120,7 @@ std::shared_ptr<Credentials> ServiceAccountJWTAccessCredentials(
// Builds refresh token credentials.
std::shared_ptr<Credentials> RefreshTokenCredentials(
const grpc::string& json_refresh_token) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(
grpc_refresh_token_credentials_create(json_refresh_token.c_str()));
}
@@ -121,6 +128,7 @@ std::shared_ptr<Credentials> RefreshTokenCredentials(
// Builds access token credentials.
std::shared_ptr<Credentials> AccessTokenCredentials(
const grpc::string& access_token) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(
grpc_access_token_credentials_create(access_token.c_str()));
}
@@ -129,6 +137,7 @@ std::shared_ptr<Credentials> AccessTokenCredentials(
std::shared_ptr<Credentials> IAMCredentials(
const grpc::string& authorization_token,
const grpc::string& authority_selector) {
+ GrpcLibrary init; // To call grpc_init().
return WrapCredentials(grpc_iam_credentials_create(
authorization_token.c_str(), authority_selector.c_str()));
}
diff --git a/src/cpp/common/auth_property_iterator.cc b/src/cpp/common/auth_property_iterator.cc
index e706c6c921..ba88983515 100644
--- a/src/cpp/common/auth_property_iterator.cc
+++ b/src/cpp/common/auth_property_iterator.cc
@@ -31,7 +31,7 @@
*
*/
-#include <grpc++/auth_property_iterator.h>
+#include <grpc++/auth_context.h>
#include <grpc/grpc_security.h>
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 64ea21800f..c5fc85b3fe 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -46,47 +46,18 @@ namespace Grpc.Core.Tests
public class ClientServerTest
{
const string Host = "127.0.0.1";
- const string ServiceName = "tests.Test";
-
- static readonly Method<string, string> EchoMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "Echo",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly Method<string, string> ConcatAndEchoMethod = new Method<string, string>(
- MethodType.ClientStreaming,
- ServiceName,
- "ConcatAndEcho",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly Method<string, string> NonexistentMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "NonexistentMethod",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
- .AddMethod(EchoMethod, EchoHandler)
- .AddMethod(ConcatAndEchoMethod, ConcatAndEchoHandler)
- .Build();
+ MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
- server = new Server
- {
- Services = { ServiceDefinition },
- Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
- };
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
server.Start();
- channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
+ channel = helper.GetChannel();
}
[TearDown]
@@ -103,86 +74,79 @@ namespace Grpc.Core.Tests
}
[Test]
- public void UnaryCall()
+ public async Task UnaryCall()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- Assert.AreEqual("ABC", Calls.BlockingUnaryCall(callDetails, "ABC"));
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ Assert.AreEqual("ABC", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
+
+ Assert.AreEqual("ABC", await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCall_ServerHandlerThrows()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "THROW");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ throw new Exception("This was thrown on purpose by a test");
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unknown, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerThrowsRpcException()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
- {
- Calls.BlockingUnaryCall(callDetails, "THROW_UNAUTHENTICATED");
- Assert.Fail();
- }
- catch (RpcException e)
+ helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
- Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
- }
+ throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
+
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public void UnaryCall_ServerHandlerSetsStatus()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "SET_UNAUTHENTICATED");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
- }
- }
+ context.Status = new Status(StatusCode.Unauthenticated, "");
+ return "";
+ });
- [Test]
- public async Task AsyncUnaryCall()
- {
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- var result = await Calls.AsyncUnaryCall(callDetails, "ABC");
- Assert.AreEqual("ABC", result);
- }
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode);
- [Test]
- public async Task AsyncUnaryCall_ServerHandlerThrows()
- {
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- try
- {
- await Calls.AsyncUnaryCall(callDetails, "THROW");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ var ex2 = Assert.Throws<RpcException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc"));
+ Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode);
}
[Test]
public async Task ClientStreamingCall()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions());
- var call = Calls.AsyncClientStreamingCall(callDetails);
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ string result = "";
+ await requestStream.ForEach(async (request) =>
+ {
+ result += request;
+ });
+ await Task.Delay(100);
+ return result;
+ });
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync);
}
@@ -190,36 +154,47 @@ namespace Grpc.Core.Tests
[Test]
public async Task ClientStreamingCall_CancelAfterBegin()
{
+ var barrier = new TaskCompletionSource<object>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ barrier.SetResult(null);
+ await requestStream.ToList();
+ return "";
+ });
+
var cts = new CancellationTokenSource();
- var callDetails = new CallInvocationDetails<string, string>(channel, ConcatAndEchoMethod, new CallOptions(cancellationToken: cts.Token));
- var call = Calls.AsyncClientStreamingCall(callDetails);
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
- // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
- await Task.Delay(1000);
+ await barrier.Task; // make sure the handler has started.
cts.Cancel();
- try
- {
- await call.ResponseAsync;
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
[Test]
- public void AsyncUnaryCall_EchoMetadata()
+ public async Task AsyncUnaryCall_EchoMetadata()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
+ {
+ if (metadataEntry.Key != "user-agent")
+ {
+ context.ResponseTrailers.Add(metadataEntry);
+ }
+ }
+ return "";
+ });
+
var headers = new Metadata
{
- new Metadata.Entry("ascii-header", "abcdefg"),
- new Metadata.Entry("binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff }),
+ { "ascii-header", "abcdefg" },
+ { "binary-header-bin", new byte[] { 1, 2, 3, 0, 0xff } }
};
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions(headers: headers));
- var call = Calls.AsyncUnaryCall(callDetails, "ABC");
-
- Assert.AreEqual("ABC", call.ResponseAsync.Result);
+ var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(new CallOptions(headers: headers)), "ABC");
+ await call;
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
@@ -236,15 +211,18 @@ namespace Grpc.Core.Tests
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
-
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(callDetails, "ABC"));
+ Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
}
[Test]
public void UnaryCallPerformance()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
+ var callDetails = helper.CreateUnaryCall();
BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(callDetails, "ABC"); });
}
@@ -252,44 +230,57 @@ namespace Grpc.Core.Tests
[Test]
public void UnknownMethodHandler()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, NonexistentMethod, new CallOptions());
- try
- {
- Calls.BlockingUnaryCall(callDetails, "ABC");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode);
- }
+ var nonexistentMethod = new Method<string, string>(
+ MethodType.Unary,
+ MockServiceHelper.ServiceName,
+ "NonExistentMethod",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ var callDetails = new CallInvocationDetails<string, string>(channel, nonexistentMethod, new CallOptions());
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(callDetails, "abc"));
+ Assert.AreEqual(StatusCode.Unimplemented, ex.Status.StatusCode);
}
[Test]
public void UserAgentStringPresent()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- string userAgent = Calls.BlockingUnaryCall(callDetails, "RETURN-USER-AGENT");
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
+ });
+
+ string userAgent = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(userAgent.StartsWith("grpc-csharp/"));
}
[Test]
public void PeerInfoPresent()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- string peer = Calls.BlockingUnaryCall(callDetails, "RETURN-PEER");
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return context.Peer;
+ });
+
+ string peer = Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc");
Assert.IsTrue(peer.Contains(Host));
}
[Test]
public async Task Channel_WaitForStateChangedAsync()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ return request;
+ });
+
Assert.Throws(typeof(TaskCanceledException),
async () => await channel.WaitForStateChangedAsync(channel.State, DateTime.UtcNow.AddMilliseconds(10)));
var stateChangedTask = channel.WaitForStateChangedAsync(channel.State);
- var callDetails = new CallInvocationDetails<string, string>(channel, EchoMethod, new CallOptions());
- await Calls.AsyncUnaryCall(callDetails, "abc");
+ await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc");
await stateChangedTask;
Assert.AreEqual(ChannelState.Ready, channel.State);
@@ -300,62 +291,9 @@ namespace Grpc.Core.Tests
{
await channel.ConnectAsync();
Assert.AreEqual(ChannelState.Ready, channel.State);
+
await channel.ConnectAsync(DateTime.UtcNow.AddMilliseconds(1000));
Assert.AreEqual(ChannelState.Ready, channel.State);
}
-
- private static async Task<string> EchoHandler(string request, ServerCallContext context)
- {
- foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
- {
- if (metadataEntry.Key != "user-agent")
- {
- context.ResponseTrailers.Add(metadataEntry);
- }
- }
-
- if (request == "RETURN-USER-AGENT")
- {
- return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
- }
-
- if (request == "RETURN-PEER")
- {
- return context.Peer;
- }
-
- if (request == "THROW")
- {
- throw new Exception("This was thrown on purpose by a test");
- }
-
- if (request == "THROW_UNAUTHENTICATED")
- {
- throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
- }
-
- if (request == "SET_UNAUTHENTICATED")
- {
- context.Status = new Status(StatusCode.Unauthenticated, "");
- }
-
- return request;
- }
-
- private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context)
- {
- string result = "";
- await requestStream.ForEach(async (request) =>
- {
- if (request == "THROW")
- {
- throw new Exception("This was thrown on purpose by a test");
- }
- result += request;
- });
- // simulate processing takes some time.
- await Task.Delay(250);
- return result;
- }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
new file mode 100644
index 0000000000..ac0c3d6b5f
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
@@ -0,0 +1,128 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class CompressionTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void WriteOptions_Unary()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ return request;
+ });
+
+ var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
+ Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc");
+ }
+
+ [Test]
+ public async Task WriteOptions_DuplexStreaming()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await requestStream.ToList();
+
+ context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+
+ await context.WriteResponseHeadersAsync(new Metadata { { "ascii-header", "abcdefg" } });
+
+ await responseStream.WriteAsync("X");
+
+ responseStream.WriteOptions = null;
+ await responseStream.WriteAsync("Y");
+
+ responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ await responseStream.WriteAsync("Z");
+ });
+
+ var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions));
+
+ // check that write options from call options are propagated to request stream.
+ Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0);
+
+ call.RequestStream.WriteOptions = new WriteOptions();
+ await call.RequestStream.WriteAsync("A");
+
+ call.RequestStream.WriteOptions = null;
+ await call.RequestStream.WriteAsync("B");
+
+ call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
+ await call.RequestStream.WriteAsync("C");
+
+ await call.RequestStream.CompleteAsync();
+
+ await call.ResponseStream.ToList();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
new file mode 100644
index 0000000000..a7f5075874
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -0,0 +1,122 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ContextPropagationTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public async Task PropagateCancellation()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ // check that we didn't obtain the default cancellation token.
+ Assert.IsTrue(context.CancellationToken.CanBeCanceled);
+ return "PASS";
+ });
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ var propagationToken = context.CreatePropagationToken();
+ Assert.IsNotNull(propagationToken.ParentCall);
+
+ var callOptions = new CallOptions(propagationToken: propagationToken);
+ return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+ await call.RequestStream.CompleteAsync();
+ Assert.AreEqual("PASS", await call);
+ }
+
+ [Test]
+ public async Task PropagateDeadline()
+ {
+ var deadline = DateTime.UtcNow.AddDays(7);
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.IsTrue(context.Deadline < deadline.AddMinutes(1));
+ Assert.IsTrue(context.Deadline > deadline.AddMinutes(-1));
+ return "PASS";
+ });
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ var callOptions = new CallOptions(propagationToken: context.CreatePropagationToken());
+ return await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(deadline: deadline)));
+ await call.RequestStream.CompleteAsync();
+ Assert.AreEqual("PASS", await call);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index f2bf459dc5..97ee0454bb 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -77,6 +77,10 @@
<Compile Include="TimeoutsTest.cs" />
<Compile Include="NUnitVersionTest.cs" />
<Compile Include="ChannelTest.cs" />
+ <Compile Include="MockServiceHelper.cs" />
+ <Compile Include="ResponseHeadersTest.cs" />
+ <Compile Include="CompressionTest.cs" />
+ <Compile Include="ContextPropagationTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 9ae12776f3..4ed93c7eca 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -69,5 +69,13 @@ namespace Grpc.Core.Tests
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
+
+ [Test]
+ public void GetCoreVersionString()
+ {
+ var coreVersion = GrpcEnvironment.GetCoreVersionString();
+ var parts = coreVersion.Split('.');
+ Assert.AreEqual(4, parts.Length);
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
index 46469113c5..33534fdd3c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
@@ -53,8 +53,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
- new Metadata.Entry("host", "somehost"),
- new Metadata.Entry("header2", "header value"),
+ { "host", "somehost" },
+ { "header2", "header value" },
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
@@ -65,8 +65,8 @@ namespace Grpc.Core.Internal.Tests
{
var metadata = new Metadata
{
- new Metadata.Entry("host", "somehost"),
- new Metadata.Entry("header2", "header value"),
+ { "host", "somehost" },
+ { "header2", "header value" }
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
new file mode 100644
index 0000000000..b642286b11
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
@@ -0,0 +1,248 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ /// <summary>
+ /// Allows setting up a mock service in the client-server tests easily.
+ /// </summary>
+ public class MockServiceHelper
+ {
+ public const string ServiceName = "tests.Test";
+
+ public static readonly Method<string, string> UnaryMethod = new Method<string, string>(
+ MethodType.Unary,
+ ServiceName,
+ "Unary",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>(
+ MethodType.ClientStreaming,
+ ServiceName,
+ "ClientStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>(
+ MethodType.ServerStreaming,
+ ServiceName,
+ "ServerStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>(
+ MethodType.DuplexStreaming,
+ ServiceName,
+ "DuplexStreaming",
+ Marshallers.StringMarshaller,
+ Marshallers.StringMarshaller);
+
+ readonly string host;
+ readonly ServerServiceDefinition serviceDefinition;
+
+ UnaryServerMethod<string, string> unaryHandler;
+ ClientStreamingServerMethod<string, string> clientStreamingHandler;
+ ServerStreamingServerMethod<string, string> serverStreamingHandler;
+ DuplexStreamingServerMethod<string, string> duplexStreamingHandler;
+
+ Server server;
+ Channel channel;
+
+ public MockServiceHelper(string host = null)
+ {
+ this.host = host ?? "localhost";
+
+ serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
+ .AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context))
+ .AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
+ .AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
+ .AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
+ .Build();
+
+ var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
+
+ unaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ context.Status = defaultStatus;
+ return "";
+ });
+
+ clientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ context.Status = defaultStatus;
+ return "";
+ });
+
+ serverStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ context.Status = defaultStatus;
+ });
+
+ duplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ context.Status = defaultStatus;
+ });
+ }
+
+ /// <summary>
+ /// Returns the default server for this service and creates one if not yet created.
+ /// </summary>
+ public Server GetServer()
+ {
+ if (server == null)
+ {
+ server = new Server
+ {
+ Services = { serviceDefinition },
+ Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
+ };
+ }
+ return server;
+ }
+
+ /// <summary>
+ /// Returns the default channel for this service and creates one if not yet created.
+ /// </summary>
+ public Channel GetChannel()
+ {
+ if (channel == null)
+ {
+ channel = new Channel(Host, GetServer().Ports.Single().BoundPort, Credentials.Insecure);
+ }
+ return channel;
+ }
+
+ public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
+ }
+
+ public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = null)
+ {
+ options = options ?? new CallOptions();
+ return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
+ }
+
+ public string Host
+ {
+ get
+ {
+ return this.host;
+ }
+ }
+
+ public ServerServiceDefinition ServiceDefinition
+ {
+ get
+ {
+ return this.serviceDefinition;
+ }
+ }
+
+ public UnaryServerMethod<string, string> UnaryHandler
+ {
+ get
+ {
+ return this.unaryHandler;
+ }
+
+ set
+ {
+ unaryHandler = value;
+ }
+ }
+
+ public ClientStreamingServerMethod<string, string> ClientStreamingHandler
+ {
+ get
+ {
+ return this.clientStreamingHandler;
+ }
+
+ set
+ {
+ clientStreamingHandler = value;
+ }
+ }
+
+ public ServerStreamingServerMethod<string, string> ServerStreamingHandler
+ {
+ get
+ {
+ return this.serverStreamingHandler;
+ }
+
+ set
+ {
+ serverStreamingHandler = value;
+ }
+ }
+
+ public DuplexStreamingServerMethod<string, string> DuplexStreamingHandler
+ {
+ get
+ {
+ return this.duplexStreamingHandler;
+ }
+
+ set
+ {
+ duplexStreamingHandler = value;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
new file mode 100644
index 0000000000..8925041ba4
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -0,0 +1,136 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ /// <summary>
+ /// Tests for response headers support.
+ /// </summary>
+ public class ResponseHeadersTest
+ {
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ Metadata headers;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper();
+
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+
+ headers = new Metadata { { "ascii-header", "abcdefg" } };
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.Dispose();
+ server.ShutdownAsync().Wait();
+ }
+
+ [TestFixtureTearDown]
+ public void CleanupClass()
+ {
+ GrpcEnvironment.Shutdown();
+ }
+
+ [Test]
+ public void WriteResponseHeaders_NullNotAllowed()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.Throws(typeof(NullReferenceException), async () => await context.WriteResponseHeadersAsync(null));
+ return "PASS";
+ });
+
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
+ }
+
+ [Test]
+ public void WriteResponseHeaders_AllowedOnlyOnce()
+ {
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ try
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ Assert.Fail();
+ }
+ catch (InvalidOperationException expected)
+ {
+ }
+ return "PASS";
+ });
+
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
+ }
+
+ [Test]
+ public async Task WriteResponseHeaders_NotAllowedAfterWrite()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ await responseStream.WriteAsync("A");
+ try
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ Assert.Fail();
+ }
+ catch (InvalidOperationException expected)
+ {
+ }
+ await responseStream.WriteAsync("B");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+ var responses = await call.ResponseStream.ToList();
+ CollectionAssert.AreEqual(new[] { "A", "B" }, responses);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index fc395b0acd..d875d601b9 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -48,38 +48,18 @@ namespace Grpc.Core.Tests
/// </summary>
public class TimeoutsTest
{
- const string Host = "localhost";
- const string ServiceName = "tests.Test";
-
- static readonly Method<string, string> TestMethod = new Method<string, string>(
- MethodType.Unary,
- ServiceName,
- "Test",
- Marshallers.StringMarshaller,
- Marshallers.StringMarshaller);
-
- static readonly ServerServiceDefinition ServiceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
- .AddMethod(TestMethod, TestMethodHandler)
- .Build();
-
- // provides a way how to retrieve an out-of-band result value from server handler
- static TaskCompletionSource<string> stringFromServerHandlerTcs;
-
+ MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
- server = new Server
- {
- Services = { ServiceDefinition },
- Ports = { { Host, ServerPort.PickUnused, ServerCredentials.Insecure } }
- };
- server.Start();
- channel = new Channel(Host, server.Ports.Single().BoundPort, Credentials.Insecure);
+ helper = new MockServiceHelper();
- stringFromServerHandlerTcs = new TaskCompletionSource<string>();
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
}
[TearDown]
@@ -98,115 +78,83 @@ namespace Grpc.Core.Tests
[Test]
public void InfiniteDeadline()
{
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ Assert.AreEqual(DateTime.MaxValue, context.Deadline);
+ return "PASS";
+ });
+
// no deadline specified, check server sees infinite deadline
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
- Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE"));
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc"));
// DateTime.MaxValue deadline specified, check server sees infinite deadline
- var callDetails2 = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions());
- Assert.AreEqual("DATETIME_MAXVALUE", Calls.BlockingUnaryCall(callDetails2, "RETURN_DEADLINE"));
+ Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MaxValue)), "abc"));
}
[Test]
public void DeadlineTransferredToServer()
{
- var remainingTimeClient = TimeSpan.FromDays(7);
- var deadline = DateTime.UtcNow + remainingTimeClient;
- Thread.Sleep(1000);
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
-
- var serverDeadlineTicksString = Calls.BlockingUnaryCall(callDetails, "RETURN_DEADLINE");
- var serverDeadline = new DateTime(long.Parse(serverDeadlineTicksString), DateTimeKind.Utc);
-
- // A fairly relaxed check that the deadline set by client and deadline seen by server
- // are in agreement. C core takes care of the work with transferring deadline over the wire,
- // so we don't need an exact check here.
- Assert.IsTrue(Math.Abs((deadline - serverDeadline).TotalMilliseconds) < 5000);
+ var clientDeadline = DateTime.UtcNow + TimeSpan.FromDays(7);
+
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
+ {
+ // A fairly relaxed check that the deadline set by client and deadline seen by server
+ // are in agreement. C core takes care of the work with transferring deadline over the wire,
+ // so we don't need an exact check here.
+ Assert.IsTrue(Math.Abs((clientDeadline - context.Deadline).TotalMilliseconds) < 5000);
+ return "PASS";
+ });
+ Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: clientDeadline)), "abc");
}
[Test]
public void DeadlineInThePast()
{
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: DateTime.MinValue));
-
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
+ await Task.Delay(60000);
+ return "FAIL";
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.MinValue)), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
public void DeadlineExceededStatusOnTimeout()
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
-
- try
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
- Calls.BlockingUnaryCall(callDetails, "TIMEOUT");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
+ await Task.Delay(60000);
+ return "FAIL";
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
[Test]
- public void ServerReceivesCancellationOnTimeout()
+ public async Task ServerReceivesCancellationOnTimeout()
{
- var deadline = DateTime.UtcNow.Add(TimeSpan.FromSeconds(5));
- var callDetails = new CallInvocationDetails<string, string>(channel, TestMethod, new CallOptions(deadline: deadline));
+ var serverReceivedCancellationTcs = new TaskCompletionSource<bool>();
- try
- {
- Calls.BlockingUnaryCall(callDetails, "CHECK_CANCELLATION_RECEIVED");
- Assert.Fail();
- }
- catch (RpcException e)
- {
- // We can't guarantee the status code is always DeadlineExceeded. See issue #2685.
- Assert.Contains(e.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- }
- Assert.AreEqual("CANCELLED", stringFromServerHandlerTcs.Task.Result);
- }
-
- private static async Task<string> TestMethodHandler(string request, ServerCallContext context)
- {
- if (request == "TIMEOUT")
- {
- await Task.Delay(60000);
- return "";
- }
-
- if (request == "RETURN_DEADLINE")
- {
- if (context.Deadline == DateTime.MaxValue)
- {
- return "DATETIME_MAXVALUE";
- }
-
- return context.Deadline.Ticks.ToString();
- }
-
- if (request == "CHECK_CANCELLATION_RECEIVED")
+ helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
// wait until cancellation token is fired.
var tcs = new TaskCompletionSource<object>();
context.CancellationToken.Register(() => { tcs.SetResult(null); });
await tcs.Task;
- stringFromServerHandlerTcs.SetResult("CANCELLED");
+ serverReceivedCancellationTcs.SetResult(true);
return "";
- }
+ });
+
+ var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(new CallOptions(deadline: DateTime.UtcNow.Add(TimeSpan.FromSeconds(5)))), "abc"));
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
- return "";
+ Assert.IsTrue(await serverReceivedCancellationTcs.Task);
}
}
}
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 8e9739335f..0d82b5a28e 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -47,6 +47,8 @@ namespace Grpc.Core
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
+ readonly WriteOptions writeOptions;
+ readonly ContextPropagationToken propagationToken;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
@@ -54,12 +56,17 @@ namespace Grpc.Core
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
- public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
+ /// <param name="writeOptions">Write options that will be used for this call.</param>
+ /// <param name="propagationToken">Context propagation token obtained from <see cref="ServerCallContext"/>.</param>
+ public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken? cancellationToken = null,
+ WriteOptions writeOptions = null, ContextPropagationToken propagationToken = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
- this.headers = headers != null ? headers : new Metadata();
- this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
- this.cancellationToken = cancellationToken;
+ this.headers = headers ?? new Metadata();
+ this.deadline = deadline ?? (propagationToken != null ? propagationToken.Deadline : DateTime.MaxValue);
+ this.cancellationToken = cancellationToken ?? (propagationToken != null ? propagationToken.CancellationToken : CancellationToken.None);
+ this.writeOptions = writeOptions;
+ this.propagationToken = propagationToken;
}
/// <summary>
@@ -85,5 +92,27 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
+
+ /// <summary>
+ /// Write options that will be used for this call.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ }
+
+ /// <summary>
+ /// Token for propagating parent call context.
+ /// </summary>
+ public ContextPropagationToken PropagationToken
+ {
+ get
+ {
+ return this.propagationToken;
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs
new file mode 100644
index 0000000000..399652b85e
--- /dev/null
+++ b/src/csharp/Grpc.Core/CompressionLevel.cs
@@ -0,0 +1,63 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Compression level based on grpc_compression_level from grpc/compression.h
+ /// </summary>
+ public enum CompressionLevel
+ {
+ /// <summary>
+ /// No compression.
+ /// </summary>
+ None = 0,
+
+ /// <summary>
+ /// Low compression.
+ /// </summary>
+ Low,
+
+ /// <summary>
+ /// Medium compression.
+ /// </summary>
+ Medium,
+
+ /// <summary>
+ /// High compression.
+ /// </summary>
+ High,
+ }
+}
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
new file mode 100644
index 0000000000..b6ea5115a4
--- /dev/null
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -0,0 +1,139 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Token for propagating context of server side handlers to child calls.
+ /// In situations when a backend is making calls to another backend,
+ /// it makes sense to propagate properties like deadline and cancellation
+ /// token of the server call to the child call.
+ /// C core provides some other contexts (like tracing context) that
+ /// are not accessible to C# layer, but this token still allows propagating them.
+ /// </summary>
+ public class ContextPropagationToken
+ {
+ /// <summary>
+ /// Default propagation mask used by C core.
+ /// </summary>
+ const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
+
+ /// <summary>
+ /// Default propagation mask used by C# - we want to propagate deadline
+ /// and cancellation token by our own means.
+ /// </summary>
+ internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
+ & ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
+
+ readonly CallSafeHandle parentCall;
+ readonly DateTime deadline;
+ readonly CancellationToken cancellationToken;
+ readonly ContextPropagationOptions options;
+
+ internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
+ {
+ this.parentCall = Preconditions.CheckNotNull(parentCall);
+ this.deadline = deadline;
+ this.cancellationToken = cancellationToken;
+ this.options = options ?? ContextPropagationOptions.Default;
+ }
+
+ internal CallSafeHandle ParentCall
+ {
+ get
+ {
+ return this.parentCall;
+ }
+ }
+
+ internal DateTime Deadline
+ {
+ get
+ {
+ return this.deadline;
+ }
+ }
+
+ internal CancellationToken CancellationToken
+ {
+ get
+ {
+ return this.cancellationToken;
+ }
+ }
+
+ internal ContextPropagationOptions Options
+ {
+ get
+ {
+ return this.options;
+ }
+ }
+
+ internal bool IsPropagateDeadline
+ {
+ get { return false; }
+ }
+
+ internal bool IsPropagateCancellation
+ {
+ get { return false; }
+ }
+ }
+
+ /// <summary>
+ /// Options for <see cref="ContextPropagationToken"/>.
+ /// </summary>
+ public class ContextPropagationOptions
+ {
+ public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
+ }
+
+ /// <summary>
+ /// Context propagation flags from grpc/grpc.h.
+ /// </summary>
+ [Flags]
+ internal enum ContextPropagationFlags
+ {
+ Deadline = 1,
+ CensusStatsContext = 2,
+ CensusTracingContext = 4,
+ Cancellation = 8
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 52defd1965..e535c47f55 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -115,6 +115,9 @@
<Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
<Compile Include="CallOptions.cs" />
+ <Compile Include="CompressionLevel.cs" />
+ <Compile Include="WriteOptions.cs" />
+ <Compile Include="ContextPropagationToken.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 034a66be3c..1bb83c9962 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -53,6 +53,9 @@ namespace Grpc.Core
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_shutdown();
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern IntPtr grpcsharp_version_string(); // returns not-owned const char*
+
static object staticLock = new object();
static GrpcEnvironment instance;
@@ -164,6 +167,15 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets version of gRPC C core.
+ /// </summary>
+ internal static string GetCoreVersionString()
+ {
+ var ptr = grpcsharp_version_string(); // the pointer is not owned
+ return Marshal.PtrToStringAnsi(ptr);
+ }
+
+ /// <summary>
/// Shuts down this environment.
/// </summary>
private void Close()
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 371fbf27ce..c0a0674e50 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,7 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
+ public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
{
// TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface.
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 2000210252..4e2acb9c71 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -50,5 +50,13 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
+
+ /// <summary>
+ /// Write options that will be used for the next write.
+ /// If null, default options will be used.
+ /// Once set, this property maintains its value across subsequent
+ /// writes.
+ /// <value>The write options.</value>
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 414b5c4282..0db9d2a515 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- readonly CallInvocationDetails<TRequest, TResponse> callDetails;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -63,7 +63,8 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- this.callDetails = callDetails;
+ this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@@ -89,11 +90,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -130,7 +131,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -138,9 +139,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -157,12 +158,12 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -181,16 +182,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -206,9 +207,9 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -219,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -278,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -310,14 +319,18 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
- callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ var propagationToken = details.Options.PropagationToken;
+ var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@@ -325,7 +338,7 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
- var token = callDetails.Options.CancellationToken;
+ var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
@@ -333,6 +346,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
+ /// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baeb..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee36..3710a65d6b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -98,6 +98,35 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(headers, "metadata");
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171..3cb01e29bd 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
+ public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +80,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -89,6 +91,10 @@ namespace Grpc.Core.Internal
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -103,17 +109,17 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -124,11 +130,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
@@ -138,11 +144,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
@@ -152,11 +158,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -173,6 +179,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 7324ebdf57..7f03bf4ea5 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,7 +47,7 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
@@ -76,9 +76,9 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463b..013f00ff6f 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f..688f9f6fec 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
- return new ServerCallContext(
- newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f..03e39efc02 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
- internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
+ internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 6fd0a7109d..a58dbdbc93 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -114,6 +114,16 @@ namespace Grpc.Core
entries.Add(item);
}
+ public void Add(string key, string value)
+ {
+ Add(new Entry(key, value));
+ }
+
+ public void Add(string key, byte[] valueBytes)
+ {
+ Add(new Entry(key, valueBytes));
+ }
+
public void Clear()
{
CheckWriteable();
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 032b1390db..75d81c64f3 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -36,15 +36,16 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
namespace Grpc.Core
{
/// <summary>
/// Context for a server-side call.
/// </summary>
- public sealed class ServerCallContext
+ public class ServerCallContext
{
- // TODO(jtattermusch): expose method to send initial metadata back to client
-
+ private readonly CallSafeHandle callHandle;
private readonly string method;
private readonly string host;
private readonly string peer;
@@ -54,15 +55,34 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private Func<Metadata, Task> writeHeadersFunc;
+ private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
+ internal ServerCallContext(CallSafeHandle callHandle, string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
+ Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
+ this.callHandle = callHandle;
this.method = method;
this.host = host;
this.peer = peer;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeHeadersFunc = writeHeadersFunc;
+ this.writeOptionsHolder = writeOptionsHolder;
+ }
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ return writeHeadersFunc(responseHeaders);
+ }
+
+ /// <summary>
+ /// Creates a propagation token to be used to propagate call context to a child call.
+ /// </summary>
+ public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null)
+ {
+ return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
}
/// <summary>Name of method called in this RPC.</summary>
@@ -110,7 +130,7 @@ namespace Grpc.Core
}
}
- ///<summary>Cancellation token signals when call is cancelled.</summary>
+ /// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@@ -141,5 +161,31 @@ namespace Grpc.Core
status = value;
}
}
+
+ /// <summary>
+ /// Allows setting write options for the following write.
+ /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
+ /// Both properties are backed by the same underlying value.
+ /// </summary>
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptionsHolder.WriteOptions;
+ }
+
+ set
+ {
+ writeOptionsHolder.WriteOptions = value;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Allows sharing write options between ServerCallContext and other objects.
+ /// </summary>
+ public interface IHasWriteOptions
+ {
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Version.cs b/src/csharp/Grpc.Core/Version.cs
index b5cb652945..d2a029fbb4 100644
--- a/src/csharp/Grpc.Core/Version.cs
+++ b/src/csharp/Grpc.Core/Version.cs
@@ -2,4 +2,4 @@ using System.Reflection;
using System.Runtime.CompilerServices;
// The current version of gRPC C#.
-[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")]
+[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".0")]
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 656a3d47bb..939372e237 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -8,6 +8,6 @@ namespace Grpc.Core
/// <summary>
/// Current version of gRPC
/// </summary>
- public const string CurrentVersion = "0.6.0";
+ public const string CurrentVersion = "0.6.1";
}
}
diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs
new file mode 100644
index 0000000000..7ef3189d76
--- /dev/null
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -0,0 +1,82 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ /// <summary>
+ /// Flags for write operations.
+ /// </summary>
+ [Flags]
+ public enum WriteFlags
+ {
+ /// <summary>
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ /// gRPC is free to buffer the message until the next non-buffered
+ /// write, or until write stream completion, but it need not buffer completely or at all.
+ /// </summary>
+ BufferHint = 0x1,
+
+ /// <summary>
+ /// Force compression to be disabled for a particular write.
+ /// </summary>
+ NoCompress = 0x2
+ }
+
+ /// <summary>
+ /// Options for write operations.
+ /// </summary>
+ public class WriteOptions
+ {
+ /// <summary>
+ /// Default write options.
+ /// </summary>
+ public static readonly WriteOptions Default = new WriteOptions();
+
+ private WriteFlags flags;
+
+ public WriteOptions(WriteFlags flags = default(WriteFlags))
+ {
+ this.flags = flags;
+ }
+
+ public WriteFlags Flags
+ {
+ get
+ {
+ return this.flags;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 08aece7ef2..73d2a1ca9b 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -92,15 +92,8 @@ namespace math.Tests
[Test]
public void DivByZero()
{
- try
- {
- DivReply response = client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build());
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs.Builder { Dividend = 0, Divisor = 0 }.Build()));
+ Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
}
[Test]
@@ -158,15 +151,10 @@ namespace math.Tests
using (var call = client.Fib(new FibArgs.Builder { Limit = 0 }.Build(),
deadline: DateTime.UtcNow.AddMilliseconds(500)))
{
- try
- {
- await call.ResponseStream.ToList();
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.DeadlineExceeded, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.ToList());
+
+ // We can't guarantee the status code always DeadlineExceeded. See issue #2685.
+ Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal });
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 7411d91d5a..6802de489d 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -404,15 +404,8 @@ namespace Grpc.IntegrationTesting
await Task.Delay(1000);
cts.Cancel();
- try
- {
- var response = await call.ResponseAsync;
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
@@ -435,15 +428,8 @@ namespace Grpc.IntegrationTesting
cts.Cancel();
- try
- {
- await call.ResponseStream.MoveNext();
- Assert.Fail();
- }
- catch (RpcException e)
- {
- Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
- }
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
Console.WriteLine("Passed!");
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 9e1253bf0b..8a11d01430 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,8 +1,8 @@
@rem Builds gRPC NuGet packages
@rem Current package versions
-set VERSION=0.6.0
-set CORE_VERSION=0.10.0
+set VERSION=0.6.1
+set CORE_VERSION=0.10.1
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
diff --git a/src/csharp/doc/README.md b/src/csharp/doc/README.md
new file mode 100644
index 0000000000..585500b5ca
--- /dev/null
+++ b/src/csharp/doc/README.md
@@ -0,0 +1,2 @@
+
+SandCastle project files to generate HTML reference documentation. \ No newline at end of file
diff --git a/src/csharp/doc/grpc_csharp_public.shfbproj b/src/csharp/doc/grpc_csharp_public.shfbproj
new file mode 100644
index 0000000000..05c93f4a13
--- /dev/null
+++ b/src/csharp/doc/grpc_csharp_public.shfbproj
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <!-- The configuration and platform will be used to determine which assemblies to include from solution and
+ project documentation sources -->
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{77e3da09-fc92-486f-a90a-99ca788e8b59}</ProjectGuid>
+ <SHFBSchemaVersion>2015.6.5.0</SHFBSchemaVersion>
+ <!-- AssemblyName, Name, and RootNamespace are not used by SHFB but Visual Studio adds them anyway -->
+ <AssemblyName>Documentation</AssemblyName>
+ <RootNamespace>Documentation</RootNamespace>
+ <Name>Documentation</Name>
+ <!-- SHFB properties -->
+ <FrameworkVersion>.NET Framework 4.5</FrameworkVersion>
+ <OutputPath>..\..\..\doc\ref\csharp\html</OutputPath>
+ <Language>en-US</Language>
+ <DocumentationSources>
+ <DocumentationSource sourceFile="..\Grpc.Auth\Grpc.Auth.csproj" />
+<DocumentationSource sourceFile="..\Grpc.Core\Grpc.Core.csproj" /></DocumentationSources>
+ <BuildAssemblerVerbosity>OnlyWarningsAndErrors</BuildAssemblerVerbosity>
+ <HelpFileFormat>Website</HelpFileFormat>
+ <IndentHtml>False</IndentHtml>
+ <KeepLogFile>True</KeepLogFile>
+ <DisableCodeBlockComponent>False</DisableCodeBlockComponent>
+ <CleanIntermediates>True</CleanIntermediates>
+ <HelpFileVersion>1.0.0.0</HelpFileVersion>
+ <MaximumGroupParts>2</MaximumGroupParts>
+ <NamespaceGrouping>False</NamespaceGrouping>
+ <SyntaxFilters>Standard</SyntaxFilters>
+ <SdkLinkTarget>Blank</SdkLinkTarget>
+ <RootNamespaceContainer>True</RootNamespaceContainer>
+ <PresentationStyle>VS2013</PresentationStyle>
+ <Preliminary>False</Preliminary>
+ <NamingMethod>MemberName</NamingMethod>
+ <HelpTitle>gRPC C#</HelpTitle>
+ <ContentPlacement>AboveNamespaces</ContentPlacement>
+ <HtmlHelpName>Documentation</HtmlHelpName>
+ </PropertyGroup>
+ <!-- There are no properties for these groups. AnyCPU needs to appear in order for Visual Studio to perform
+ the build. The others are optional common platform types that may appear. -->
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x86' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x64' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|x64' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|Win32' ">
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|Win32' ">
+ </PropertyGroup>
+ <!-- Import the SHFB build targets -->
+ <Import Project="$(SHFBROOT)\SandcastleHelpFileBuilder.targets" />
+ <!-- The pre-build and post-build event properties must appear *after* the targets file import in order to be
+ evaluated correctly. -->
+ <PropertyGroup>
+ <PreBuildEvent>
+ </PreBuildEvent>
+ <PostBuildEvent>
+ </PostBuildEvent>
+ <RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>
+ </PropertyGroup>
+</Project> \ No newline at end of file
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index d28ce1b383..88f092573e 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -377,10 +377,12 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
}
GPR_EXPORT grpc_call *GPR_CALLTYPE
-grpcsharp_channel_create_call(grpc_channel *channel, grpc_completion_queue *cq,
+grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call,
+ gpr_uint32 propagation_mask,
+ grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
- return grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
+ return grpc_channel_create_call(channel, parent_call, propagation_mask, cq,
method, host, deadline, NULL);
}
@@ -498,7 +500,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
- grpc_metadata_array *initial_metadata) {
+ grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[6];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -512,7 +514,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@@ -581,7 +583,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
- size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
+ size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -595,7 +597,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
@@ -656,16 +658,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
- const char *send_buffer, size_t send_buffer_len) {
+ const char *send_buffer, size_t send_buffer_len,
+ gpr_uint32 write_flags,
+ gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -682,9 +690,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
- const char *status_details, grpc_metadata_array *trailing_metadata) {
+ const char *status_details, grpc_metadata_array *trailing_metadata,
+ gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -696,9 +706,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
- NULL);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -715,16 +728,28 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[0].data.send_initial_metadata.count = 0;
- ops[0].data.send_initial_metadata.metadata = NULL;
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops[0].data.recv_close_on_server.cancelled =
+ (&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
- ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[1].data.recv_close_on_server.cancelled =
- (&ctx->recv_close_on_server_cancelled);
- ops[1].flags = 0;
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+}
+
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_call_send_initial_metadata(grpc_call *call,
+ grpcsharp_batch_context *ctx,
+ grpc_metadata_array *initial_metadata) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
+ initial_metadata);
+ ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
+ ops[0].data.send_initial_metadata.metadata =
+ ctx->send_initial_metadata.metadata;
+ ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@@ -859,6 +884,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) {
typedef void(GPR_CALLTYPE *test_callback_funcptr)(gpr_int32 success);
+/* Version info */
+GPR_EXPORT const char *GPR_CALLTYPE grpcsharp_version_string() {
+ return grpc_version_string();
+}
+
/* For testing */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_test_callback(test_callback_funcptr callback) {
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 236b36616c..221d69e246 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -69,9 +69,6 @@ function zeroBuffer(size) {
function emptyUnary(client, done) {
var call = client.emptyCall({}, function(err, resp) {
assert.ifError(err);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -96,9 +93,6 @@ function largeUnary(client, done) {
assert.ifError(err);
assert.strictEqual(resp.payload.type, 'COMPRESSABLE');
assert.strictEqual(resp.payload.body.length, 314159);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -115,9 +109,6 @@ function clientStreaming(client, done) {
var call = client.streamingInputCall(function(err, resp) {
assert.ifError(err);
assert.strictEqual(resp.aggregated_payload_size, 74922);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -308,9 +299,6 @@ function authTest(expected_user, scope, client, done) {
assert.strictEqual(resp.payload.body.length, 314159);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -344,9 +332,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
assert.ifError(err);
assert.strictEqual(resp.username, expected_user);
assert.strictEqual(resp.oauth_scope, AUTH_SCOPE_RESPONSE);
- });
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
if (done) {
done();
}
@@ -358,7 +343,6 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
client.updateMetadata = updateMetadata;
makeTestCall(null, {});
}
-
});
});
}
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 5f7d74bca8..0f4c811ce4 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
+
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
- // |startWithWriteable:| and |finishWithError:|.
- GRPCCall *_self;
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
+ // reference to the call object if all they're interested in is the handler being executed when
+ // the response arrives.
+ GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
- _self = nil;
+ _retainSelf = nil;
// If there were still request messages coming, stop them.
- _requestWriter.state = GRXWriterStateFinished;
- _requestWriter = nil;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStateFinished;
+ }
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ @synchronized(strongSelf->_requestWriter) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
- initWithMessage:message
- handler:resumingHandler]] errorHandler:errorHandler];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- _requestWriter.state = GRXWriterStatePaused;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStatePaused;
+ }
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
- [_requestWriter startWithWriteable:self];
+ @synchronized(_requestWriter) {
+ [_requestWriter startWithWriteable:self];
+ }
}
#pragma mark GRXWriter implementation
@@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
- _self = self;
+ _retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];
diff --git a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
index 9b4b6768f8..0a54804bb2 100644
--- a/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCSecureChannel.m
@@ -38,15 +38,18 @@
// Returns NULL if the file at path couldn't be read. In that case, if errorPtr isn't NULL,
// *errorPtr will be an object describing what went wrong.
static grpc_credentials *CertificatesAtPath(NSString *path, NSError **errorPtr) {
- NSString *certsContent = [NSString stringWithContentsOfFile:path
- encoding:NSASCIIStringEncoding
+ // Files in PEM format can have non-ASCII characters in their comments (e.g. for the name of the
+ // issuer). Load them as UTF8 and produce an ASCII equivalent.
+ NSString *contentInUTF8 = [NSString stringWithContentsOfFile:path
+ encoding:NSUTF8StringEncoding
error:errorPtr];
- if (!certsContent) {
+ NSData *contentInASCII = [contentInUTF8 dataUsingEncoding:NSASCIIStringEncoding
+ allowLossyConversion:YES];
+ if (!contentInASCII.bytes) {
// Passing NULL to grpc_ssl_credentials_create produces behavior we don't want, so return.
return NULL;
}
- const char * asCString = [certsContent cStringUsingEncoding:NSASCIIStringEncoding];
- return grpc_ssl_credentials_create(asCString, NULL);
+ return grpc_ssl_credentials_create(contentInASCII.bytes, NULL);
}
@implementation GRPCSecureChannel
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index b6296e1ed7..ca94ce275f 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,13 +36,11 @@
#import "GRXWriteable.h"
#import "GRXWriter.h"
-// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
-// to -startWithWriteable:).
+// A buffered pipe is a Writer that also acts as a Writeable.
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
-// started, it will buffer them and propagate them in order as soon as its state becomes
-// GRXWriterStateStarted.
+// started, it will buffer them and propagate them in order as soon as its state becomes Started.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
@@ -51,6 +49,9 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
+//
+// Thread-safety:
+// The methods of an object of this class should not be called concurrently from different threads.
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index d004333d2b..f310832284 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -33,11 +33,17 @@
#import "GRXWriter.h"
-// A "proxy" class that simply forwards values, completion, and errors from its
-// input writer to its writeable.
+// A "proxy" class that simply forwards values, completion, and errors from its input writer to its
+// writeable.
// It is useful as a superclass for pipes that act as a transformation of their
// input writer, and for classes that represent objects with input and
// output sequences of values, like an RPC.
+//
+// Thread-safety:
+// All messages sent to this object need to be serialized. When it is started, the writer it wraps
+// is started in the same thread. Manual state changes are propagated to the wrapped writer in the
+// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
+// serialized with any message sent to this object.
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 2342f51ab3..a72be9ace2 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -48,7 +48,11 @@
// Designated initializer
- (instancetype)initWithWriter:(GRXWriter *)writer {
if (!writer) {
- [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
+ return nil;
+ }
+ if (writer.state != GRXWriterStateNotStarted) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"The writer argument must not have already started."];
}
if ((self = [super init])) {
_writer = writer;
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h
index b171f0c760..3fcc259434 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h
@@ -36,10 +36,17 @@
#import "GRXWriter.h"
// Utility to construct GRXWriter instances from values that are immediately available when
-// required. The returned writers all support pausing and early termination.
+// required.
//
-// Unless the writeable callback pauses them or stops them early, these writers will do all their
-// interactions with the writeable before the start method returns.
+// Thread-safety:
+//
+// An object of this class shouldn't be messaged concurrently by more than one thread. It will start
+// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the
+// only place where the writer can be paused or stopped prematurely.
+//
+// If a paused writer of this class is resumed, it will start messaging the writeable, in the same
+// thread, before |setState:| returns. Because the object can't be legally accessed concurrently,
+// that's the only place where it can be paused again (or stopped).
@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d6e1a472a..b1c994aa38 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -35,84 +35,73 @@
#import "GRXWriteable.h"
+// States of a writer.
typedef NS_ENUM(NSInteger, GRXWriterState) {
- // The writer has not yet been given a writeable to which it can push its
- // values. To have an writer transition to the Started state, send it a
- // startWithWriteable: message.
+ // The writer has not yet been given a writeable to which it can push its values. To have a writer
+ // transition to the Started state, send it a startWithWriteable: message.
//
- // An writer's state cannot be manually set to this value.
+ // A writer's state cannot be manually set to this value.
GRXWriterStateNotStarted,
// The writer might push values to the writeable at any moment.
GRXWriterStateStarted,
- // The writer is temporarily paused, and won't send any more values to the
- // writeable unless its state is set back to Started. The writer might still
- // transition to the Finished state at any moment, and is allowed to send
- // writesFinishedWithError: to its writeable.
- //
- // Not all implementations of writer have to support pausing, and thus
- // trying to set an writer's state to this value might have no effect.
+ // The writer is temporarily paused, and won't send any more values to the writeable unless its
+ // state is set back to Started. The writer might still transition to the Finished state at any
+ // moment, and is allowed to send writesFinishedWithError: to its writeable.
GRXWriterStatePaused,
// The writer has released its writeable and won't interact with it anymore.
//
- // One seldomly wants to set an writer's state to this value, as its
- // writeable isn't notified with a writesFinishedWithError: message. Instead, sending
- // finishWithError: to the writer will make it notify the writeable and then
- // transition to this state.
+ // One seldomly wants to set a writer's state to this value, as its writeable isn't notified with
+ // a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make
+ // it notify the writeable and then transition to this state.
GRXWriterStateFinished
};
-// An object that conforms to this protocol can produce, on demand, a sequence
-// of values. The sequence may be produced asynchronously, and it may consist of
-// any number of elements, including none or an infinite number.
+// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced
+// asynchronously, and it may consist of any number of elements, including none or an infinite
+// number.
+//
+// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the
+// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it,
+// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and
+// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to
+// represent a sequence of future values, as well as collections with internal iteration.
//
-// GRXWriter is the active dual of NSEnumerator. The difference between them
-// is thus whether the object plays an active or passive role during usage: A
-// user of NSEnumerator pulls values off it, and passes the values to a writeable.
-// A user of GRXWriter, though, just gives it a writeable, and the
-// GRXWriter instance pushes values to the writeable. This makes this protocol
-// suitable to represent a sequence of future values, as well as collections
-// with internal iteration.
+// An instance of GRXWriter can start producing values after a writeable is passed to it. It can
+// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be
+// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination.
//
-// An instance of GRXWriter can start producing values after a writeable is
-// passed to it. It can also be commanded to finish the sequence immediately
-// (with an optional error). Finally, it can be asked to pause, but the
-// conforming instance is not required to oblige.
+// Thread-safety:
//
-// Unless otherwise indicated by a conforming class, no messages should be sent
-// concurrently to a GRXWriter. I.e., conforming classes aren't required to
-// be thread-safe.
+// State transitions take immediate effect if the object is used from a single thread. Subclasses
+// might offer stronger guarantees.
+//
+// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a
+// GRXWriter. I.e., conforming classes aren't required to be thread-safe.
@interface GRXWriter : NSObject
-// This property can be used to query the current state of the writer, which
-// determines how it might currently use its writeable. Some state transitions can
-// be triggered by setting this property to the corresponding value, and that's
-// useful for advanced use cases like pausing an writer. For more details,
-// see the documentation of the enum.
+// This property can be used to query the current state of the writer, which determines how it might
+// currently use its writeable. Some state transitions can be triggered by setting this property to
+// the corresponding value, and that's useful for advanced use cases like pausing an writer. For
+// more details, see the documentation of the enum further down.
@property(nonatomic) GRXWriterState state;
-// Start sending messages to the writeable. Messages may be sent before the method
-// returns, or they may be sent later in the future. See GRXWriteable.h for the
-// different messages a writeable can receive.
+// Transition to the Started state, and start sending messages to the writeable (a reference to it
+// is retained). Messages to the writeable may be sent before the method returns, or they may be
+// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive.
//
-// If this writer draws its values from an external source (e.g. from the
-// filesystem or from a server), calling this method will commonly trigger side
-// effects (like network connections).
+// If this writer draws its values from an external source (e.g. from the filesystem or from a
+// server), calling this method will commonly trigger side effects (like network connections).
//
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
-// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
-// any more messages to it.
-//
-// This method might only be called on writers in the Started or Paused
-// state.
+// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and
+// transition to the Finished state.
//
-// TODO(jcanizales): Consider adding some guarantee about the immediacy of that
-// stopping. I know I've relied on it in part of the code that uses this, but
-// can't remember the details in the presence of concurrency.
+// This method might only be called on writers in the Started or Paused state.
- (void)finishWithError:(NSError *)errorOrNil;
@end
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index e85dd6e65c..f23102988b 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -114,7 +114,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testSimpleProtoRPC {
@@ -146,7 +146,7 @@ static ProtoMethod *kUnaryCallMethod;
[call startWithWriteable:responsesWriteable];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:8 handler:nil];
}
- (void)testMetadata {
diff --git a/src/objective-c/tests/InteropTests.h b/src/objective-c/tests/InteropTests.h
index 4eb97e9e06..1045c3d124 100644
--- a/src/objective-c/tests/InteropTests.h
+++ b/src/objective-c/tests/InteropTests.h
@@ -37,8 +37,7 @@
// https://github.com/grpc/grpc/blob/master/doc/interop-test-descriptions.md
@interface InteropTests : XCTestCase
-// Returns @"localhost:5050".
+// Returns @"grpc-test.sandbox.google.com".
// Override in a subclass to perform the same tests against a different address.
-// For interop tests, use @"grpc-test.sandbox.google.com".
+ (NSString *)host;
@end
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index b61d567464..1b63fe2059 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -78,20 +78,17 @@
#pragma mark Tests
-static NSString * const kLocalCleartextHost = @"localhost:5050";
+static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.google.com";
@implementation InteropTests {
RMTTestService *_service;
}
+ (NSString *)host {
- return kLocalCleartextHost;
+ return kRemoteSSLHost;
}
- (void)setUp {
- // Register test server as non-SSL.
- [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
-
_service = [[RMTTestService alloc] initWithHost:self.class.host];
}
@@ -131,7 +128,7 @@ static NSString * const kLocalCleartextHost = @"localhost:5050";
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:8 handler:nil];
+ [self waitForExpectationsWithTimeout:16 handler:nil];
}
- (void)testClientStreamingRPC {
diff --git a/include/grpc++/auth_property_iterator.h b/src/objective-c/tests/InteropTestsLocalCleartext.m
index c7870c46be..2d7d3c4b2c 100644
--- a/include/grpc++/auth_property_iterator.h
+++ b/src/objective-c/tests/InteropTestsLocalCleartext.m
@@ -31,47 +31,29 @@
*
*/
-#ifndef GRPCXX_AUTH_PROPERTY_ITERATOR_H
-#define GRPCXX_AUTH_PROPERTY_ITERATOR_H
+// Repeat of the tests in InteropTests.m, but sending the RPCs to a local cleartext server instead
+// of the remote SSL one.
-#include <iterator>
-#include <vector>
+#import <GRPCClient/GRPCCall+Tests.h>
-#include <grpc++/config.h>
+#import "InteropTests.h"
-struct grpc_auth_context;
-struct grpc_auth_property;
-struct grpc_auth_property_iterator;
+static NSString * const kLocalCleartextHost = @"localhost:5050";
-namespace grpc {
-class SecureAuthContext;
+@interface InteropTestsLocalCleartext : InteropTests
+@end
-typedef std::pair<grpc::string, grpc::string> AuthProperty;
+@implementation InteropTestsLocalCleartext
-class AuthPropertyIterator
- : public std::iterator<std::input_iterator_tag, const AuthProperty> {
- public:
- ~AuthPropertyIterator();
- AuthPropertyIterator& operator++();
- AuthPropertyIterator operator++(int);
- bool operator==(const AuthPropertyIterator& rhs) const;
- bool operator!=(const AuthPropertyIterator& rhs) const;
- const AuthProperty operator*();
++ (NSString *)host {
+ return kLocalCleartextHost;
+}
- protected:
- AuthPropertyIterator();
- AuthPropertyIterator(const grpc_auth_property* property,
- const grpc_auth_property_iterator* iter);
- private:
- friend class SecureAuthContext;
- const grpc_auth_property* property_;
- // The following items form a grpc_auth_property_iterator.
- const grpc_auth_context* ctx_;
- size_t index_;
- const char* name_;
-};
+- (void)setUp {
+ // Register test server as non-SSL.
+ [GRPCCall useInsecureConnectionsForHost:kLocalCleartextHost];
-} // namespace grpc
-
- #endif // GRPCXX_AUTH_PROPERTY_ITERATOR_H
+ [super setUp];
+}
+@end
diff --git a/src/objective-c/tests/InteropTestsLocalSSL.m b/src/objective-c/tests/InteropTestsLocalSSL.m
index 227ca79659..f69f806dcf 100644
--- a/src/objective-c/tests/InteropTestsLocalSSL.m
+++ b/src/objective-c/tests/InteropTestsLocalSSL.m
@@ -31,8 +31,8 @@
*
*/
-// Repeat of the tests in InteropTests.m, but using SSL to communicate with the local server instead
-// of cleartext.
+// Repeat of the tests in InteropTests.m, but sending the RPCs to a local SSL server instead of the
+// remote one.
#import <GRPCClient/GRPCCall+Tests.h>
diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
index af98aba9c0..3a1c3d940a 100644
--- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
+++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj
@@ -13,6 +13,7 @@
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 63423F501B151B77006CF63C /* RxLibraryUnitTests.m */; };
635697CD1B14FC11007A7283 /* Tests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635697CC1B14FC11007A7283 /* Tests.m */; };
635ED2EC1B1A3BC400FDE5C3 /* InteropTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */; };
+ 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */ = {isa = PBXBuildFile; fileRef = 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */; };
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */ = {isa = PBXBuildFile; fileRef = 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */; };
63E240D01B6C63DC005F3B0E /* TestCertificates.bundle in Resources */ = {isa = PBXBuildFile; fileRef = 63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */; };
7D8A186224D39101F90230F6 /* libPods.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 35F2B6BF3BAE8F0DC4AFD76E /* libPods.a */; };
@@ -51,6 +52,7 @@
635697CC1B14FC11007A7283 /* Tests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = Tests.m; sourceTree = "<group>"; };
635697D81B14FC11007A7283 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTests.m; sourceTree = "<group>"; };
+ 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalCleartext.m; sourceTree = "<group>"; };
63E240CC1B6C4D3A005F3B0E /* InteropTests.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = InteropTests.h; sourceTree = "<group>"; };
63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = InteropTestsLocalSSL.m; sourceTree = "<group>"; };
63E240CF1B6C63DC005F3B0E /* TestCertificates.bundle */ = {isa = PBXFileReference; lastKnownFileType = "wrapper.plug-in"; path = TestCertificates.bundle; sourceTree = "<group>"; };
@@ -117,14 +119,15 @@
635697C91B14FC11007A7283 /* Tests */ = {
isa = PBXGroup;
children = (
- 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
6312AE4D1B1BF49B00341DEE /* GRPCClientTests.m */,
- 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
+ 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
635ED2EB1B1A3BC400FDE5C3 /* InteropTests.m */,
+ 63E240CD1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m */,
+ 63715F551B780C020029CB0B /* InteropTestsLocalCleartext.m */,
63423F501B151B77006CF63C /* RxLibraryUnitTests.m */,
+ 63175DFE1B1B9FAF00027841 /* LocalClearTextTests.m */,
635697CC1B14FC11007A7283 /* Tests.m */,
635697D71B14FC11007A7283 /* Supporting Files */,
- 63E240CC1B6C4D3A005F3B0E /* InteropTests.h */,
);
name = Tests;
sourceTree = SOURCE_ROOT;
@@ -261,6 +264,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
+ 63715F561B780C020029CB0B /* InteropTestsLocalCleartext.m in Sources */,
63175DFF1B1B9FAF00027841 /* LocalClearTextTests.m in Sources */,
63423F511B151B77006CF63C /* RxLibraryUnitTests.m in Sources */,
63E240CE1B6C4E2B005F3B0E /* InteropTestsLocalSSL.m in Sources */,
diff --git a/src/python/grpcio/grpc/_adapter/_c/module.c b/src/python/grpcio/grpc/_adapter/_c/module.c
index 1f3aedd9d8..9b93b051f6 100644
--- a/src/python/grpcio/grpc/_adapter/_c/module.c
+++ b/src/python/grpcio/grpc/_adapter/_c/module.c
@@ -53,6 +53,12 @@ PyMODINIT_FUNC init_c(void) {
return;
}
+ if (PyModule_AddStringConstant(
+ module, "PRIMARY_USER_AGENT_KEY",
+ GRPC_ARG_PRIMARY_USER_AGENT_STRING) < 0) {
+ return;
+ }
+
/* GRPC maintains an internal counter of how many times it has been
initialized and handles multiple pairs of grpc_init()/grpc_shutdown()
invocations accordingly. */
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index dcf67dbc11..239aac81b2 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -27,9 +27,12 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from grpc import _grpcio_metadata
from grpc._adapter import _c
from grpc._adapter import _types
+_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
+
ClientCredentials = _c.ClientCredentials
ServerCredentials = _c.ServerCredentials
@@ -76,6 +79,7 @@ class Call(_types.Call):
class Channel(_types.Channel):
def __init__(self, target, args, creds=None):
+ args = list(args) + [(_c.PRIMARY_USER_AGENT_KEY, _USER_AGENT)]
if creds is None:
self.channel = _c.Channel(target, args)
else:
diff --git a/src/python/grpcio_test/grpc_interop/_interop_test_case.py b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
index ed8f7ef009..b6d06b300d 100644
--- a/src/python/grpcio_test/grpc_interop/_interop_test_case.py
+++ b/src/python/grpcio_test/grpc_interop/_interop_test_case.py
@@ -59,3 +59,6 @@ class InteropTestCase(object):
def testCancelAfterFirstResponse(self):
methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability(self.stub, None)
+
+ def testTimeoutOnSleepingServer(self):
+ methods.TestCase.TIMEOUT_ON_SLEEPING_SERVER.test_interoperability(self.stub, None)
diff --git a/src/python/grpcio_test/grpc_interop/methods.py b/src/python/grpcio_test/grpc_interop/methods.py
index f4c94685ee..7a831f3cbd 100644
--- a/src/python/grpcio_test/grpc_interop/methods.py
+++ b/src/python/grpcio_test/grpc_interop/methods.py
@@ -33,10 +33,12 @@ import enum
import json
import os
import threading
+import time
from oauth2client import client as oauth2client_client
from grpc.framework.alpha import utilities
+from grpc.framework.alpha import exceptions
from grpc_interop import empty_pb2
from grpc_interop import messages_pb2
@@ -318,6 +320,24 @@ def _cancel_after_first_response(stub):
raise ValueError('expected call to be cancelled')
+def _timeout_on_sleeping_server(stub):
+ request_payload_size = 27182
+ with stub, _Pipe() as pipe:
+ response_iterator = stub.FullDuplexCall(pipe, 0.001)
+
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ payload=messages_pb2.Payload(body=b'\x00' * request_payload_size))
+ pipe.add(request)
+ time.sleep(0.1)
+ try:
+ next(response_iterator)
+ except exceptions.ExpirationError:
+ pass
+ else:
+ raise ValueError('expected call to exceed deadline')
+
+
def _compute_engine_creds(stub, args):
response = _large_unary_common_behavior(stub, True, True)
if args.default_service_account != response.username:
@@ -351,6 +371,7 @@ class TestCase(enum.Enum):
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
SERVICE_ACCOUNT_CREDS = 'service_account_creds'
+ TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
def test_interoperability(self, stub, args):
if self is TestCase.EMPTY_UNARY:
@@ -367,6 +388,8 @@ class TestCase(enum.Enum):
_cancel_after_begin(stub)
elif self is TestCase.CANCEL_AFTER_FIRST_RESPONSE:
_cancel_after_first_response(stub)
+ elif self is TestCase.TIMEOUT_ON_SLEEPING_SERVER:
+ _timeout_on_sleeping_server(stub)
elif self is TestCase.COMPUTE_ENGINE_CREDS:
_compute_engine_creds(stub, args)
elif self is TestCase.SERVICE_ACCOUNT_CREDS:
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/__init__.py b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_protoc_plugin/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/test/compiler/python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
index 0e58d912b9..b200d129a9 100644
--- a/test/compiler/python_plugin_test.py
+++ b/src/python/grpcio_test/grpc_protoc_plugin/python_plugin_test.py
@@ -29,9 +29,11 @@
import argparse
import contextlib
+import distutils.spawn
import errno
import itertools
import os
+import pkg_resources
import shutil
import subprocess
import sys
@@ -58,9 +60,6 @@ SHORT_TIMEOUT = 2
LONG_TIMEOUT = 600
NO_DELAY = 0
-# Build mode environment variable set by tools/run_tests/run_tests.py.
-_build_mode = os.environ['CONFIG']
-
class _ServicerMethods(object):
@@ -228,9 +227,13 @@ class PythonPluginTest(unittest.TestCase):
"""
def setUp(self):
- protoc_command = '../../bins/%s/protobuf/protoc' % _build_mode
- protoc_plugin_filename = '../../bins/%s/grpc_python_plugin' % _build_mode
- test_proto_filename = './test.proto'
+ # Assume that the appropriate protoc and grpc_python_plugins are on the
+ # path.
+ protoc_command = 'protoc'
+ protoc_plugin_filename = distutils.spawn.find_executable(
+ 'grpc_python_plugin')
+ test_proto_filename = pkg_resources.resource_filename(
+ 'grpc_protoc_plugin', 'test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
@@ -242,12 +245,13 @@ class PythonPluginTest(unittest.TestCase):
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
- '-I %s' % os.path.dirname(test_proto_filename),
+ '-I .',
'--python_out=%s' % self.outdir,
'--python-grpc_out=%s' % self.outdir,
os.path.basename(test_proto_filename),
]
- subprocess.call(' '.join(cmd), shell=True)
+ subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
+ cwd=os.path.dirname(test_proto_filename))
sys.path.append(self.outdir)
def tearDown(self):
diff --git a/test/compiler/test.proto b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
index ed7c6a7b79..ed7c6a7b79 100644
--- a/test/compiler/test.proto
+++ b/src/python/grpcio_test/grpc_protoc_plugin/test.proto
diff --git a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
index 9a8edfad0c..b6583662f3 100644
--- a/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
+++ b/src/python/grpcio_test/grpc_test/_adapter/_low_test.py
@@ -31,11 +31,12 @@ import threading
import time
import unittest
+from grpc import _grpcio_metadata
from grpc._adapter import _types
from grpc._adapter import _low
-def WaitForEvents(completion_queues, deadline):
+def wait_for_events(completion_queues, deadline):
"""
Args:
completion_queues: list of completion queues to wait for events on
@@ -62,6 +63,7 @@ def WaitForEvents(completion_queues, deadline):
thread.join()
return results
+
class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
@@ -123,16 +125,21 @@ class InsecureServerInsecureClient(unittest.TestCase):
], client_call_tag)
self.assertEquals(_types.CallError.OK, client_start_batch_result)
- client_no_event, request_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
+ client_no_event, request_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 2)
self.assertEquals(client_no_event, None)
self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
self.assertIsInstance(request_event.call, _low.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEquals(1, len(request_event.results))
- got_initial_metadata = dict(request_event.results[0].initial_metadata)
+ received_initial_metadata = dict(request_event.results[0].initial_metadata)
+ # Check that our metadata were transmitted
self.assertEquals(
dict(client_initial_metadata),
- dict((x, got_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ dict((x, received_initial_metadata[x]) for x in zip(*client_initial_metadata)[0]))
+ # Check that Python's user agent string is a part of the full user agent
+ # string
+ self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__),
+ received_initial_metadata['user-agent'])
self.assertEquals(METHOD, request_event.call_details.method)
self.assertEquals(HOST, request_event.call_details.host)
self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
@@ -150,7 +157,7 @@ class InsecureServerInsecureClient(unittest.TestCase):
], server_call_tag)
self.assertEquals(_types.CallError.OK, server_start_batch_result)
- client_event, server_event, = WaitForEvents([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
+ client_event, server_event, = wait_for_events([self.client_completion_queue, self.server_completion_queue], time.time() + 1)
self.assertEquals(6, len(client_event.results))
found_client_op_types = set()
diff --git a/src/python/grpcio_test/setup.py b/src/python/grpcio_test/setup.py
index 925c32720f..a6203cae2d 100644
--- a/src/python/grpcio_test/setup.py
+++ b/src/python/grpcio_test/setup.py
@@ -48,8 +48,13 @@ _PACKAGE_DIRECTORIES = {
_PACKAGE_DATA = {
'grpc_interop': [
- 'credentials/ca.pem', 'credentials/server1.key',
- 'credentials/server1.pem',]
+ 'credentials/ca.pem',
+ 'credentials/server1.key',
+ 'credentials/server1.pem',
+ ],
+ 'grpc_protoc_plugin': [
+ 'test.proto',
+ ],
}
_SETUP_REQUIRES = (
@@ -75,5 +80,5 @@ setuptools.setup(
package_data=_PACKAGE_DATA,
install_requires=_INSTALL_REQUIRES + _SETUP_REQUIRES,
setup_requires=_SETUP_REQUIRES,
- cmdclass=_COMMAND_CLASS
+ cmdclass=_COMMAND_CLASS,
)
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index ec6cb912f1..70f0795f29 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -179,6 +179,19 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
return Qnil;
}
+/* Called to obtain the peer that this call is connected to. */
+static VALUE grpc_rb_call_get_peer(VALUE self) {
+ VALUE res = Qnil;
+ grpc_call *call = NULL;
+ char *peer = NULL;
+ TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ peer = grpc_call_get_peer(call);
+ res = rb_str_new2(peer);
+ gpr_free(peer);
+
+ return res;
+}
+
/*
call-seq:
status = call.status
@@ -720,6 +733,7 @@ void Init_grpc_call() {
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
+ rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index a80e484fe1..01762980fc 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -37,6 +37,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
@@ -250,6 +251,21 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
return Qnil;
}
+
+/* Called to obtain the target that this channel accesses. */
+static VALUE grpc_rb_channel_get_target(VALUE self) {
+ grpc_rb_channel *wrapper = NULL;
+ VALUE res = Qnil;
+ char* target = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
+ target = grpc_channel_get_target(wrapper->wrapped);
+ res = rb_str_new2(target);
+ gpr_free(target);
+
+ return res;
+}
+
void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel =
@@ -266,6 +282,7 @@ void Init_grpc_channel() {
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 4);
+ rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index dd4e27df51..45f31329e9 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -16,12 +16,15 @@ Gem::Specification.new do |s|
s.required_ruby_version = '>= 2.0.0'
s.requirements << 'libgrpc ~> 0.10.0 needs to be installed'
- s.files = `git ls-files`.split("\n")
- s.test_files = `git ls-files -- spec/*`.split("\n")
- s.executables = `git ls-files -- bin/*.rb`.split("\n").map do |f|
- File.basename(f)
+ s.files = %w( Rakefile )
+ s.files += Dir.glob('lib/**/*')
+ s.files += Dir.glob('ext/**/*')
+ s.files += Dir.glob('bin/**/*')
+ s.test_files = Dir.glob('spec/**/*')
+ %w(math noproto).each do |b|
+ s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ]
end
- s.require_paths = ['lib']
+ s.require_paths = %w( bin lib )
s.platform = Gem::Platform::RUBY
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 7b2c04aa22..745eab437e 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -28,6 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/generic/active_call'
+require 'grpc/version'
# GRPC contains the General RPC module.
module GRPC
@@ -36,8 +37,8 @@ module GRPC
include Core::StatusCodes
include Core::TimeConsts
- # Default timeout is 5 seconds.
- DEFAULT_TIMEOUT = 5
+ # Default timeout is infinity.
+ DEFAULT_TIMEOUT = INFINITE_FUTURE
# setup_channel is used by #initialize to constuct a channel from its
# arguments.
@@ -46,6 +47,7 @@ module GRPC
fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
return alt_chan
end
+ kw['grpc.primary_user_agent'] = "grpc-ruby/#{VERSION}"
return Core::Channel.new(host, kw) if creds.nil?
fail(TypeError, '!Credentials') unless creds.is_a?(Core::Credentials)
Core::Channel.new(host, kw, creds)
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 0e85441209..ed8032517b 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -69,6 +69,23 @@ shared_examples 'basic GRPC message delivery is OK' do
include GRPC::Core
include_context 'setup: tags'
+ context 'the test channel' do
+ it 'should have a target' do
+ expect(@ch.target).to be_a(String)
+ end
+ end
+
+ context 'a client call' do
+ it 'should have a peer' do
+ expect(new_client_call.peer).to be_a(String)
+ end
+ end
+
+ it 'calls have peer info' do
+ call = new_client_call
+ expect(call.peer).to be_a(String)
+ end
+
it 'servers receive requests from clients and can respond' do
call = new_client_call
server_call = nil
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index eb14925fff..790e637b72 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 0.10.0.0
+PROJECT_NUMBER = 0.10.1.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
@@ -763,7 +763,6 @@ WARN_LOGFILE =
INPUT = include/grpc++/async_generic_service.h \
include/grpc++/async_unary_call.h \
include/grpc++/auth_context.h \
-include/grpc++/auth_property_iterator.h \
include/grpc++/byte_buffer.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 3d6649bef6..cd1279e2a6 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 0.10.0.0
+PROJECT_NUMBER = 0.10.1.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
@@ -763,7 +763,6 @@ WARN_LOGFILE =
INPUT = include/grpc++/async_generic_service.h \
include/grpc++/async_unary_call.h \
include/grpc++/auth_context.h \
-include/grpc++/auth_property_iterator.h \
include/grpc++/byte_buffer.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index 2fb27efc9e..d219ca7ac0 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 0.10.0.0
+PROJECT_NUMBER = 0.10.1.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index cbf5c50a65..1f66fe1d35 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core"
# could be handy for archiving the generated documentation or if some version
# control system is used.
-PROJECT_NUMBER = 0.10.0.0
+PROJECT_NUMBER = 0.10.1.0
# Using the PROJECT_BRIEF tag one can provide an optional one line description
# for a project that appears at the top of each page and should give viewer a
diff --git a/tools/run_tests/run_interops.py b/tools/run_tests/run_interops.py
index 1cf268526d..4e6b5ce2f6 100755
--- a/tools/run_tests/run_interops.py
+++ b/tools/run_tests/run_interops.py
@@ -4,24 +4,20 @@ import jobset
argp = argparse.ArgumentParser(description='Run interop tests.')
argp.add_argument('-l', '--language',
- choices=['build_only', 'c++'],
- nargs='+',
- default=['build_only'])
+ default='c++')
args = argp.parse_args()
# build job
-build_steps = 'tools/run_tests/run_interops_build.sh'
-build_job = jobset.JobSpec(cmdline=build_steps, shortname='build')
+build_job = jobset.JobSpec(cmdline=['tools/run_tests/run_interops_build.sh', '%s' % args.language], shortname='build')
-# test jobs
+# test jobs, each test is a separate job to run in parallel
_TESTS = ['large_unary', 'empty_unary', 'ping_pong', 'client_streaming', 'server_streaming']
jobs = []
jobNumber = 0
-for lang in args.language:
- for test in _TESTS:
- test_job = jobset.JobSpec(cmdline=['tools/run_tests/run_interops_test.sh', '%s' % lang, '%s' % test], shortname=test)
- jobs.append(test_job)
- jobNumber+=1
+for test in _TESTS:
+ test_job = jobset.JobSpec(cmdline=['tools/run_tests/run_interops_test.sh', '%s' % args.language, '%s' % test], shortname=test)
+ jobs.append(test_job)
+ jobNumber+=1
root = ET.Element('testsuites')
testsuite = ET.SubElement(root, 'testsuite', id='1', package='grpc', name='tests')
diff --git a/tools/run_tests/run_interops_build.sh b/tools/run_tests/run_interops_build.sh
index 23441a5300..ff1a26cf89 100755
--- a/tools/run_tests/run_interops_build.sh
+++ b/tools/run_tests/run_interops_build.sh
@@ -29,6 +29,8 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+language=$1
+
set -e
#clean up any old docker files and start mirroring repository if not started already
@@ -40,8 +42,34 @@ sudo docker run -d -e GCS_BUCKET=docker-interop-images -e STORAGE_PATH=/admin/d
#prepare building by pulling down base images and necessary files
sudo docker pull 0.0.0.0:5000/grpc/base
sudo docker tag -f 0.0.0.0:5000/grpc/base grpc/base
-gsutil cp -R gs://docker-interop-images/admin/service_account tools/dockerfile/grpc_cxx
-gsutil cp -R gs://docker-interop-images/admin/cacerts tools/dockerfile/grpc_cxx
-#build docker file, add more languages later
-sudo docker build --no-cache -t grpc/cxx tools/dockerfile/grpc_cxx
+if [ "$language" = "c++" ]
+then
+ gsutil cp -R gs://docker-interop-images/admin/service_account tools/dockerfile/grpc_cxx
+ gsutil cp -R gs://docker-interop-images/admin/cacerts tools/dockerfile/grpc_cxx
+ sudo docker build --no-cache -t grpc/cxx tools/dockerfile/grpc_cxx
+elif [ "$language" = "node" ]
+then
+ sudo docker pull 0.0.0.0:5000/grpc/node_base
+ sudo docker tag -f 0.0.0.0:5000/grpc/node_base grpc/node_base
+ gsutil cp -R gs://docker-interop-images/admin/service_account tools/dockerfile/grpc_node
+ gsutil cp -R gs://docker-interop-images/admin/cacerts tools/dockerfile/grpc_node
+ sudo docker build --no-cache -t grpc/node tools/dockerfile/grpc_node
+elif [ "$language" = "ruby" ]
+then
+ sudo docker pull 0.0.0.0:5000/grpc/ruby_base
+ sudo docker tag -f 0.0.0.0:5000/grpc/ruby_base grpc/ruby_base
+ gsutil cp -R gs://docker-interop-images/admin/service_account tools/dockerfile/grpc_ruby
+ gsutil cp -R gs://docker-interop-images/admin/cacerts tools/dockerfile/grpc_ruby
+ sudo docker build --no-cache -t grpc/ruby tools/dockerfile/grpc_ruby
+elif [ "$language" = "php" ]
+then
+ sudo docker pull 0.0.0.0:5000/grpc/php_base
+ sudo docker tag -f 0.0.0.0:5000/grpc/php_base grpc/php_base
+ gsutil cp -R gs://docker-interop-images/admin/service_account tools/dockerfile/grpc_php
+ gsutil cp -R gs://docker-interop-images/admin/cacerts tools/dockerfile/grpc_php
+ sudo docker build --no-cache -t grpc/php tools/dockerfile/grpc_php
+else
+ echo "interop testss not added for $language"
+ exit 1
+fi
diff --git a/tools/run_tests/run_interops_test.sh b/tools/run_tests/run_interops_test.sh
index 1d0eedad85..9be253af46 100755
--- a/tools/run_tests/run_interops_test.sh
+++ b/tools/run_tests/run_interops_test.sh
@@ -36,6 +36,17 @@ set -e
if [ "$language" = "c++" ]
then
sudo docker run grpc/cxx /var/local/git/grpc/bins/opt/interop_client --enable_ssl --use_prod_roots --server_host_override=grpc-test.sandbox.google.com --server_host=grpc-test.sandbox.google.com --server_port=443 --test_case=$test_case
+elif [ "$language" = "node" ]
+then
+ sudo docker run grpc/node /usr/bin/nodejs /var/local/git/grpc/src/node/interop/interop_client.js --use_tls=true --use_test_ca=true --server_port=443 --server_host=grpc-test.sandbox.google.com --server_host_override=grpc-test.sandbox.google.com --test_case=$test_case
+elif [ "$language" = "ruby" ]
+then
+ cmd_prefix="SSL_CERT_FILE=/cacerts/roots.pem ruby /var/local/git/grpc/src/ruby/bin/interop/interop_client.rb --use_tls --server_port=443 --server_host=grpc-test.sandbox.google.com --server_host_override=grpc-test.sandbox.google.com "
+ cmd="$cmd_prefix --test_case=$test_case"
+ sudo docker run grpc/ruby bin/bash -l -c '$cmd'
+elif [ "$language" = "php" ]
+then
+ sudo docker run -e SSL_CERT_FILE=/cacerts/roots.pem grpc/php /var/local/git/grpc/src/php/bin/interop_client.sh --server_port=443 --server_host=grpc-test.sandbox.google.com --server_host_override=grpc-test.sandbox.google.com --test_case=$test_case
else
echo "interop testss not added for $language"
exit 1
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index 5ffd4460b9..6f80219b0e 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -37,5 +37,6 @@ ROOT=`pwd`
GRPCIO_TEST=$ROOT/src/python/grpcio_test
export LD_LIBRARY_PATH=$ROOT/libs/$CONFIG
export DYLD_LIBRARY_PATH=$ROOT/libs/$CONFIG
+export PATH=$ROOT/bins/$CONFIG:$ROOT/bins/$CONFIG/protobuf:$PATH
source "python"$PYVER"_virtual_environment"/bin/activate
"python"$PYVER $GRPCIO_TEST/setup.py test -a "-n8 --cov=grpc --junitxml=./report.xml"
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 5d23bf9e88..d2c3ec3add 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -13051,7 +13051,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@@ -13102,7 +13101,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@@ -13227,7 +13225,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
@@ -13275,7 +13272,6 @@
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/auth_context.h",
- "include/grpc++/auth_property_iterator.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
diff --git a/vsprojects/grpc++/grpc++.vcxproj b/vsprojects/grpc++/grpc++.vcxproj
index 58474511fc..929bc1500e 100644
--- a/vsprojects/grpc++/grpc++.vcxproj
+++ b/vsprojects/grpc++/grpc++.vcxproj
@@ -216,7 +216,6 @@
<ClInclude Include="..\..\include\grpc++\async_generic_service.h" />
<ClInclude Include="..\..\include\grpc++\async_unary_call.h" />
<ClInclude Include="..\..\include\grpc++\auth_context.h" />
- <ClInclude Include="..\..\include\grpc++\auth_property_iterator.h" />
<ClInclude Include="..\..\include\grpc++\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc++\channel_arguments.h" />
<ClInclude Include="..\..\include\grpc++\channel_interface.h" />
diff --git a/vsprojects/grpc++/grpc++.vcxproj.filters b/vsprojects/grpc++/grpc++.vcxproj.filters
index 2a8ee08b08..0408fb46a5 100644
--- a/vsprojects/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/grpc++/grpc++.vcxproj.filters
@@ -105,9 +105,6 @@
<ClInclude Include="..\..\include\grpc++\auth_context.h">
<Filter>include\grpc++</Filter>
</ClInclude>
- <ClInclude Include="..\..\include\grpc++\auth_property_iterator.h">
- <Filter>include\grpc++</Filter>
- </ClInclude>
<ClInclude Include="..\..\include\grpc++\byte_buffer.h">
<Filter>include\grpc++</Filter>
</ClInclude>
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
index 0d989c4a93..2ff252e04e 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -216,7 +216,6 @@
<ClInclude Include="..\..\include\grpc++\async_generic_service.h" />
<ClInclude Include="..\..\include\grpc++\async_unary_call.h" />
<ClInclude Include="..\..\include\grpc++\auth_context.h" />
- <ClInclude Include="..\..\include\grpc++\auth_property_iterator.h" />
<ClInclude Include="..\..\include\grpc++\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc++\channel_arguments.h" />
<ClInclude Include="..\..\include\grpc++\channel_interface.h" />
diff --git a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index 71d42e5c6d..b4fae7741c 100644
--- a/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -90,9 +90,6 @@
<ClInclude Include="..\..\include\grpc++\auth_context.h">
<Filter>include\grpc++</Filter>
</ClInclude>
- <ClInclude Include="..\..\include\grpc++\auth_property_iterator.h">
- <Filter>include\grpc++</Filter>
- </ClInclude>
<ClInclude Include="..\..\include\grpc++\byte_buffer.h">
<Filter>include\grpc++</Filter>
</ClInclude>