aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--BUILD5
-rw-r--r--Makefile4
-rw-r--r--build.yaml1
-rw-r--r--examples/python/helloworld/helloworld_pb2.py73
-rw-r--r--examples/python/route_guide/route_guide_pb2.py189
-rw-r--r--gRPC.podspec1
-rwxr-xr-xgrpc.gemspec1
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h32
-rw-r--r--include/grpc++/impl/codegen/proto_utils.h161
-rw-r--r--include/grpc/byte_buffer_reader.h21
-rw-r--r--include/grpc/impl/codegen/byte_buffer_reader.h57
-rw-r--r--include/grpc/impl/codegen/grpc_types.h4
-rw-r--r--package.xml1
-rw-r--r--src/compiler/python_generator.cc47
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/cpp/common/core_codegen.cc204
-rw-r--r--src/cpp/common/core_codegen.h26
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs63
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs191
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs441
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs183
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs61
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs48
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs7
-rw-r--r--src/csharp/tests.json1
-rw-r--r--src/proto/grpc/testing/echo.proto4
-rw-r--r--src/ruby/.rubocop.yml4
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb17
-rw-r--r--test/core/end2end/fixtures/h2_census.c2
-rw-r--r--test/core/surface/public_headers_must_be_c89.c1
-rw-r--r--test/cpp/end2end/async_end2end_test.cc338
-rwxr-xr-xtools/distrib/check_include_guards.py3
-rw-r--r--tools/doxygen/Doxyfile.c++1
-rw-r--r--tools/doxygen/Doxyfile.c++.internal1
-rw-r--r--tools/doxygen/Doxyfile.core1
-rw-r--r--tools/doxygen/Doxyfile.core.internal1
-rw-r--r--tools/run_tests/sources_and_headers.json2
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj1
-rw-r--r--vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj1
-rw-r--r--vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj1
-rw-r--r--vsprojects/vcxproj/grpc/grpc.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj1
-rw-r--r--vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj1
-rw-r--r--vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters3
-rw-r--r--vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj1
-rw-r--r--vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters3
54 files changed, 1513 insertions, 744 deletions
diff --git a/BUILD b/BUILD
index d9dec753b6..5fd5666a0b 100644
--- a/BUILD
+++ b/BUILD
@@ -465,6 +465,7 @@ cc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@@ -778,6 +779,7 @@ cc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@@ -950,6 +952,7 @@ cc_library(
"include/grpc++/impl/codegen/sync_stream.h",
"include/grpc++/impl/codegen/time.h",
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@@ -1095,6 +1098,7 @@ cc_library(
"include/grpc++/impl/codegen/sync_stream.h",
"include/grpc++/impl/codegen/time.h",
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@@ -1485,6 +1489,7 @@ objc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
diff --git a/Makefile b/Makefile
index fae2680c41..d3f0424148 100644
--- a/Makefile
+++ b/Makefile
@@ -2651,6 +2651,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
+ include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@@ -2972,6 +2973,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
+ include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@@ -3258,6 +3260,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
+ include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@@ -3561,6 +3564,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
+ include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
diff --git a/build.yaml b/build.yaml
index 9990b337ef..003502653e 100644
--- a/build.yaml
+++ b/build.yaml
@@ -353,6 +353,7 @@ filegroups:
- name: grpc_codegen
public_headers:
- include/grpc/impl/codegen/byte_buffer.h
+ - include/grpc/impl/codegen/byte_buffer_reader.h
- include/grpc/impl/codegen/compression_types.h
- include/grpc/impl/codegen/connectivity_state.h
- include/grpc/impl/codegen/grpc_types.h
diff --git a/examples/python/helloworld/helloworld_pb2.py b/examples/python/helloworld/helloworld_pb2.py
index 1b2674e4c8..1ee80e4034 100644
--- a/examples/python/helloworld/helloworld_pb2.py
+++ b/examples/python/helloworld/helloworld_pb2.py
@@ -1,6 +1,8 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: helloworld.proto
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@@ -17,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='helloworld.proto',
package='helloworld',
syntax='proto3',
- serialized_pb=b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x18\n\x10io.grpc.examples\xa2\x02\x03HLWb\x06proto3'
+ serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -34,7 +36,7 @@ _HELLOREQUEST = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='name', full_name='helloworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=b"".decode('utf-8'),
+ has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@@ -65,7 +67,7 @@ _HELLOREPLY = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='message', full_name='helloworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=b"".decode('utf-8'),
+ has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@@ -104,69 +106,28 @@ _sym_db.RegisterMessage(HelloReply)
DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), b'\n\020io.grpc.examples\242\002\003HLW')
+DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'))
import abc
+import six
from grpc.beta import implementations as beta_implementations
-from grpc.early_adopter import implementations as early_adopter_implementations
-from grpc.framework.alpha import utilities as alpha_utilities
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
-class EarlyAdopterGreeterServicer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def SayHello(self, request, context):
- raise NotImplementedError()
-class EarlyAdopterGreeterServer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def start(self):
- raise NotImplementedError()
- @abc.abstractmethod
- def stop(self):
- raise NotImplementedError()
-class EarlyAdopterGreeterStub(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def SayHello(self, request):
- raise NotImplementedError()
- SayHello.async = None
-def early_adopter_create_Greeter_server(servicer, port, private_key=None, certificate_chain=None):
- import helloworld_pb2
- import helloworld_pb2
- method_service_descriptions = {
- "SayHello": alpha_utilities.unary_unary_service_description(
- servicer.SayHello,
- helloworld_pb2.HelloRequest.FromString,
- helloworld_pb2.HelloReply.SerializeToString,
- ),
- }
- return early_adopter_implementations.server("helloworld.Greeter", method_service_descriptions, port, private_key=private_key, certificate_chain=certificate_chain)
-def early_adopter_create_Greeter_stub(host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, server_host_override=None):
- import helloworld_pb2
- import helloworld_pb2
- method_invocation_descriptions = {
- "SayHello": alpha_utilities.unary_unary_invocation_description(
- helloworld_pb2.HelloRequest.SerializeToString,
- helloworld_pb2.HelloReply.FromString,
- ),
- }
- return early_adopter_implementations.stub("helloworld.Greeter", method_invocation_descriptions, host, port, metadata_transformer=metadata_transformer, secure=secure, root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain, server_host_override=server_host_override)
class BetaGreeterServicer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
+ """The greeting service definition.
+ """
def SayHello(self, request, context):
- raise NotImplementedError()
+ """Sends a greeting
+ """
+ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaGreeterStub(object):
- """The interface to which stubs will conform."""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
+ """The greeting service definition.
+ """
def SayHello(self, request, timeout):
+ """Sends a greeting
+ """
raise NotImplementedError()
SayHello.future = None
diff --git a/examples/python/route_guide/route_guide_pb2.py b/examples/python/route_guide/route_guide_pb2.py
index d4d9f8dcd5..81d5d07527 100644
--- a/examples/python/route_guide/route_guide_pb2.py
+++ b/examples/python/route_guide/route_guide_pb2.py
@@ -1,6 +1,8 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: route_guide.proto
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@@ -17,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='route_guide.proto',
package='routeguide',
syntax='proto3',
- serialized_pb=b'\n\x11route_guide.proto\x12\nrouteguide\",\n\x05Point\x12\x10\n\x08latitude\x18\x01 \x01(\x05\x12\x11\n\tlongitude\x18\x02 \x01(\x05\"I\n\tRectangle\x12\x1d\n\x02lo\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x1d\n\x02hi\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"<\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x08location\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"A\n\tRouteNote\x12#\n\x08location\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x0f\n\x07message\x18\x02 \x01(\t\"b\n\x0cRouteSummary\x12\x13\n\x0bpoint_count\x18\x01 \x01(\x05\x12\x15\n\rfeature_count\x18\x02 \x01(\x05\x12\x10\n\x08\x64istance\x18\x03 \x01(\x05\x12\x14\n\x0c\x65lapsed_time\x18\x04 \x01(\x05\x32\x85\x02\n\nRouteGuide\x12\x36\n\nGetFeature\x12\x11.routeguide.Point\x1a\x13.routeguide.Feature\"\x00\x12>\n\x0cListFeatures\x12\x15.routeguide.Rectangle\x1a\x13.routeguide.Feature\"\x00\x30\x01\x12>\n\x0bRecordRoute\x12\x11.routeguide.Point\x1a\x18.routeguide.RouteSummary\"\x00(\x01\x12?\n\tRouteChat\x12\x15.routeguide.RouteNote\x1a\x15.routeguide.RouteNote\"\x00(\x01\x30\x01\x42\x0f\n\x07\x65x.grpc\xa2\x02\x03RTGb\x06proto3'
+ serialized_pb=_b('\n\x11route_guide.proto\x12\nrouteguide\",\n\x05Point\x12\x10\n\x08latitude\x18\x01 \x01(\x05\x12\x11\n\tlongitude\x18\x02 \x01(\x05\"I\n\tRectangle\x12\x1d\n\x02lo\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x1d\n\x02hi\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"<\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x08location\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"A\n\tRouteNote\x12#\n\x08location\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x0f\n\x07message\x18\x02 \x01(\t\"b\n\x0cRouteSummary\x12\x13\n\x0bpoint_count\x18\x01 \x01(\x05\x12\x15\n\rfeature_count\x18\x02 \x01(\x05\x12\x10\n\x08\x64istance\x18\x03 \x01(\x05\x12\x14\n\x0c\x65lapsed_time\x18\x04 \x01(\x05\x32\x85\x02\n\nRouteGuide\x12\x36\n\nGetFeature\x12\x11.routeguide.Point\x1a\x13.routeguide.Feature\"\x00\x12>\n\x0cListFeatures\x12\x15.routeguide.Rectangle\x1a\x13.routeguide.Feature\"\x00\x30\x01\x12>\n\x0bRecordRoute\x12\x11.routeguide.Point\x1a\x18.routeguide.RouteSummary\"\x00(\x01\x12?\n\tRouteChat\x12\x15.routeguide.RouteNote\x1a\x15.routeguide.RouteNote\"\x00(\x01\x30\x01\x42\x36\n\x1bio.grpc.examples.routeguideB\x0fRouteGuideProtoP\x01\xa2\x02\x03RTGb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -110,7 +112,7 @@ _FEATURE = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='name', full_name='routeguide.Feature.name', index=0,
number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=b"".decode('utf-8'),
+ has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@@ -155,7 +157,7 @@ _ROUTENOTE = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='message', full_name='routeguide.RouteNote.message', index=1,
number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=b"".decode('utf-8'),
+ has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@@ -274,149 +276,86 @@ _sym_db.RegisterMessage(RouteSummary)
DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), b'\n\007ex.grpc\242\002\003RTG')
+DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.routeguideB\017RouteGuideProtoP\001\242\002\003RTG'))
import abc
+import six
from grpc.beta import implementations as beta_implementations
-from grpc.early_adopter import implementations as early_adopter_implementations
-from grpc.framework.alpha import utilities as alpha_utilities
+from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
-class EarlyAdopterRouteGuideServicer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def GetFeature(self, request, context):
- raise NotImplementedError()
- @abc.abstractmethod
- def ListFeatures(self, request, context):
- raise NotImplementedError()
- @abc.abstractmethod
- def RecordRoute(self, request_iterator, context):
- raise NotImplementedError()
- @abc.abstractmethod
- def RouteChat(self, request_iterator, context):
- raise NotImplementedError()
-class EarlyAdopterRouteGuideServer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def start(self):
- raise NotImplementedError()
- @abc.abstractmethod
- def stop(self):
- raise NotImplementedError()
-class EarlyAdopterRouteGuideStub(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
- def GetFeature(self, request):
- raise NotImplementedError()
- GetFeature.async = None
- @abc.abstractmethod
- def ListFeatures(self, request):
- raise NotImplementedError()
- ListFeatures.async = None
- @abc.abstractmethod
- def RecordRoute(self, request_iterator):
- raise NotImplementedError()
- RecordRoute.async = None
- @abc.abstractmethod
- def RouteChat(self, request_iterator):
- raise NotImplementedError()
- RouteChat.async = None
-def early_adopter_create_RouteGuide_server(servicer, port, private_key=None, certificate_chain=None):
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- method_service_descriptions = {
- "GetFeature": alpha_utilities.unary_unary_service_description(
- servicer.GetFeature,
- route_guide_pb2.Point.FromString,
- route_guide_pb2.Feature.SerializeToString,
- ),
- "ListFeatures": alpha_utilities.unary_stream_service_description(
- servicer.ListFeatures,
- route_guide_pb2.Rectangle.FromString,
- route_guide_pb2.Feature.SerializeToString,
- ),
- "RecordRoute": alpha_utilities.stream_unary_service_description(
- servicer.RecordRoute,
- route_guide_pb2.Point.FromString,
- route_guide_pb2.RouteSummary.SerializeToString,
- ),
- "RouteChat": alpha_utilities.stream_stream_service_description(
- servicer.RouteChat,
- route_guide_pb2.RouteNote.FromString,
- route_guide_pb2.RouteNote.SerializeToString,
- ),
- }
- return early_adopter_implementations.server("routeguide.RouteGuide", method_service_descriptions, port, private_key=private_key, certificate_chain=certificate_chain)
-def early_adopter_create_RouteGuide_stub(host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, server_host_override=None):
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- import route_guide_pb2
- method_invocation_descriptions = {
- "GetFeature": alpha_utilities.unary_unary_invocation_description(
- route_guide_pb2.Point.SerializeToString,
- route_guide_pb2.Feature.FromString,
- ),
- "ListFeatures": alpha_utilities.unary_stream_invocation_description(
- route_guide_pb2.Rectangle.SerializeToString,
- route_guide_pb2.Feature.FromString,
- ),
- "RecordRoute": alpha_utilities.stream_unary_invocation_description(
- route_guide_pb2.Point.SerializeToString,
- route_guide_pb2.RouteSummary.FromString,
- ),
- "RouteChat": alpha_utilities.stream_stream_invocation_description(
- route_guide_pb2.RouteNote.SerializeToString,
- route_guide_pb2.RouteNote.FromString,
- ),
- }
- return early_adopter_implementations.stub("routeguide.RouteGuide", method_invocation_descriptions, host, port, metadata_transformer=metadata_transformer, secure=secure, root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain, server_host_override=server_host_override)
class BetaRouteGuideServicer(object):
- """<fill me in later!>"""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
+ """Interface exported by the server.
+ """
def GetFeature(self, request, context):
- raise NotImplementedError()
- @abc.abstractmethod
+ """A simple RPC.
+
+ Obtains the feature at a given position.
+
+ A feature with an empty name is returned if there's no feature at the given
+ position.
+ """
+ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def ListFeatures(self, request, context):
- raise NotImplementedError()
- @abc.abstractmethod
+ """A server-to-client streaming RPC.
+
+ Obtains the Features available within the given Rectangle. Results are
+ streamed rather than returned at once (e.g. in a response message with a
+ repeated field), as the rectangle may cover a large area and contain a
+ huge number of features.
+ """
+ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def RecordRoute(self, request_iterator, context):
- raise NotImplementedError()
- @abc.abstractmethod
+ """A client-to-server streaming RPC.
+
+ Accepts a stream of Points on a route being traversed, returning a
+ RouteSummary when traversal is completed.
+ """
+ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def RouteChat(self, request_iterator, context):
- raise NotImplementedError()
+ """A Bidirectional streaming RPC.
+
+ Accepts a stream of RouteNotes sent while a route is being traversed,
+ while receiving other RouteNotes (e.g. from other users).
+ """
+ context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaRouteGuideStub(object):
- """The interface to which stubs will conform."""
- __metaclass__ = abc.ABCMeta
- @abc.abstractmethod
+ """Interface exported by the server.
+ """
def GetFeature(self, request, timeout):
+ """A simple RPC.
+
+ Obtains the feature at a given position.
+
+ A feature with an empty name is returned if there's no feature at the given
+ position.
+ """
raise NotImplementedError()
GetFeature.future = None
- @abc.abstractmethod
def ListFeatures(self, request, timeout):
+ """A server-to-client streaming RPC.
+
+ Obtains the Features available within the given Rectangle. Results are
+ streamed rather than returned at once (e.g. in a response message with a
+ repeated field), as the rectangle may cover a large area and contain a
+ huge number of features.
+ """
raise NotImplementedError()
- @abc.abstractmethod
def RecordRoute(self, request_iterator, timeout):
+ """A client-to-server streaming RPC.
+
+ Accepts a stream of Points on a route being traversed, returning a
+ RouteSummary when traversal is completed.
+ """
raise NotImplementedError()
RecordRoute.future = None
- @abc.abstractmethod
def RouteChat(self, request_iterator, timeout):
+ """A Bidirectional streaming RPC.
+
+ Accepts a stream of RouteNotes sent while a route is being traversed,
+ while receiving other RouteNotes (e.g. from other users).
+ """
raise NotImplementedError()
def beta_create_RouteGuide_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None):
diff --git a/gRPC.podspec b/gRPC.podspec
index b5e8eb5b13..c0bbcfe8da 100644
--- a/gRPC.podspec
+++ b/gRPC.podspec
@@ -306,6 +306,7 @@ Pod::Spec.new do |s|
'include/grpc/grpc.h',
'include/grpc/status.h',
'include/grpc/impl/codegen/byte_buffer.h',
+ 'include/grpc/impl/codegen/byte_buffer_reader.h',
'include/grpc/impl/codegen/compression_types.h',
'include/grpc/impl/codegen/connectivity_state.h',
'include/grpc/impl/codegen/grpc_types.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 368e01ac63..b106d96f21 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -149,6 +149,7 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/grpc.h )
s.files += %w( include/grpc/status.h )
s.files += %w( include/grpc/impl/codegen/byte_buffer.h )
+ s.files += %w( include/grpc/impl/codegen/byte_buffer_reader.h )
s.files += %w( include/grpc/impl/codegen/compression_types.h )
s.files += %w( include/grpc/impl/codegen/connectivity_state.h )
s.files += %w( include/grpc/impl/codegen/grpc_types.h )
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index 16424bab35..aa9013c4ce 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -49,18 +49,6 @@ namespace grpc {
/// \warning This interface should be considered internal and private.
class CoreCodegenInterface {
public:
- // Serialize the msg into a buffer created inside the function. The caller
- // should destroy the returned buffer when done with it. If serialization
- // fails,
- // false is returned and buffer is left unchanged.
- virtual Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer) = 0;
-
- // The caller keeps ownership of buffer and msg.
- virtual Status DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg,
- int max_message_size) = 0;
-
/// Upon a failed assertion, log the error.
virtual void assert_fail(const char* failed_assertion) = 0;
@@ -76,9 +64,29 @@ class CoreCodegenInterface {
virtual void gpr_free(void* p) = 0;
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
+
+ virtual void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) = 0;
+ virtual void grpc_byte_buffer_reader_destroy(
+ grpc_byte_buffer_reader* reader) = 0;
+ virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ gpr_slice* slice) = 0;
+
+ virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice,
+ size_t nslices) = 0;
+
+ virtual gpr_slice gpr_slice_malloc(size_t length) = 0;
+ virtual void gpr_slice_unref(gpr_slice slice) = 0;
+ virtual gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) = 0;
+ virtual void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) = 0;
+ virtual void gpr_slice_buffer_pop(gpr_slice_buffer* sb) = 0;
+
virtual void grpc_metadata_array_init(grpc_metadata_array* array) = 0;
virtual void grpc_metadata_array_destroy(grpc_metadata_array* array) = 0;
+ virtual const Status& ok() = 0;
+ virtual const Status& cancelled() = 0;
+
virtual gpr_timespec gpr_inf_future(gpr_clock_type type) = 0;
};
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 2aaa3c3b30..d044ddc642 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -41,26 +41,179 @@
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/impl/codegen/log.h>
+#include <grpc/impl/codegen/slice.h>
namespace grpc {
extern CoreCodegenInterface* g_core_codegen_interface;
+namespace {
+
+const int kGrpcBufferWriterMaxBufferLength = 8192;
+
+class GrpcBufferWriter GRPC_FINAL
+ : public ::grpc::protobuf::io::ZeroCopyOutputStream {
+ public:
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
+ : block_size_(block_size), byte_count_(0), have_backup_(false) {
+ *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.raw.slice_buffer;
+ }
+
+ ~GrpcBufferWriter() GRPC_OVERRIDE {
+ if (have_backup_) {
+ g_core_codegen_interface->gpr_slice_unref(backup_slice_);
+ }
+ }
+
+ bool Next(void** data, int* size) GRPC_OVERRIDE {
+ if (have_backup_) {
+ slice_ = backup_slice_;
+ have_backup_ = false;
+ } else {
+ slice_ = g_core_codegen_interface->gpr_slice_malloc(block_size_);
+ }
+ *data = GPR_SLICE_START_PTR(slice_);
+ // On win x64, int is only 32bit
+ GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
+ byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
+ g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_);
+ return true;
+ }
+
+ void BackUp(int count) GRPC_OVERRIDE {
+ g_core_codegen_interface->gpr_slice_buffer_pop(slice_buffer_);
+ if (count == block_size_) {
+ backup_slice_ = slice_;
+ } else {
+ backup_slice_ = g_core_codegen_interface->gpr_slice_split_tail(
+ &slice_, GPR_SLICE_LENGTH(slice_) - count);
+ g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_);
+ }
+ have_backup_ = true;
+ byte_count_ -= count;
+ }
+
+ grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
+
+ private:
+ const int block_size_;
+ int64_t byte_count_;
+ gpr_slice_buffer* slice_buffer_;
+ bool have_backup_;
+ gpr_slice backup_slice_;
+ gpr_slice slice_;
+};
+
+class GrpcBufferReader GRPC_FINAL
+ : public ::grpc::protobuf::io::ZeroCopyInputStream {
+ public:
+ explicit GrpcBufferReader(grpc_byte_buffer* buffer)
+ : byte_count_(0), backup_count_(0) {
+ g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, buffer);
+ }
+ ~GrpcBufferReader() GRPC_OVERRIDE {
+ g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
+ }
+
+ bool Next(const void** data, int* size) GRPC_OVERRIDE {
+ if (backup_count_ > 0) {
+ *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
+ backup_count_;
+ GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
+ *size = (int)backup_count_;
+ backup_count_ = 0;
+ return true;
+ }
+ if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
+ &slice_)) {
+ return false;
+ }
+ g_core_codegen_interface->gpr_slice_unref(slice_);
+ *data = GPR_SLICE_START_PTR(slice_);
+ // On win x64, int is only 32bit
+ GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
+ byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
+ return true;
+ }
+
+ void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
+
+ bool Skip(int count) GRPC_OVERRIDE {
+ const void* data;
+ int size;
+ while (Next(&data, &size)) {
+ if (size >= count) {
+ BackUp(size - count);
+ return true;
+ }
+ // size < count;
+ count -= size;
+ }
+ // error or we have too large count;
+ return false;
+ }
+
+ grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
+ return byte_count_ - backup_count_;
+ }
+
+ private:
+ int64_t byte_count_;
+ int64_t backup_count_;
+ grpc_byte_buffer_reader reader_;
+ gpr_slice slice_;
+};
+} // namespace
+
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static Status Serialize(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer, bool* own_buffer) {
+ grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true;
- return g_core_codegen_interface->SerializeProto(msg, buffer);
+ int byte_size = msg.ByteSize();
+ if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+ gpr_slice slice = g_core_codegen_interface->gpr_slice_malloc(byte_size);
+ GPR_CODEGEN_ASSERT(
+ GPR_SLICE_END_PTR(slice) ==
+ msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
+ *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
+ g_core_codegen_interface->gpr_slice_unref(slice);
+ return g_core_codegen_interface->ok();
+ } else {
+ GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? g_core_codegen_interface->ok()
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
+ }
}
+
static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) {
- return g_core_codegen_interface->DeserializeProto(buffer, msg,
- max_message_size);
+ if (buffer == nullptr) {
+ return Status(StatusCode::INTERNAL, "No payload");
+ }
+ Status result = g_core_codegen_interface->ok();
+ {
+ GrpcBufferReader reader(buffer);
+ ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+ if (max_message_size > 0) {
+ decoder.SetTotalBytesLimit(max_message_size, max_message_size);
+ }
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ result = Status(StatusCode::INTERNAL, "Did not read entire message");
+ }
+ }
+ g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
+ return result;
}
};
diff --git a/include/grpc/byte_buffer_reader.h b/include/grpc/byte_buffer_reader.h
index 9a1c6178ab..e95bf2f80d 100644
--- a/include/grpc/byte_buffer_reader.h
+++ b/include/grpc/byte_buffer_reader.h
@@ -34,25 +34,6 @@
#ifndef GRPC_BYTE_BUFFER_READER_H
#define GRPC_BYTE_BUFFER_READER_H
-#include <grpc/byte_buffer.h>
-#include <grpc/grpc.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-struct grpc_byte_buffer_reader {
- grpc_byte_buffer *buffer_in;
- grpc_byte_buffer *buffer_out;
- /* Different current objects correspond to different types of byte buffers */
- union {
- /* Index into a slice buffer's array of slices */
- unsigned index;
- } current;
-};
-
-#ifdef __cplusplus
-}
-#endif
+#include <grpc/impl/codegen/byte_buffer_reader.h>
#endif /* GRPC_BYTE_BUFFER_READER_H */
diff --git a/include/grpc/impl/codegen/byte_buffer_reader.h b/include/grpc/impl/codegen/byte_buffer_reader.h
new file mode 100644
index 0000000000..10c382924e
--- /dev/null
+++ b/include/grpc/impl/codegen/byte_buffer_reader.h
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H
+#define GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H
+
+#include <grpc/impl/codegen/byte_buffer.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct grpc_byte_buffer_reader {
+ grpc_byte_buffer *buffer_in;
+ grpc_byte_buffer *buffer_out;
+ /* Different current objects correspond to different types of byte buffers */
+ union {
+ /* Index into a slice buffer's array of slices */
+ unsigned index;
+ } current;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H */
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 4c7373006b..7b20cc14d4 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -307,7 +307,9 @@ typedef enum {
GRPC_OP_RECV_STATUS_ON_CLIENT,
/** Receive close on the server: one and only one must be made on the
server.
- This op completes after the close has been received by the server. */
+ This op completes after the close has been received by the server.
+ This operation always succeeds, meaning ops paired with this operation
+ will also appear to succeed, even though they may not have. */
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
diff --git a/package.xml b/package.xml
index c4e09da3f1..fe17b729d0 100644
--- a/package.xml
+++ b/package.xml
@@ -156,6 +156,7 @@
<file baseinstalldir="/" name="include/grpc/grpc.h" role="src" />
<file baseinstalldir="/" name="include/grpc/status.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/byte_buffer.h" role="src" />
+ <file baseinstalldir="/" name="include/grpc/impl/codegen/byte_buffer_reader.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/compression_types.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/connectivity_state.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/grpc_types.h" role="src" />
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index 59137e1c92..8e76e6dce6 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -182,18 +182,40 @@ bool GetModuleAndMessagePath(const Descriptor* type,
return true;
}
+// Get all comments (leading, leading_detached, trailing) and print them as a
+// docstring. Any leading space of a line will be removed, but the line wrapping
+// will not be changed.
+template <typename DescriptorType>
+static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
+ std::vector<grpc::string> comments;
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING,
+ &comments);
+ if (comments.empty()) {
+ return;
+ }
+ printer->Print("\"\"\"");
+ for (auto it = comments.begin(); it != comments.end(); ++it) {
+ size_t start_pos = it->find_first_not_of(' ');
+ if (start_pos != grpc::string::npos) {
+ printer->Print(it->c_str() + start_pos);
+ }
+ printer->Print("\n");
+ }
+ printer->Print("\"\"\"\n");
+}
+
bool PrintBetaServicer(const ServiceDescriptor* service,
Printer* out) {
- grpc::string doc = "<fill me in later!>";
- map<grpc::string, grpc::string> dict = ListToDict({
- "Service", service->name(),
- "Documentation", doc,
- });
out->Print("\n");
- out->Print(dict, "class Beta$Service$Servicer(object):\n");
+ out->Print("class Beta$Service$Servicer(object):\n", "Service",
+ service->name());
{
IndentScope raii_class_indent(out);
- out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
+ PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
auto meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@@ -202,6 +224,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
"Method", meth->name(), "ArgName", arg_name);
{
IndentScope raii_method_indent(out);
+ PrintAllComments(meth, out);
out->Print("context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)\n");
}
}
@@ -211,16 +234,11 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
bool PrintBetaStub(const ServiceDescriptor* service,
Printer* out) {
- grpc::string doc = "The interface to which stubs will conform.";
- map<grpc::string, grpc::string> dict = ListToDict({
- "Service", service->name(),
- "Documentation", doc,
- });
out->Print("\n");
- out->Print(dict, "class Beta$Service$Stub(object):\n");
+ out->Print("class Beta$Service$Stub(object):\n", "Service", service->name());
{
IndentScope raii_class_indent(out);
- out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
+ PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@@ -229,6 +247,7 @@ bool PrintBetaStub(const ServiceDescriptor* service,
out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n");
{
IndentScope raii_method_indent(out);
+ PrintAllComments(meth, out);
out->Print("raise NotImplementedError()\n");
}
if (!meth->server_streaming()) {
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 0a51780a14..23c7b7b897 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -56,10 +56,6 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
-/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
- * is specified in channel args, otherwise returns 0. */
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
-
/** Returns the compression algorithm set in \a a. */
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 33a8f755e6..8e8d42eb29 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -48,124 +48,6 @@
#include "src/core/lib/profiling/timers.h"
-namespace {
-
-const int kGrpcBufferWriterMaxBufferLength = 8192;
-
-class GrpcBufferWriter GRPC_FINAL
- : public ::grpc::protobuf::io::ZeroCopyOutputStream {
- public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
- : block_size_(block_size), byte_count_(0), have_backup_(false) {
- *bp = grpc_raw_byte_buffer_create(NULL, 0);
- slice_buffer_ = &(*bp)->data.raw.slice_buffer;
- }
-
- ~GrpcBufferWriter() GRPC_OVERRIDE {
- if (have_backup_) {
- gpr_slice_unref(backup_slice_);
- }
- }
-
- bool Next(void** data, int* size) GRPC_OVERRIDE {
- if (have_backup_) {
- slice_ = backup_slice_;
- have_backup_ = false;
- } else {
- slice_ = gpr_slice_malloc(block_size_);
- }
- *data = GPR_SLICE_START_PTR(slice_);
- // On win x64, int is only 32bit
- GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
- byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
- gpr_slice_buffer_add(slice_buffer_, slice_);
- return true;
- }
-
- void BackUp(int count) GRPC_OVERRIDE {
- gpr_slice_buffer_pop(slice_buffer_);
- if (count == block_size_) {
- backup_slice_ = slice_;
- } else {
- backup_slice_ =
- gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
- gpr_slice_buffer_add(slice_buffer_, slice_);
- }
- have_backup_ = true;
- byte_count_ -= count;
- }
-
- grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
-
- private:
- const int block_size_;
- int64_t byte_count_;
- gpr_slice_buffer* slice_buffer_;
- bool have_backup_;
- gpr_slice backup_slice_;
- gpr_slice slice_;
-};
-
-class GrpcBufferReader GRPC_FINAL
- : public ::grpc::protobuf::io::ZeroCopyInputStream {
- public:
- explicit GrpcBufferReader(grpc_byte_buffer* buffer)
- : byte_count_(0), backup_count_(0) {
- grpc_byte_buffer_reader_init(&reader_, buffer);
- }
- ~GrpcBufferReader() GRPC_OVERRIDE {
- grpc_byte_buffer_reader_destroy(&reader_);
- }
-
- bool Next(const void** data, int* size) GRPC_OVERRIDE {
- if (backup_count_ > 0) {
- *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
- backup_count_;
- GPR_ASSERT(backup_count_ <= INT_MAX);
- *size = (int)backup_count_;
- backup_count_ = 0;
- return true;
- }
- if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
- return false;
- }
- gpr_slice_unref(slice_);
- *data = GPR_SLICE_START_PTR(slice_);
- // On win x64, int is only 32bit
- GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
- byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
- return true;
- }
-
- void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
-
- bool Skip(int count) GRPC_OVERRIDE {
- const void* data;
- int size;
- while (Next(&data, &size)) {
- if (size >= count) {
- BackUp(size - count);
- return true;
- }
- // size < count;
- count -= size;
- }
- // error or we have too large count;
- return false;
- }
-
- grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
- return byte_count_ - backup_count_;
- }
-
- private:
- int64_t byte_count_;
- int64_t backup_count_;
- grpc_byte_buffer_reader reader_;
- gpr_slice slice_;
-};
-} // namespace
-
namespace grpc {
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
@@ -192,6 +74,44 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
+void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) {
+ ::grpc_byte_buffer_reader_init(reader, buffer);
+}
+
+void CoreCodegen::grpc_byte_buffer_reader_destroy(
+ grpc_byte_buffer_reader* reader) {
+ ::grpc_byte_buffer_reader_destroy(reader);
+}
+
+int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ gpr_slice* slice) {
+ return ::grpc_byte_buffer_reader_next(reader, slice);
+}
+
+grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(gpr_slice* slice,
+ size_t nslices) {
+ return ::grpc_raw_byte_buffer_create(slice, nslices);
+}
+
+gpr_slice CoreCodegen::gpr_slice_malloc(size_t length) {
+ return ::gpr_slice_malloc(length);
+}
+
+void CoreCodegen::gpr_slice_unref(gpr_slice slice) { ::gpr_slice_unref(slice); }
+
+gpr_slice CoreCodegen::gpr_slice_split_tail(gpr_slice* s, size_t split) {
+ return ::gpr_slice_split_tail(s, split);
+}
+
+void CoreCodegen::gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) {
+ ::gpr_slice_buffer_add(sb, slice);
+}
+
+void CoreCodegen::gpr_slice_buffer_pop(gpr_slice_buffer* sb) {
+ ::gpr_slice_buffer_pop(sb);
+}
+
void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) {
::grpc_metadata_array_init(array);
}
@@ -200,6 +120,10 @@ void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) {
::grpc_metadata_array_destroy(array);
}
+const Status& CoreCodegen::ok() { return grpc::Status::OK; }
+
+const Status& CoreCodegen::cancelled() { return grpc::Status::CANCELLED; }
+
gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) {
return ::gpr_inf_future(type);
}
@@ -209,48 +133,4 @@ void CoreCodegen::assert_fail(const char* failed_assertion) {
abort();
}
-Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) {
- GPR_TIMER_SCOPE("SerializeProto", 0);
- int byte_size = msg.ByteSize();
- if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
- gpr_slice slice = gpr_slice_malloc(byte_size);
- GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
- *bp = grpc_raw_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- return Status::OK;
- } else {
- GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
- return msg.SerializeToZeroCopyStream(&writer)
- ? Status::OK
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
- }
-}
-
-Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg,
- int max_message_size) {
- GPR_TIMER_SCOPE("DeserializeProto", 0);
- if (buffer == nullptr) {
- return Status(StatusCode::INTERNAL, "No payload");
- }
- Status result = Status::OK;
- {
- GrpcBufferReader reader(buffer);
- ::grpc::protobuf::io::CodedInputStream decoder(&reader);
- if (max_message_size > 0) {
- decoder.SetTotalBytesLimit(max_message_size, max_message_size);
- }
- if (!msg->ParseFromCodedStream(&decoder)) {
- result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
- }
- if (!decoder.ConsumedEntireMessage()) {
- result = Status(StatusCode::INTERNAL, "Did not read entire message");
- }
- }
- grpc_byte_buffer_destroy(buffer);
- return result;
-}
-
} // namespace grpc
diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h
index e15cb4c34a..656b11e7e7 100644
--- a/src/cpp/common/core_codegen.h
+++ b/src/cpp/common/core_codegen.h
@@ -42,13 +42,6 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
private:
- Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) override;
-
- Status DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg,
- int max_message_size) override;
-
grpc_completion_queue* grpc_completion_queue_create(void* reserved) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
@@ -60,11 +53,30 @@ class CoreCodegen : public CoreCodegenInterface {
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
+ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) override;
+ void grpc_byte_buffer_reader_destroy(
+ grpc_byte_buffer_reader* reader) override;
+ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ gpr_slice* slice) override;
+
+ grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice,
+ size_t nslices) override;
+
+ gpr_slice gpr_slice_malloc(size_t length) override;
+ void gpr_slice_unref(gpr_slice slice) override;
+ gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) override;
+ void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) override;
+ void gpr_slice_buffer_pop(gpr_slice_buffer* sb) override;
+
void grpc_metadata_array_init(grpc_metadata_array* array) override;
void grpc_metadata_array_destroy(grpc_metadata_array* array) override;
gpr_timespec gpr_inf_future(gpr_clock_type type) override;
+ virtual const Status& ok() override;
+ virtual const Status& cancelled() override;
+
void assert_fail(const char* failed_assertion) override;
};
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 6c13a4fa48..d92addbf54 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -167,6 +167,37 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ServerStreamingCall_EndOfStreamIsIdempotent()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+
+ [Test]
+ public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ context.Status = new Status(StatusCode.InvalidArgument, "");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
+
+ // attempting MoveNext again should result in throwing the same exception.
+ var ex2 = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode);
+ }
+
+ [Test]
public async Task DuplexStreamingCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
@@ -209,6 +240,38 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
+ {
+ var handlerStartedBarrier = new TaskCompletionSource<object>();
+ var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
+ var successTcs = new TaskCompletionSource<string>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ handlerStartedBarrier.SetResult(null);
+
+ // wait for cancellation to be delivered.
+ context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
+ await cancelNotificationReceivedBarrier.Task;
+
+ var moveNextResult = await requestStream.MoveNext();
+ successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
+ return "";
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+
+ await handlerStartedBarrier.Task;
+ cts.Cancel();
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+
+ Assert.AreEqual("SUCCESS", await successTcs.Task);
+ }
+
+ [Test]
public async Task AsyncUnaryCall_EchoMetadata()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 0cd059c232..47131fc454 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -84,6 +84,8 @@
<Compile Include="SanityTest.cs" />
<Compile Include="HalfcloseTest.cs" />
<Compile Include="NUnitMain.cs" />
+ <Compile Include="Internal\FakeNativeCall.cs" />
+ <Compile Include="Internal\AsyncCallServerTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
new file mode 100644
index 0000000000..058371521d
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -0,0 +1,191 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ /// <summary>
+ /// Uses fake native call to test interaction of <c>AsyncCallServer</c> wrapping code with C core in different situations.
+ /// </summary>
+ public class AsyncCallServerTest
+ {
+ Server server;
+ FakeNativeCall fakeCall;
+ AsyncCallServer<string, string> asyncCallServer;
+
+ [SetUp]
+ public void Init()
+ {
+ var environment = GrpcEnvironment.AddRef();
+
+ // Create a fake server just so we have an instance to refer to.
+ // The server won't actually be used at all.
+ server = new Server()
+ {
+ Ports = { { "localhost", 0, ServerCredentials.Insecure } }
+ };
+ server.Start();
+
+ fakeCall = new FakeNativeCall();
+ asyncCallServer = new AsyncCallServer<string, string>(
+ Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
+ environment,
+ server);
+ asyncCallServer.InitializeForTesting(fakeCall);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ server.ShutdownAsync().Wait();
+ GrpcEnvironment.Release();
+ }
+
+ [Test]
+ public void CancelNotificationAfterStartDisposes()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ var moveNextTask = requestStream.MoveNext();
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedMessageHandler(true, null);
+ Assert.IsFalse(moveNextTask.Result);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void ReadAfterCancelNotificationCanSucceed()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ // Check that starting a read after cancel notification has been processed is legal.
+ var moveNextTask = requestStream.MoveNext();
+ Assert.IsFalse(moveNextTask.Result);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void ReadCompletionFailureClosesRequestStream()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ // if a read completion's success==false, the request stream will silently finish
+ // and we rely on C core cancelling the call.
+ var moveNextTask = requestStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(false, null);
+ Assert.IsFalse(moveNextTask.Result);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteAfterCancelNotificationFails()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ // TODO(jtattermusch): should we throw a different exception type instead?
+ Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteCompletionFailureThrows()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ var writeTask = responseStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+ // TODO(jtattermusch): should we throw a different exception type instead?
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteAndWriteStatusCanRunConcurrently()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ var writeTask = responseStream.WriteAsync("request1");
+ var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
+
+ fakeCall.SendCompletionHandler(true);
+ fakeCall.SendStatusFromServerHandler(true);
+
+ Assert.DoesNotThrowAsync(async () => await writeTask);
+ Assert.DoesNotThrowAsync(async () => await writeStatusTask);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
+ {
+ Assert.IsTrue(fakeCall.IsDisposed);
+ Assert.IsTrue(finishedTask.IsCompleted);
+ Assert.DoesNotThrow(() => finishedTask.Wait());
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index a678e4dafe..abe9d4a2e6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@@ -40,6 +41,9 @@ using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
+ /// <summary>
+ /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
+ /// </summary>
public class AsyncCallTest
{
Channel channel;
@@ -75,8 +79,8 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_StreamingOperationsNotAllowed()
{
asyncCall.UnaryCallAsync("request1");
- Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartReadMessage((x,y) => {}));
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
@@ -119,6 +123,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
+ public void ClientStreaming_StreamingReadNotAllowed()
+ {
+ asyncCall.ClientStreamingCallAsync();
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
+ }
+
+ [Test]
public void ClientStreaming_NoRequest_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
@@ -142,6 +154,283 @@ namespace Grpc.Core.Internal.Tests
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
}
+ [Test]
+ public void ClientStreaming_MoreRequests_Success()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(true);
+ writeTask.Wait();
+
+ var writeTask2 = requestStream.WriteAsync("request2");
+ fakeCall.SendCompletionHandler(true);
+ writeTask2.Wait();
+
+ var completeTask = requestStream.CompleteAsync();
+ fakeCall.SendCompletionHandler(true);
+ completeTask.Wait();
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteFailure()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Internal),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterReceivingStatusFails()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+ }
+
+ [Test]
+ public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterCancellationRequestFails()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Cancelled),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void ServerStreaming_StreamingSendNotAllowed()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ Assert.Throws(typeof(InvalidOperationException),
+ () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_Success1()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
+ Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
+
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_Success2()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ // try alternative order of completions
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageHandler(true, null);
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_ReadFailure()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ServerStreaming_MoreResponses_Success()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask1 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask2.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask3 = responseStream.MoveNext();
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageHandler(true, null);
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
+ }
+
+ [Test]
+ public void DuplexStreaming_NoRequestNoResponse_Success()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var writeTask1 = requestStream.CompleteAsync();
+ fakeCall.SendCompletionHandler(true);
+ Assert.DoesNotThrowAsync(async () => await writeTask1);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void DuplexStreaming_WriteAfterReceivingStatusFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+ }
+
+ [Test]
+ public void DuplexStreaming_CompleteAfterReceivingStatusFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+
+ Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
+ }
+
+ [Test]
+ public void DuplexStreaming_WriteAfterCancellationRequestFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+ Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ var readTask1 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
+ }
+
ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
{
return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
@@ -163,6 +452,16 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual("response1", resultTask.Result);
}
+ static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
+ {
+ Assert.IsTrue(moveNextTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+
+ Assert.IsFalse(moveNextTask.Result);
+ Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
+ }
+
static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
{
Assert.IsTrue(resultTask.IsCompleted);
@@ -175,135 +474,15 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
- internal class FakeNativeCall : INativeCall
- {
- public UnaryResponseClientHandler UnaryResponseClientHandler
- {
- get;
- set;
- }
-
- public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
- {
- get;
- set;
- }
-
- public ReceivedMessageHandler ReceivedMessageHandler
- {
- get;
- set;
- }
-
- public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
- {
- get;
- set;
- }
-
- public SendCompletionHandler SendCompletionHandler
- {
- get;
- set;
- }
-
- public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
- {
- get;
- set;
- }
-
- public bool IsCancelled
- {
- get;
- set;
- }
-
- public bool IsDisposed
- {
- get;
- set;
- }
-
- public void Cancel()
- {
- IsCancelled = true;
- }
-
- public void CancelWithStatus(Status status)
- {
- IsCancelled = true;
- }
-
- public string GetPeer()
- {
- return "PEER";
- }
-
- public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- UnaryResponseClientHandler = callback;
- }
-
- public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- throw new NotImplementedException();
- }
-
- public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
- {
- UnaryResponseClientHandler = callback;
- }
-
- public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- ReceivedStatusOnClientHandler = callback;
- }
-
- public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
- {
- ReceivedStatusOnClientHandler = callback;
- }
-
- public void StartReceiveMessage(ReceivedMessageHandler callback)
- {
- ReceivedMessageHandler = callback;
- }
-
- public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
- {
- ReceivedResponseHeadersHandler = callback;
- }
-
- public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendCloseFromClient(SendCompletionHandler callback)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartServerSide(ReceivedCloseOnServerHandler callback)
- {
- ReceivedCloseOnServerHandler = callback;
- }
-
- public void Dispose()
- {
- IsDisposed = true;
- }
+ static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
+ {
+ Assert.IsTrue(moveNextTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
+ Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
+ Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
new file mode 100644
index 0000000000..1bec258ca2
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -0,0 +1,183 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ /// <summary>
+ /// For testing purposes.
+ /// </summary>
+ internal class FakeNativeCall : INativeCall
+ {
+ public UnaryResponseClientHandler UnaryResponseClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedMessageHandler ReceivedMessageHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+ {
+ get;
+ set;
+ }
+
+ public SendCompletionHandler SendCompletionHandler
+ {
+ get;
+ set;
+ }
+
+ public SendCompletionHandler SendStatusFromServerHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
+ {
+ get;
+ set;
+ }
+
+ public bool IsCancelled
+ {
+ get;
+ set;
+ }
+
+ public bool IsDisposed
+ {
+ get;
+ set;
+ }
+
+ public void Cancel()
+ {
+ IsCancelled = true;
+ }
+
+ public void CancelWithStatus(Status status)
+ {
+ IsCancelled = true;
+ }
+
+ public string GetPeer()
+ {
+ return "PEER";
+ }
+
+ public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartReceiveMessage(ReceivedMessageHandler callback)
+ {
+ ReceivedMessageHandler = callback;
+ }
+
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ {
+ ReceivedResponseHeadersHandler = callback;
+ }
+
+ public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendCloseFromClient(SendCompletionHandler callback)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ {
+ SendStatusFromServerHandler = callback;
+ }
+
+ public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ {
+ ReceivedCloseOnServerHandler = callback;
+ }
+
+ public void Dispose()
+ {
+ IsDisposed = true;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 50ba617cdb..f522174bd0 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+ public Task<TResponse> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index ccd047f469..42234dcac2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,7 +68,8 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -150,15 +151,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
+ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckReadingAllowed();
+ GrpcPreconditions.CheckState(started);
+ if (readingDone)
+ {
+ // the last read that returns null or throws an exception is idempotent
+ // and maintain its state.
+ GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
+ return streamingReadTcs.Task;
+ }
+
+ GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
- readCompletionDelegate = completionDelegate;
+ streamingReadTcs = new TaskCompletionSource<TRead>();
+ return streamingReadTcs.Task;
}
}
@@ -213,15 +224,6 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected virtual void CheckReadingAllowed()
- {
- GrpcPreconditions.CheckState(started);
- GrpcPreconditions.CheckState(!disposed);
-
- GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
- GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
- }
-
protected void CheckNotCancelled()
{
if (cancelRequested)
@@ -322,22 +324,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
-
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ sendStatusFromServerTcs.SetResult(null);
}
}
@@ -346,15 +344,17 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
+ // if success == false, received message will be null. It that case we will
+ // treat this completion as the last read an rely on C core to handle the failed
+ // read (e.g. deliver approriate statusCode on the clientside).
+
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
- AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
+ TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = readCompletionDelegate;
- readCompletionDelegate = null;
-
+ origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@@ -364,20 +364,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
+
+ // TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
+ if (!readingDone)
+ {
+ streamingReadTcs = null;
+ }
+
ReleaseResourcesIfPossible();
}
- // TODO: handle the case when success==false
-
if (deserializeException != null && !IsClient)
{
- FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+ origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
- FireCompletion(origCompletionDelegate, msg, null);
+ origTcs.SetResult(msg);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index bea2b3660c..eafe2ccab8 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -65,6 +65,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Only for testing purposes.
+ /// </summary>
+ public void InitializeForTesting(INativeCall call)
+ {
+ server.AddCallReference(this);
+ InitializeInternal(call);
+ }
+
+ /// <summary>
/// Starts a server side call.
/// </summary>
public Task ServerSideCallAsync()
@@ -91,11 +100,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+ public Task<TRequest> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
@@ -128,24 +136,25 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Sends call result status, also indicating server is done with streaming responses.
- /// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
+ /// Sends call result status, indicating we are done with writes.
+ /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers)
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ GrpcPreconditions.CheckState(!disposed);
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
- readingDone = true;
- sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ return sendStatusFromServerTcs.Task;
}
}
@@ -174,12 +183,6 @@ namespace Grpc.Core.Internal
get { return false; }
}
- protected override void CheckReadingAllowed()
- {
- base.CheckReadingAllowed();
- GrpcPreconditions.CheckArgument(!cancelRequested);
- }
-
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@@ -190,12 +193,21 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
+ // success will be always set to true.
lock (myLock)
{
finished = true;
+ if (streamingReadTcs == null)
+ {
+ // if there's no pending read, readingDone=true will dispose now.
+ // if there is a pending read, we will dispose once that read finishes.
+ readingDone = true;
+ streamingReadTcs = new TaskCompletionSource<TRequest>();
+ streamingReadTcs.SetResult(default(TRequest));
+ }
ReleaseResourcesIfPossible();
}
- // TODO(jtattermusch): handle error
if (cancelled)
{
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index d6e34a0f04..ad9423ff58 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TResponse>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 1f83e51548..00d82d51e8 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -80,8 +80,6 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
var result = await handler(request, context).ConfigureAwait(false);
status = context.Status;
await responseStream.WriteAsync(result).ConfigureAwait(false);
@@ -93,7 +91,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -136,8 +134,6 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
@@ -149,7 +145,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -209,7 +205,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -260,7 +256,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -282,9 +278,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
-
- await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -300,7 +294,6 @@ namespace Grpc.Core.Internal
return rpcException.Status;
}
- // TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index e7be82c318..d76030d1ad 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TRequest>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 03e39efc02..ecfee0bfdd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task WriteStatusAsync(Status status, Metadata trailers)
- {
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
- return taskSource.Task;
- }
-
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index f733352a31..f6af3408d5 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -1,5 +1,6 @@
{
"Grpc.Core.Tests": [
+ "Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",
diff --git a/src/proto/grpc/testing/echo.proto b/src/proto/grpc/testing/echo.proto
index 0eef53a92a..c596aabfcc 100644
--- a/src/proto/grpc/testing/echo.proto
+++ b/src/proto/grpc/testing/echo.proto
@@ -45,3 +45,7 @@ service EchoTestService {
service UnimplementedService {
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
+
+// A service without any rpc defined to test coverage.
+service NoRpcService {
+}
diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml
index d13ce42655..34bb477543 100644
--- a/src/ruby/.rubocop.yml
+++ b/src/ruby/.rubocop.yml
@@ -11,10 +11,10 @@ AllCops:
- 'pb/test/**/*'
Metrics/CyclomaticComplexity:
- Max: 8
+ Max: 9
Metrics/PerceivedComplexity:
- Max: 8
+ Max: 9
Metrics/ClassLength:
Max: 250
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 7f3a38a9f4..a0f4071adc 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -332,15 +332,13 @@ module GRPC
# the current thread to terminate it.
def run_till_terminated
GRPC.trap_signals
- stopped = false
t = Thread.new do
run
- stopped = true
end
+ t.abort_on_exception = true
wait_till_running
- loop do
+ until running_state == :stopped
sleep SIGNAL_CHECK_PERIOD
- break if stopped
break unless GRPC.handle_signals
end
stop
@@ -416,7 +414,7 @@ module GRPC
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
- c.send_status(StatusCodes::RESOURCE_EXHAUSTED, '')
+ c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@@ -427,7 +425,7 @@ module GRPC
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
- c.send_status(StatusCodes::UNIMPLEMENTED, '')
+ c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@@ -443,7 +441,12 @@ module GRPC
unless active_call.nil?
@pool.schedule(active_call) do |ac|
c, mth = ac
- rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ begin
+ rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ rescue StandardError
+ c.send_status(GRPC::Core::StatusCodes::INTERNAL,
+ 'Server handler failed')
+ end
end
end
rescue Core::CallError, RuntimeError => e
diff --git a/test/core/end2end/fixtures/h2_census.c b/test/core/end2end/fixtures/h2_census.c
index ff2f028f09..e46b39e476 100644
--- a/test/core/end2end/fixtures/h2_census.c
+++ b/test/core/end2end/fixtures/h2_census.c
@@ -111,7 +111,7 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) {
/* All test configurations */
static grpc_end2end_test_config configs[] = {
- {"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
+ {"chttp2/fullstack+census", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
chttp2_create_fixture_fullstack, chttp2_init_client_fullstack,
chttp2_init_server_fullstack, chttp2_tear_down_fullstack},
};
diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c
index 0eede6c23b..3eeb55d033 100644
--- a/test/core/surface/public_headers_must_be_c89.c
+++ b/test/core/surface/public_headers_must_be_c89.c
@@ -41,6 +41,7 @@
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/atm.h>
#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 7e4d6046d6..0232a9fa31 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -51,6 +51,7 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/test_credentials_provider.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_posix.h"
@@ -58,6 +59,7 @@
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
+using grpc::testing::kTlsCredentialsType;
using std::chrono::system_clock;
GPR_TLS_DECL(g_is_async_end2end_test);
@@ -197,20 +199,37 @@ class Verifier {
bool spin_;
};
-class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
+class TestScenario {
+ public:
+ TestScenario(bool non_block, const grpc::string& creds_type,
+ const grpc::string& content)
+ : disable_blocking(non_block),
+ credentials_type(creds_type),
+ message_content(content) {}
+ void Log() const {
+ gpr_log(GPR_INFO,
+ "Scenario: disable_blocking %d, credentials %s, message size %d",
+ disable_blocking, credentials_type.c_str(), message_content.size());
+ }
+ bool disable_blocking;
+ const grpc::string credentials_type;
+ const grpc::string message_content;
+};
+
+class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
- AsyncEnd2endTest() {}
+ AsyncEnd2endTest() { GetParam().Log(); }
void SetUp() GRPC_OVERRIDE {
- poll_overrider_.reset(new PollingOverrider(!GetParam()));
+ poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
- builder.AddListeningPort(server_address_.str(),
- grpc::InsecureServerCredentials());
+ auto server_creds = GetServerCredentials(GetParam().credentials_type);
+ builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
@@ -230,8 +249,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
}
void ResetStub() {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
@@ -247,22 +269,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -302,7 +325,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -310,23 +333,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
- Verifier(GetParam()).Verify(cq_.get(), time_now);
- Verifier(GetParam()).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam())
- .Expect(3, true)
- .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@@ -347,41 +369,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Expect(1, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -400,39 +429,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerContext srv_ctx;
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -450,41 +485,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerContext srv_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(1, true)
+ .Expect(2, true)
+ .Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
cli_stream->Read(&recv_response, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
- Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
-
srv_stream.Read(&recv_request, tag(8));
- Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(7, true)
+ .Expect(8, false)
+ .Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
- Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
-
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(9, true)
+ .Expect(10, true)
+ .Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@@ -503,7 +545,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
@@ -516,7 +558,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -529,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
-
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -552,7 +594,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -561,15 +603,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second));
@@ -579,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
-
response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@@ -601,7 +644,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@@ -610,20 +653,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
+ response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(5));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -647,7 +692,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2(
"key2-bin",
@@ -671,7 +716,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@@ -683,9 +728,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second));
@@ -697,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
+ response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(5, true)
+ .Expect(6, true)
+ .Verify(cq_.get());
- response_reader->Finish(&recv_response, &recv_status, tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@@ -726,7 +773,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -734,15 +781,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
@@ -761,7 +808,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -769,25 +816,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
- Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
- EXPECT_FALSE(srv_ctx.IsCancelled());
-
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Expect(5, true)
+ .Verify(cq_.get());
+ EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
+ ChannelArguments args;
+ auto channel_creds =
+ GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
- CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ CreateCustomChannel(server_address_.str(), channel_creds, args);
std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
stub = grpc::testing::UnimplementedService::NewStub(channel);
EchoRequest send_request;
@@ -795,12 +846,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Status recv_status;
ClientContext cli_ctx;
- send_request.set_message("Hello");
+ send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
- Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
@@ -847,23 +898,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'RequestStream' call on client
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + std::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
- Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking)
+ .Expect(tag_idx, true)
+ .Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
- Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
@@ -871,7 +924,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
@@ -881,7 +934,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -939,13 +992,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
// TODO(sreek): The expectation here should be true. This is a bug (github
// issue #4972)
- Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -979,13 +1032,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'ResponseStream' call on the client
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
@@ -994,7 +1047,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1004,7 +1057,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1064,7 +1117,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
- Verifier(GetParam())
+ Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
@@ -1075,11 +1128,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1114,19 +1167,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
- Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
- Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
- Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
bool expected_cq_result = true;
bool ignore_cq_result = false;
@@ -1134,7 +1187,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
- Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@@ -1144,7 +1197,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
- auto verif = Verifier(GetParam());
+ auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@@ -1244,10 +1297,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9));
- Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
- Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
+ Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@@ -1289,11 +1342,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
+std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
+ bool test_secure,
+ int test_big_limit) {
+ std::vector<TestScenario> scenarios;
+ std::vector<grpc::string> credentials_types;
+ std::vector<grpc::string> messages;
+
+ credentials_types.push_back(kInsecureCredentialsType);
+ auto sec_list = GetSecureCredentialsTypeList();
+ for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
+ credentials_types.push_back(*sec);
+ }
+
+ messages.push_back("Hello");
+ for (int sz = 1; sz < test_big_limit; sz *= 2) {
+ grpc::string big_msg;
+ for (int i = 0; i < sz * 1024; i++) {
+ char c = 'a' + (i % 26);
+ big_msg += c;
+ }
+ messages.push_back(big_msg);
+ }
+
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.push_back(TestScenario(false, *cred, *msg));
+ if (test_disable_blocking) {
+ scenarios.push_back(TestScenario(true, *cred, *msg));
+ }
+ }
+ }
+ return scenarios;
+}
+
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
- ::testing::Values(false, true));
+ ::testing::ValuesIn(CreateTestScenarios(true, true,
+ 1024)));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest,
- ::testing::Values(false));
+ ::testing::ValuesIn(CreateTestScenarios(false, false,
+ 0)));
} // namespace
} // namespace testing
diff --git a/tools/distrib/check_include_guards.py b/tools/distrib/check_include_guards.py
index 897a899e7e..6c160c64b6 100755
--- a/tools/distrib/check_include_guards.py
+++ b/tools/distrib/check_include_guards.py
@@ -31,6 +31,7 @@
import argparse
import os
+import os.path
import re
import sys
import subprocess
@@ -187,6 +188,8 @@ filename_list = []
try:
filename_list = subprocess.check_output(FILE_LIST_COMMAND,
shell=True).splitlines()
+ # Filter out non-existent files (ie, file removed or renamed)
+ filename_list = (f for f in filename_list if os.path.isfile(f))
except subprocess.CalledProcessError:
sys.exit(0)
diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++
index 7dc0496047..664ca03d97 100644
--- a/tools/doxygen/Doxyfile.c++
+++ b/tools/doxygen/Doxyfile.c++
@@ -833,6 +833,7 @@ include/grpc++/impl/codegen/sync_no_cxx11.h \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
+include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal
index 312fd17cb2..5188ef1e8d 100644
--- a/tools/doxygen/Doxyfile.c++.internal
+++ b/tools/doxygen/Doxyfile.c++.internal
@@ -833,6 +833,7 @@ include/grpc++/impl/codegen/sync_no_cxx11.h \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
+include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core
index 034d9c6e6f..42bdb2f043 100644
--- a/tools/doxygen/Doxyfile.core
+++ b/tools/doxygen/Doxyfile.core
@@ -766,6 +766,7 @@ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
+include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 6d8f800540..f0052a1cb6 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -766,6 +766,7 @@ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
+include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json
index 25d9c5df94..990054d0d6 100644
--- a/tools/run_tests/sources_and_headers.json
+++ b/tools/run_tests/sources_and_headers.json
@@ -5906,6 +5906,7 @@
],
"headers": [
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@@ -5916,6 +5917,7 @@
"name": "grpc_codegen",
"src": [
"include/grpc/impl/codegen/byte_buffer.h",
+ "include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
index 29cab37d52..0ec53acf65 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj
@@ -331,6 +331,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
index 15e2807fd4..491aeaeb67 100644
--- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
@@ -315,6 +315,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
index fcda361ef1..96bee4101c 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
@@ -331,6 +331,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
index 1dc95f985a..fe9eed781c 100644
--- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
@@ -300,6 +300,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index ded01ab066..4b5f1681c7 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -273,6 +273,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\grpc.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\status.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index ff90006e84..a9a08e8be2 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -519,6 +519,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index 090d7d41d2..f6d41afd1a 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -264,6 +264,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\grpc.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\status.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index 5bdc87402a..0cf1486bfc 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -459,6 +459,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj b/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj
index cd0b40c873..34e939cf84 100644
--- a/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj
+++ b/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj
@@ -191,6 +191,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters b/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters
index 029b8ef774..d66236580c 100644
--- a/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters
+++ b/vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters
@@ -120,6 +120,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj b/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj
index 6d138fae1c..890d77df22 100644
--- a/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj
+++ b/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj
@@ -191,6 +191,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />
diff --git a/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters b/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters
index dc3f0b2d04..4e0ba656fc 100644
--- a/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters
+++ b/vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters
@@ -120,6 +120,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
+ <Filter>include\grpc\impl\codegen</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>