diff options
Diffstat (limited to 'src/python')
40 files changed, 2403 insertions, 533 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 28c30e5d35..237f430799 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -41,9 +41,8 @@ cdef class CompletionQueue: cdef object user_tag = None cdef Call operation_call = None cdef CallDetails request_call_details = None - cdef Metadata request_metadata = None + cdef object request_metadata = None cdef Operations batch_operations = None - cdef Operation batch_operation = None if event.type == GRPC_QUEUE_TIMEOUT: return Event( event.type, False, None, None, None, None, False, None) @@ -63,14 +62,8 @@ cdef class CompletionQueue: operation_call = tag.operation_call request_call_details = tag.request_call_details if tag.request_metadata is not None: - request_metadata = tag.request_metadata - request_metadata._claim_slice_ownership() + request_metadata = tuple(tag.request_metadata) batch_operations = tag.batch_operations - if tag.batch_operations is not None: - for op in batch_operations.operations: - batch_operation = <Operation>op - if batch_operation._received_metadata is not None: - batch_operation._received_metadata._claim_slice_ownership() if tag.is_new_request: # Stuff in the tag not explicitly handled by us needs to live through # the life of the call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 98d7a9820d..57816f1cab 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -76,7 +76,7 @@ cdef class CredentialsMetadataPlugin: """ Args: plugin_callback (callable): Callback accepting a service URL (str/bytes) - and callback object (accepting a Metadata, + and callback object (accepting a MetadataArray, grpc_status_code, and a str/bytes error message). This argument when called should be non-blocking and eventually call the callback object with the appropriate status code/details and metadata (if @@ -129,8 +129,7 @@ cdef void plugin_get_metadata( def python_callback( Metadata metadata, grpc_status_code status, bytes error_details): - cb(user_data, metadata.c_metadata_array.metadata, - metadata.c_metadata_array.count, status, error_details) + cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details) called_flag[0] = True cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state cdef AuthMetadataContext cy_context = AuthMetadataContext() @@ -139,8 +138,8 @@ cdef void plugin_get_metadata( self.plugin_callback(cy_context, python_callback) except Exception as error: if not called_flag[0]: - cb(user_data, Metadata([]).c_metadata_array.metadata, - 0, StatusCode.unknown, traceback.format_exc().encode()) + cb(user_data, NULL, 0, StatusCode.unknown, + traceback.format_exc().encode()) cdef void plugin_destroy_c_plugin_state(void *state) with gil: cpython.Py_DECREF(<CredentialsMetadataPlugin>state) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 5950bfa0e6..840af5c43a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -59,6 +59,7 @@ cdef extern from "grpc/grpc.h": grpc_slice grpc_slice_malloc(size_t length) nogil grpc_slice grpc_slice_from_copied_string(const char *source) nogil grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil + grpc_slice grpc_slice_copy(grpc_slice s) nogil # Declare functions for function-like macros (because Cython)... void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil @@ -522,7 +523,7 @@ cdef extern from "grpc/compression.h": int grpc_compression_algorithm_parse( grpc_slice value, grpc_compression_algorithm *algorithm) nogil int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, - char **name) nogil + const char **name) nogil grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_level level, uint32_t accepted_encodings) nogil void grpc_compression_options_init(grpc_compression_options *opts) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 8ace6aeb52..9c40ebf0c2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -37,7 +37,7 @@ cdef class OperationTag: cdef Server shutting_down_server cdef Call operation_call cdef CallDetails request_call_details - cdef Metadata request_metadata + cdef MetadataArray request_metadata cdef Operations batch_operations cdef bint is_new_request @@ -51,7 +51,7 @@ cdef class Event: # For Server.request_call cdef readonly bint is_new_request cdef readonly CallDetails request_call_details - cdef readonly Metadata request_metadata + cdef readonly object request_metadata # For server calls cdef readonly Call operation_call @@ -92,15 +92,20 @@ cdef class Metadatum: cdef class Metadata: + cdef grpc_metadata *c_metadata + cdef readonly size_t c_count + + +cdef class MetadataArray: + cdef grpc_metadata_array c_metadata_array - cdef void _claim_slice_ownership(self) cdef class Operation: cdef grpc_op c_op cdef ByteBuffer _received_message - cdef Metadata _received_metadata + cdef MetadataArray _received_metadata cdef grpc_status_code _received_status_code cdef grpc_slice _status_details cdef int _received_cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 1b2ddd2469..d860173b5d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -238,7 +238,7 @@ cdef class Event: def __cinit__(self, grpc_completion_type type, bint success, object tag, Call operation_call, CallDetails request_call_details, - Metadata request_metadata, + object request_metadata, bint is_new_request, Operations batch_operations): self.type = type @@ -437,48 +437,79 @@ cdef class Metadatum: cdef class _MetadataIterator: cdef size_t i - cdef Metadata metadata + cdef size_t _length + cdef object _metadatum_indexable - def __cinit__(self, Metadata metadata not None): + def __cinit__(self, length, metadatum_indexable): + self._length = length + self._metadatum_indexable = metadatum_indexable self.i = 0 - self.metadata = metadata def __iter__(self): return self def __next__(self): - if self.i < len(self.metadata): - result = self.metadata[self.i] + if self.i < self._length: + result = self._metadatum_indexable[self.i] self.i = self.i + 1 return result else: raise StopIteration +# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an +# ordinary sequence of pairs of bytestrings all the way down to the +# grpc_call_start_batch call. cdef class Metadata: + """Metadata being passed from application to core.""" def __cinit__(self, metadata_iterable): + metadata_sequence = tuple(metadata_iterable) + cdef size_t count = len(metadata_sequence) with nogil: grpc_init() - grpc_metadata_array_init(&self.c_metadata_array) - metadata = list(metadata_iterable) - for metadatum in metadata: - if not isinstance(metadatum, Metadatum): - raise TypeError("expected list of Metadatum") - self.c_metadata_array.count = len(metadata) - self.c_metadata_array.capacity = len(metadata) + self.c_metadata = <grpc_metadata *>gpr_malloc( + count * sizeof(grpc_metadata)) + self.c_count = count + for index, metadatum in enumerate(metadata_sequence): + self.c_metadata[index].key = grpc_slice_copy( + (<Metadatum>metadatum).c_metadata.key) + self.c_metadata[index].value = grpc_slice_copy( + (<Metadatum>metadatum).c_metadata.value) + + def __dealloc__(self): + with nogil: + for index in range(self.c_count): + grpc_slice_unref(self.c_metadata[index].key) + grpc_slice_unref(self.c_metadata[index].value) + gpr_free(self.c_metadata) + grpc_shutdown() + + def __len__(self): + return self.c_count + + def __getitem__(self, size_t index): + if index < self.c_count: + key = _slice_bytes(self.c_metadata[index].key) + value = _slice_bytes(self.c_metadata[index].value) + return Metadatum(key, value) + else: + raise IndexError() + + def __iter__(self): + return _MetadataIterator(self.c_count, self) + + +cdef class MetadataArray: + """Metadata being passed from core to application.""" + + def __cinit__(self): with nogil: - self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata) - ) - for i in range(self.c_metadata_array.count): - (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i]) + grpc_init() + grpc_metadata_array_init(&self.c_metadata_array) def __dealloc__(self): with nogil: - # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) - # TODO(atash): document this in the C core grpc_metadata_array_destroy(&self.c_metadata_array) grpc_shutdown() @@ -493,21 +524,7 @@ cdef class Metadata: return Metadatum(key=key, value=value) def __iter__(self): - return _MetadataIterator(self) - - cdef void _claim_slice_ownership(self): - cdef grpc_metadata_array new_c_metadata_array - grpc_metadata_array_init(&new_c_metadata_array) - new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata)) - new_c_metadata_array.count = self.c_metadata_array.count - for i in range(self.c_metadata_array.count): - new_c_metadata_array.metadata[i].key = _copy_slice( - self.c_metadata_array.metadata[i].key) - new_c_metadata_array.metadata[i].value = _copy_slice( - self.c_metadata_array.metadata[i].value) - grpc_metadata_array_destroy(&self.c_metadata_array) - self.c_metadata_array = new_c_metadata_array + return _MetadataIterator(self.c_metadata_array.count, self) cdef class Operation: @@ -547,14 +564,13 @@ cdef class Operation: if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): raise TypeError("self must be an operation receiving metadata") - return self._received_metadata - - @property - def received_metadata_or_none(self): - if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and - self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): - return None - return self._received_metadata + # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython + # objects must be legitimate for use from Python at any time" policy in + # place today, shift the policy toward "Operation objects are only usable + # while their calls are active", and move this making-a-copy-because-this- + # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the + # lowest Python layer. + return tuple(self._received_metadata) @property def received_status_code(self): @@ -601,9 +617,8 @@ def operation_send_initial_metadata(Metadata metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA op.c_op.flags = flags - op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count - op.c_op.data.send_initial_metadata.metadata = ( - metadata.c_metadata_array.metadata) + op.c_op.data.send_initial_metadata.count = metadata.c_count + op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata op.references.append(metadata) op.is_valid = True return op @@ -631,9 +646,8 @@ def operation_send_status_from_server( op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.flags = flags op.c_op.data.send_status_from_server.trailing_metadata_count = ( - metadata.c_metadata_array.count) - op.c_op.data.send_status_from_server.trailing_metadata = ( - metadata.c_metadata_array.metadata) + metadata.c_count) + op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata op.c_op.data.send_status_from_server.status = code grpc_slice_unref(op._status_details) op._status_details = _slice_from_bytes(details) @@ -646,7 +660,7 @@ def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.flags = flags - op._received_metadata = Metadata([]) + op._received_metadata = MetadataArray() op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( &op._received_metadata.c_metadata_array) op.is_valid = True @@ -669,7 +683,7 @@ def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT op.c_op.flags = flags - op._received_metadata = Metadata([]) + op._received_metadata = MetadataArray() op.c_op.data.receive_status_on_client.trailing_metadata = ( &op._received_metadata.c_metadata_array) op.c_op.data.receive_status_on_client.status = ( @@ -768,7 +782,7 @@ cdef class CompressionOptions: def compression_algorithm_name(grpc_compression_algorithm algorithm): - cdef char* name + cdef const char* name with nogil: grpc_compression_algorithm_name(algorithm, &name) # Let Cython do the right thing with string casting diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index dd276fd57b..b8db27469f 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -44,7 +44,7 @@ cdef class Server: cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() - operation_tag.request_metadata = Metadata([]) + operation_tag.request_metadata = MetadataArray() operation_tag.references.extend([self, call_queue, server_queue]) operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index dc4d28f95b..7b684f2a58 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -72,9 +72,14 @@ CORE_SOURCE_FILES = [ 'src/core/lib/compression/compression.c', 'src/core/lib/compression/message_compress.c', 'src/core/lib/compression/stream_compression.c', + 'src/core/lib/compression/stream_compression_gzip.c', + 'src/core/lib/compression/stream_compression_identity.c', + 'src/core/lib/debug/stats.c', + 'src/core/lib/debug/stats_data.c', 'src/core/lib/http/format_request.c', 'src/core/lib/http/httpcli.c', 'src/core/lib/http/parser.c', + 'src/core/lib/iomgr/call_combiner.c', 'src/core/lib/iomgr/closure.c', 'src/core/lib/iomgr/combiner.c', 'src/core/lib/iomgr/endpoint.c', @@ -83,8 +88,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/endpoint_pair_windows.c', 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll1_linux.c', - 'src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c', - 'src/core/lib/iomgr/ev_epoll_thread_pool_linux.c', 'src/core/lib/iomgr/ev_epollex_linux.c', 'src/core/lib/iomgr/ev_epollsig_linux.c', 'src/core/lib/iomgr/ev_poll_posix.c', @@ -297,8 +300,8 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.c', 'src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c', 'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c', - 'src/core/ext/filters/load_reporting/load_reporting.c', - 'src/core/ext/filters/load_reporting/load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_filter.c', + 'src/core/ext/filters/load_reporting/server_load_reporting_plugin.c', 'src/core/ext/census/base_resources.c', 'src/core/ext/census/context.c', 'src/core/ext/census/gen/census.pb.c', diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 0299b4cca9..1f5e9c5130 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -34,7 +34,7 @@ CLASSIFIERS = [ 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'License :: OSI Approved :: Apache Software License', -], +] PACKAGE_DIRECTORIES = { '': '.', diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index bed2311b59..9360550afb 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -35,7 +35,7 @@ CLASSIFIERS = [ 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'License :: OSI Approved :: Apache Software License', -], +] PACKAGE_DIRECTORIES = { '': '.', diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py index 14e25f09e2..994274500c 100644 --- a/src/python/grpcio_testing/grpc_testing/__init__.py +++ b/src/python/grpcio_testing/grpc_testing/__init__.py @@ -213,7 +213,7 @@ class StreamStreamChannelRpc(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() -class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel): +class Channel(six.with_metaclass(abc.ABCMeta, grpc.Channel)): """A grpc.Channel double with which to test a system that invokes RPCs.""" @abc.abstractmethod @@ -293,6 +293,278 @@ class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel): raise NotImplementedError() +class UnaryUnaryServerRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-unary RPC serviced by a system under test. + + Enables users to "play client" for the RPC. + """ + + @abc.abstractmethod + def initial_metadata(self): + """Accesses the initial metadata emitted by the system under test. + + This method blocks until the system under test has added initial + metadata to the RPC (or has provided one or more response messages or + has terminated the RPC, either of which will cause gRPC Python to + synthesize initial metadata for the RPC). + + Returns: + The initial metadata for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + """Blocks until the system under test has terminated the RPC. + + Returns: + A (response, trailing_metadata, code, details) sequence with the RPC's + response, trailing metadata, code, and details. + """ + raise NotImplementedError() + + +class UnaryStreamServerRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a unary-stream RPC serviced by a system under test. + + Enables users to "play client" for the RPC. + """ + + @abc.abstractmethod + def initial_metadata(self): + """Accesses the initial metadata emitted by the system under test. + + This method blocks until the system under test has added initial + metadata to the RPC (or has provided one or more response messages or + has terminated the RPC, either of which will cause gRPC Python to + synthesize initial metadata for the RPC). + + Returns: + The initial metadata for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + """Draws one of the responses added to the RPC by the system under test. + + Successive calls to this method return responses in the same order in + which the system under test added them to the RPC. + + Returns: + A response message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + """Blocks until the system under test has terminated the RPC. + + Returns: + A (trailing_metadata, code, details) sequence with the RPC's trailing + metadata, code, and details. + """ + raise NotImplementedError() + + +class StreamUnaryServerRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-unary RPC serviced by a system under test. + + Enables users to "play client" for the RPC. + """ + + @abc.abstractmethod + def initial_metadata(self): + """Accesses the initial metadata emitted by the system under test. + + This method blocks until the system under test has added initial + metadata to the RPC (or has provided one or more response messages or + has terminated the RPC, either of which will cause gRPC Python to + synthesize initial metadata for the RPC). + + Returns: + The initial metadata for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_request(self, request): + """Sends a request to the system under test. + + Args: + request: A request message for the RPC to be "sent" to the system + under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Indicates the end of the RPC's request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + """Blocks until the system under test has terminated the RPC. + + Returns: + A (response, trailing_metadata, code, details) sequence with the RPC's + response, trailing metadata, code, and details. + """ + raise NotImplementedError() + + +class StreamStreamServerRpc(six.with_metaclass(abc.ABCMeta)): + """Fixture for a stream-stream RPC serviced by a system under test. + + Enables users to "play client" for the RPC. + """ + + @abc.abstractmethod + def initial_metadata(self): + """Accesses the initial metadata emitted by the system under test. + + This method blocks until the system under test has added initial + metadata to the RPC (or has provided one or more response messages or + has terminated the RPC, either of which will cause gRPC Python to + synthesize initial metadata for the RPC). + + Returns: + The initial metadata for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def send_request(self, request): + """Sends a request to the system under test. + + Args: + request: A request message for the RPC to be "sent" to the system + under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + """Indicates the end of the RPC's request stream.""" + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + """Draws one of the responses added to the RPC by the system under test. + + Successive calls to this method return responses in the same order in + which the system under test added them to the RPC. + + Returns: + A response message added to the RPC by the system under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Cancels the RPC.""" + raise NotImplementedError() + + @abc.abstractmethod + def termination(self): + """Blocks until the system under test has terminated the RPC. + + Returns: + A (trailing_metadata, code, details) sequence with the RPC's trailing + metadata, code, and details. + """ + raise NotImplementedError() + + +class Server(six.with_metaclass(abc.ABCMeta)): + """A server with which to test a system that services RPCs.""" + + @abc.abstractmethod + def invoke_unary_unary( + self, method_descriptor, invocation_metadata, request, timeout): + """Invokes an RPC to be serviced by the system under test. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a unary-unary + RPC method. + invocation_metadata: The RPC's invocation metadata. + request: The RPC's request. + timeout: A duration of time in seconds for the RPC or None to + indicate that the RPC has no time limit. + + Returns: + A UnaryUnaryServerRpc with which to "play client" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invoke_unary_stream( + self, method_descriptor, invocation_metadata, request, timeout): + """Invokes an RPC to be serviced by the system under test. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a unary-stream + RPC method. + invocation_metadata: The RPC's invocation metadata. + request: The RPC's request. + timeout: A duration of time in seconds for the RPC or None to + indicate that the RPC has no time limit. + + Returns: + A UnaryStreamServerRpc with which to "play client" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invoke_stream_unary( + self, method_descriptor, invocation_metadata, timeout): + """Invokes an RPC to be serviced by the system under test. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a stream-unary + RPC method. + invocation_metadata: The RPC's invocation metadata. + timeout: A duration of time in seconds for the RPC or None to + indicate that the RPC has no time limit. + + Returns: + A StreamUnaryServerRpc with which to "play client" for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invoke_stream_stream( + self, method_descriptor, invocation_metadata, timeout): + """Invokes an RPC to be serviced by the system under test. + + Args: + method_descriptor: A descriptor.MethodDescriptor describing a stream-stream + RPC method. + invocation_metadata: The RPC's invocation metadata. + timeout: A duration of time in seconds for the RPC or None to + indicate that the RPC has no time limit. + + Returns: + A StreamStreamServerRpc with which to "play client" for the RPC. + """ + raise NotImplementedError() + + class Time(six.with_metaclass(abc.ABCMeta)): """A simulation of time. @@ -406,3 +678,20 @@ def channel(service_descriptors, time): """ from grpc_testing import _channel return _channel.testing_channel(service_descriptors, time) + + +def server_from_dictionary(descriptors_to_servicers, time): + """Creates a Server for use in tests of a gRPC Python-using system. + + Args: + descriptors_to_servicers: A dictionary from descriptor.ServiceDescriptors + defining RPC services to servicer objects (usually instances of classes + that implement "Servicer" interfaces defined in generated "_pb2_grpc" + modules) implementing those services. + time: A Time to be used for tests. + + Returns: + A Server for use in tests. + """ + from grpc_testing import _server + return _server.server_from_dictionary(descriptors_to_servicers, time) diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py index cb4a7f5fa2..1517434ca7 100644 --- a/src/python/grpcio_testing/grpc_testing/_common.py +++ b/src/python/grpcio_testing/grpc_testing/_common.py @@ -37,6 +37,16 @@ def fuss_with_metadata(metadata): return _fuss(tuple(metadata)) +def rpc_names(service_descriptors): + rpc_names_to_descriptors = {} + for service_descriptor in service_descriptors: + for method_descriptor in service_descriptor.methods_by_name.values(): + rpc_name = '/{}/{}'.format( + service_descriptor.full_name, method_descriptor.name) + rpc_names_to_descriptors[rpc_name] = method_descriptor + return rpc_names_to_descriptors + + class ChannelRpcRead( collections.namedtuple( 'ChannelRpcRead', @@ -90,3 +100,61 @@ class ChannelHandler(six.with_metaclass(abc.ABCMeta)): self, method_full_rpc_name, invocation_metadata, requests, requests_closed, timeout): raise NotImplementedError() + + +class ServerRpcRead( + collections.namedtuple('ServerRpcRead', + ('request', 'requests_closed', 'terminated',))): + pass + + +REQUESTS_CLOSED = ServerRpcRead(None, True, False) +TERMINATED = ServerRpcRead(None, False, True) + + +class ServerRpcHandler(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def send_initial_metadata(self, initial_metadata): + raise NotImplementedError() + + @abc.abstractmethod + def take_request(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_response(self, response): + raise NotImplementedError() + + @abc.abstractmethod + def send_termination(self, trailing_metadata, code, details): + raise NotImplementedError() + + @abc.abstractmethod + def add_termination_callback(self, callback): + raise NotImplementedError() + + +class Serverish(six.with_metaclass(abc.ABCMeta)): + + @abc.abstractmethod + def invoke_unary_unary( + self, method_descriptor, handler, invocation_metadata, request, + deadline): + raise NotImplementedError() + + @abc.abstractmethod + def invoke_unary_stream( + self, method_descriptor, handler, invocation_metadata, request, + deadline): + raise NotImplementedError() + + @abc.abstractmethod + def invoke_stream_unary( + self, method_descriptor, handler, invocation_metadata, deadline): + raise NotImplementedError() + + @abc.abstractmethod + def invoke_stream_stream( + self, method_descriptor, handler, invocation_metadata, deadline): + raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py b/src/python/grpcio_testing/grpc_testing/_server/__init__.py index 5772620b60..759512949a 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_services/__init__.py +++ b/src/python/grpcio_testing/grpc_testing/_server/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2016 gRPC authors. +# Copyright 2017 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,3 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from grpc_testing._server import _server + + +def server_from_dictionary(descriptors_to_servicers, time): + return _server.server_from_descriptor_to_servicers( + descriptors_to_servicers, time) diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py new file mode 100644 index 0000000000..b47e04c718 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py @@ -0,0 +1,215 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import threading + +import grpc +from grpc_testing import _common + +_CLIENT_INACTIVE = object() + + +class Handler(_common.ServerRpcHandler): + + @abc.abstractmethod + def initial_metadata(self): + raise NotImplementedError() + + @abc.abstractmethod + def add_request(self, request): + raise NotImplementedError() + + @abc.abstractmethod + def take_response(self): + raise NotImplementedError() + + @abc.abstractmethod + def requests_closed(self): + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + raise NotImplementedError() + + @abc.abstractmethod + def unary_response_termination(self): + raise NotImplementedError() + + @abc.abstractmethod + def stream_response_termination(self): + raise NotImplementedError() + + +class _Handler(Handler): + + def __init__(self, requests_closed): + self._condition = threading.Condition() + self._requests = [] + self._requests_closed = requests_closed + self._initial_metadata = None + self._responses = [] + self._trailing_metadata = None + self._code = None + self._details = None + self._unary_response = None + self._expiration_future = None + self._termination_callbacks = [] + + def send_initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = initial_metadata + self._condition.notify_all() + + def take_request(self): + with self._condition: + while True: + if self._code is None: + if self._requests: + request = self._requests.pop(0) + self._condition.notify_all() + return _common.ServerRpcRead(request, False, False) + elif self._requests_closed: + return _common.REQUESTS_CLOSED + else: + self._condition.wait() + else: + return _common.TERMINATED + + def is_active(self): + with self._condition: + return self._code is None + + def add_response(self, response): + with self._condition: + self._responses.append(response) + self._condition.notify_all() + + def send_termination(self, trailing_metadata, code, details): + with self._condition: + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + if self._expiration_future is not None: + self._expiration_future.cancel() + self._condition.notify_all() + + def add_termination_callback(self, termination_callback): + with self._condition: + if self._code is None: + self._termination_callbacks.append(termination_callback) + return True + else: + return False + + def initial_metadata(self): + with self._condition: + while True: + if self._initial_metadata is None: + if self._code is None: + self._condition.wait() + else: + raise ValueError( + 'No initial metadata despite status code!') + else: + return self._initial_metadata + + def add_request(self, request): + with self._condition: + self._requests.append(request) + self._condition.notify_all() + + def take_response(self): + with self._condition: + while True: + if self._responses: + response = self._responses.pop(0) + self._condition.notify_all() + return response + elif self._code is None: + self._condition.wait() + else: + raise ValueError('No more responses!') + + def requests_closed(self): + with self._condition: + self._requests_closed = True + self._condition.notify_all() + + def cancel(self): + with self._condition: + if self._code is None: + self._code = _CLIENT_INACTIVE + termination_callbacks = self._termination_callbacks + self._termination_callbacks = None + if self._expiration_future is not None: + self._expiration_future.cancel() + self._condition.notify_all() + for termination_callback in termination_callbacks: + termination_callback() + + def unary_response_termination(self): + with self._condition: + while True: + if self._code is _CLIENT_INACTIVE: + raise ValueError('Huh? Cancelled but wanting status?') + elif self._code is None: + self._condition.wait() + else: + if self._unary_response is None: + if self._responses: + self._unary_response = self._responses.pop(0) + return ( + self._unary_response, self._trailing_metadata, + self._code, self._details,) + + + def stream_response_termination(self): + with self._condition: + while True: + if self._code is _CLIENT_INACTIVE: + raise ValueError('Huh? Cancelled but wanting status?') + elif self._code is None: + self._condition.wait() + else: + return self._trailing_metadata, self._code, self._details, + + def expire(self): + with self._condition: + if self._code is None: + if self._initial_metadata is None: + self._initial_metadata = _common.FUSSED_EMPTY_METADATA + self._trailing_metadata = _common.FUSSED_EMPTY_METADATA + self._code = grpc.StatusCode.DEADLINE_EXCEEDED + self._details = 'Took too much time!' + termination_callbacks = self._termination_callbacks + self._termination_callbacks = None + self._condition.notify_all() + for termination_callback in termination_callbacks: + termination_callback() + + def set_expiration_future(self, expiration_future): + with self._condition: + self._expiration_future = expiration_future + + +def handler_without_deadline(requests_closed): + return _Handler(requests_closed) + + +def handler_with_deadline(requests_closed, time, deadline): + handler = _Handler(requests_closed) + expiration_future = time.call_at(handler.expire, deadline) + handler.set_expiration_future(expiration_future) + return handler diff --git a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py new file mode 100644 index 0000000000..f81876f4b2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py @@ -0,0 +1,153 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +import grpc +from grpc_testing import _common + + +class Rpc(object): + + def __init__(self, handler, invocation_metadata): + self._condition = threading.Condition() + self._handler = handler + self._invocation_metadata = invocation_metadata + self._initial_metadata_sent = False + self._pending_trailing_metadata = None + self._pending_code = None + self._pending_details = None + self._callbacks = [] + self._active = True + self._rpc_errors = [] + + def _ensure_initial_metadata_sent(self): + if not self._initial_metadata_sent: + self._handler.send_initial_metadata(_common.FUSSED_EMPTY_METADATA) + self._initial_metadata_sent = True + + def _call_back(self): + callbacks = tuple(self._callbacks) + self._callbacks = None + + def call_back(): + for callback in callbacks: + try: + callback() + except Exception: # pylint: disable=broad-except + logging.exception('Exception calling server-side callback!') + + callback_calling_thread = threading.Thread(target=call_back) + callback_calling_thread.start() + + def _terminate(self, trailing_metadata, code, details): + if self._active: + self._active = False + self._handler.send_termination(trailing_metadata, code, details) + self._call_back() + self._condition.notify_all() + + def _complete(self): + if self._pending_trailing_metadata is None: + trailing_metadata = _common.FUSSED_EMPTY_METADATA + else: + trailing_metadata = self._pending_trailing_metadata + if self._pending_code is None: + code = grpc.StatusCode.OK + else: + code = self._pending_code + details = '' if self._pending_details is None else self._pending_details + self._terminate(trailing_metadata, code, details) + + def _abort(self, code, details): + self._terminate(_common.FUSSED_EMPTY_METADATA, code, details) + + def add_rpc_error(self, rpc_error): + with self._condition: + self._rpc_errors.append(rpc_error) + + def application_cancel(self): + with self._condition: + self._abort( + grpc.StatusCode.CANCELLED, + 'Cancelled by server-side application!') + + def application_exception_abort(self, exception): + with self._condition: + if exception not in self._rpc_errors: + logging.exception('Exception calling application!') + self._abort( + grpc.StatusCode.UNKNOWN, + 'Exception calling application: {}'.format(exception)) + + def extrinsic_abort(self): + with self._condition: + if self._active: + self._active = False + self._call_back() + self._condition.notify_all() + + def unary_response_complete(self, response): + with self._condition: + self._ensure_initial_metadata_sent() + self._handler.add_response(response) + self._complete() + + def stream_response(self, response): + with self._condition: + self._ensure_initial_metadata_sent() + self._handler.add_response(response) + + def stream_response_complete(self): + with self._condition: + self._ensure_initial_metadata_sent() + self._complete() + + def send_initial_metadata(self, initial_metadata): + with self._condition: + if self._initial_metadata_sent: + return False + else: + self._handler.send_initial_metadata(initial_metadata) + self._initial_metadata_sent = True + return True + + def is_active(self): + with self._condition: + return self._active + + def add_callback(self, callback): + with self._condition: + if self._callbacks is None: + return False + else: + self._callbacks.append(callback) + return True + + def invocation_metadata(self): + with self._condition: + return self._invocation_metadata + + def set_trailing_metadata(self, trailing_metadata): + with self._condition: + self._pending_trailing_metadata = trailing_metadata + + def set_code(self, code): + with self._condition: + self._pending_code = code + + def set_details(self, details): + with self._condition: + self._pending_details = details diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server.py b/src/python/grpcio_testing/grpc_testing/_server/_server.py new file mode 100644 index 0000000000..66bcfc13c0 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_server.py @@ -0,0 +1,149 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading + +import grpc_testing +from grpc_testing import _common +from grpc_testing._server import _handler +from grpc_testing._server import _rpc +from grpc_testing._server import _server_rpc +from grpc_testing._server import _service +from grpc_testing._server import _servicer_context + + +def _implementation(descriptors_to_servicers, method_descriptor): + servicer = descriptors_to_servicers[method_descriptor.containing_service] + return getattr(servicer, method_descriptor.name) + + +def _unary_unary_service(request): + def service(implementation, rpc, servicer_context): + _service.unary_unary( + implementation, rpc, request, servicer_context) + return service + + +def _unary_stream_service(request): + def service(implementation, rpc, servicer_context): + _service.unary_stream( + implementation, rpc, request, servicer_context) + return service + + +def _stream_unary_service(handler): + def service(implementation, rpc, servicer_context): + _service.stream_unary(implementation, rpc, handler, servicer_context) + return service + + +def _stream_stream_service(handler): + def service(implementation, rpc, servicer_context): + _service.stream_stream(implementation, rpc, handler, servicer_context) + return service + + +class _Serverish(_common.Serverish): + + def __init__(self, descriptors_to_servicers, time): + self._descriptors_to_servicers = descriptors_to_servicers + self._time = time + + def _invoke( + self, service_behavior, method_descriptor, handler, + invocation_metadata, deadline): + implementation = _implementation( + self._descriptors_to_servicers, method_descriptor) + rpc = _rpc.Rpc(handler, invocation_metadata) + if handler.add_termination_callback(rpc.extrinsic_abort): + servicer_context = _servicer_context.ServicerContext( + rpc, self._time, deadline) + service_thread = threading.Thread( + target=service_behavior, + args=(implementation, rpc, servicer_context,)) + service_thread.start() + + def invoke_unary_unary( + self, method_descriptor, handler, invocation_metadata, request, + deadline): + self._invoke( + _unary_unary_service(request), method_descriptor, handler, + invocation_metadata, deadline) + + def invoke_unary_stream( + self, method_descriptor, handler, invocation_metadata, request, + deadline): + self._invoke( + _unary_stream_service(request), method_descriptor, handler, + invocation_metadata, deadline) + + def invoke_stream_unary( + self, method_descriptor, handler, invocation_metadata, deadline): + self._invoke( + _stream_unary_service(handler), method_descriptor, handler, + invocation_metadata, deadline) + + def invoke_stream_stream( + self, method_descriptor, handler, invocation_metadata, deadline): + self._invoke( + _stream_stream_service(handler), method_descriptor, handler, + invocation_metadata, deadline) + + +def _deadline_and_handler(requests_closed, time, timeout): + if timeout is None: + return None, _handler.handler_without_deadline(requests_closed) + else: + deadline = time.time() + timeout + handler = _handler.handler_with_deadline(requests_closed, time, deadline) + return deadline, handler + + +class _Server(grpc_testing.Server): + + def __init__(self, serverish, time): + self._serverish = serverish + self._time = time + + def invoke_unary_unary( + self, method_descriptor, invocation_metadata, request, timeout): + deadline, handler = _deadline_and_handler(True, self._time, timeout) + self._serverish.invoke_unary_unary( + method_descriptor, handler, invocation_metadata, request, deadline) + return _server_rpc.UnaryUnaryServerRpc(handler) + + def invoke_unary_stream( + self, method_descriptor, invocation_metadata, request, timeout): + deadline, handler = _deadline_and_handler(True, self._time, timeout) + self._serverish.invoke_unary_stream( + method_descriptor, handler, invocation_metadata, request, deadline) + return _server_rpc.UnaryStreamServerRpc(handler) + + def invoke_stream_unary( + self, method_descriptor, invocation_metadata, timeout): + deadline, handler = _deadline_and_handler(False, self._time, timeout) + self._serverish.invoke_stream_unary( + method_descriptor, handler, invocation_metadata, deadline) + return _server_rpc.StreamUnaryServerRpc(handler) + + def invoke_stream_stream( + self, method_descriptor, invocation_metadata, timeout): + deadline, handler = _deadline_and_handler(False, self._time, timeout) + self._serverish.invoke_stream_stream( + method_descriptor, handler, invocation_metadata, deadline) + return _server_rpc.StreamStreamServerRpc(handler) + + +def server_from_descriptor_to_servicers(descriptors_to_servicers, time): + return _Server(_Serverish(descriptors_to_servicers, time), time) diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py new file mode 100644 index 0000000000..30de8ff0e2 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py @@ -0,0 +1,93 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc_testing + + +class UnaryUnaryServerRpc(grpc_testing.UnaryUnaryServerRpc): + + def __init__(self, handler): + self._handler = handler + + def initial_metadata(self): + return self._handler.initial_metadata() + + def cancel(self): + self._handler.cancel() + + def termination(self): + return self._handler.unary_response_termination() + + +class UnaryStreamServerRpc(grpc_testing.UnaryStreamServerRpc): + + def __init__(self, handler): + self._handler = handler + + def initial_metadata(self): + return self._handler.initial_metadata() + + def take_response(self): + return self._handler.take_response() + + def cancel(self): + self._handler.cancel() + + def termination(self): + return self._handler.stream_response_termination() + + +class StreamUnaryServerRpc(grpc_testing.StreamUnaryServerRpc): + + def __init__(self, handler): + self._handler = handler + + def initial_metadata(self): + return self._handler.initial_metadata() + + def send_request(self, request): + self._handler.add_request(request) + + def requests_closed(self): + self._handler.requests_closed() + + def cancel(self): + self._handler.cancel() + + def termination(self): + return self._handler.unary_response_termination() + + +class StreamStreamServerRpc(grpc_testing.StreamStreamServerRpc): + + def __init__(self, handler): + self._handler = handler + + def initial_metadata(self): + return self._handler.initial_metadata() + + def send_request(self, request): + self._handler.add_request(request) + + def requests_closed(self): + self._handler.requests_closed() + + def take_response(self): + return self._handler.take_response() + + def cancel(self): + self._handler.cancel() + + def termination(self): + return self._handler.stream_response_termination() diff --git a/src/python/grpcio_testing/grpc_testing/_server/_service.py b/src/python/grpcio_testing/grpc_testing/_server/_service.py new file mode 100644 index 0000000000..36b0a2f7ff --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_service.py @@ -0,0 +1,88 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc + + +class _RequestIterator(object): + + def __init__(self, rpc, handler): + self._rpc = rpc + self._handler = handler + + def _next(self): + read = self._handler.take_request() + if read.requests_closed: + raise StopIteration() + elif read.terminated: + rpc_error = grpc.RpcError() + self._rpc.add_rpc_error(rpc_error) + raise rpc_error + else: + return read.request + + def __iter__(self): + return self + + def __next__(self): + return self._next() + + def next(self): + return self._next() + + +def _unary_response(argument, implementation, rpc, servicer_context): + try: + response = implementation(argument, servicer_context) + except Exception as exception: # pylint: disable=broad-except + rpc.application_exception_abort(exception) + else: + rpc.unary_response_complete(response) + + +def _stream_response(argument, implementation, rpc, servicer_context): + try: + response_iterator = implementation(argument, servicer_context) + except Exception as exception: # pylint: disable=broad-except + rpc.application_exception_abort(exception) + else: + while True: + try: + response = next(response_iterator) + except StopIteration: + rpc.stream_response_complete() + break + except Exception as exception: # pylint: disable=broad-except + rpc.application_exception_abort(exception) + break + else: + rpc.stream_response(response) + + +def unary_unary(implementation, rpc, request, servicer_context): + _unary_response(request, implementation, rpc, servicer_context) + + +def unary_stream(implementation, rpc, request, servicer_context): + _stream_response(request, implementation, rpc, servicer_context) + + +def stream_unary(implementation, rpc, handler, servicer_context): + _unary_response( + _RequestIterator(rpc, handler), implementation, rpc, servicer_context) + + +def stream_stream(implementation, rpc, handler, servicer_context): + _stream_response( + _RequestIterator(rpc, handler), implementation, rpc, servicer_context) diff --git a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py new file mode 100644 index 0000000000..496689ded0 --- /dev/null +++ b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py @@ -0,0 +1,74 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from grpc_testing import _common + + +class ServicerContext(grpc.ServicerContext): + + def __init__(self, rpc, time, deadline): + self._rpc = rpc + self._time = time + self._deadline = deadline + + def is_active(self): + return self._rpc.is_active() + + def time_remaining(self): + if self._rpc.is_active(): + if self._deadline is None: + return None + else: + return max(0.0, self._deadline - self._time.time()) + else: + return 0.0 + + def cancel(self): + self._rpc.application_cancel() + + def add_callback(self, callback): + return self._rpc.add_callback(callback) + + def invocation_metadata(self): + return self._rpc.invocation_metadata() + + def peer(self): + raise NotImplementedError() + + def peer_identities(self): + raise NotImplementedError() + + def peer_identity_key(self): + raise NotImplementedError() + + def auth_context(self): + raise NotImplementedError() + + def send_initial_metadata(self, initial_metadata): + initial_metadata_sent = self._rpc.send_initial_metadata( + _common.fuss_with_metadata(initial_metadata)) + if not initial_metadata_sent: + raise ValueError( + 'ServicerContext.send_initial_metadata called too late!') + + def set_trailing_metadata(self, trailing_metadata): + self._rpc.set_trailing_metadata( + _common.fuss_with_metadata(trailing_metadata)) + + def set_code(self, code): + self._rpc.set_code(code) + + def set_details(self, details): + self._rpc.set_details(details) diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 41a75d46f6..592d08efc3 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -12,6 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! +# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.5.0.dev0' +VERSION='1.7.0.dev0' diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 162200112a..93f84572b7 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -67,55 +67,6 @@ class GatherProto(setuptools.Command): open(path, 'a').close() -class BuildProtoModules(setuptools.Command): - """Command to generate project *_pb2.py modules from proto files.""" - - description = 'build protobuf modules' - user_options = [ - ('include=', None, 'path patterns to include in protobuf generation'), - ('exclude=', None, 'path patterns to exclude from protobuf generation') - ] - - def initialize_options(self): - self.exclude = None - self.include = r'.*\.proto$' - - def finalize_options(self): - pass - - def run(self): - import grpc_tools.protoc as protoc - - include_regex = re.compile(self.include) - exclude_regex = re.compile(self.exclude) if self.exclude else None - paths = [] - for walk_root, directories, filenames in os.walk(PROTO_STEM): - for filename in filenames: - path = os.path.join(walk_root, filename) - if include_regex.match(path) and not ( - exclude_regex and exclude_regex.match(path)): - paths.append(path) - - # TODO(kpayson): It would be nice to do this in a batch command, - # but we currently have name conflicts in src/proto - for path in paths: - command = [ - 'grpc_tools.protoc', - '-I {}'.format(PROTO_STEM), - '--python_out={}'.format(PROTO_STEM), - '--grpc_python_out={}'.format(PROTO_STEM), - ] + [path] - if protoc.main(command) != 0: - sys.stderr.write( - 'warning: Command:\n{}\nFailed'.format(command)) - - # Generated proto directories dont include __init__.py, but - # these are needed for python package resolution - for walk_root, _, _ in os.walk(PROTO_STEM): - path = os.path.join(walk_root, '__init__.py') - open(path, 'a').close() - - class BuildPy(build_py.build_py): """Custom project build command.""" diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py b/src/python/grpcio_tests/tests/_sanity/__init__.py index 5772620b60..5772620b60 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/__init__.py +++ b/src/python/grpcio_tests/tests/_sanity/__init__.py diff --git a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py index 19bc8801eb..b4079850ff 100644 --- a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py +++ b/src/python/grpcio_tests/tests/_sanity/_sanity_test.py @@ -21,24 +21,25 @@ import six import tests -class Sanity(unittest.TestCase): +class SanityTest(unittest.TestCase): + + maxDiff = 32768 def testTestsJsonUpToDate(self): """Autodiscovers all test suites and checks that tests.json is up to date""" loader = tests.Loader() loader.loadTestsFromNames(['tests']) - test_suite_names = [ + test_suite_names = sorted({ test_case_class.id().rsplit('.', 1)[0] for test_case_class in tests._loader.iterate_suite_cases( loader.suite) - ] - test_suite_names = sorted(set(test_suite_names)) + }) tests_json_string = pkg_resources.resource_string('tests', 'tests.json') - if six.PY3: - tests_json_string = tests_json_string.decode() - tests_json = json.loads(tests_json_string) - self.assertListEqual(test_suite_names, tests_json) + tests_json = json.loads(tests_json_string.decode() + if six.PY3 else tests_json_string) + + self.assertSequenceEqual(tests_json, test_suite_names) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py index 6d8a6bce77..8dab5b67f1 100644 --- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py +++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py @@ -17,7 +17,7 @@ import argparse import grpc import time -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from src.proto.grpc.testing import messages_pb2 @@ -147,7 +147,7 @@ def _stub(server_host, server_port): target = '{}:{}'.format(server_host, server_port) channel = grpc.insecure_channel(target) grpc.channel_ready_future(channel).result() - return test_pb2.TestServiceStub(channel) + return test_pb2_grpc.TestServiceStub(channel) def main(): diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 47ae96472d..e520c08290 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -19,7 +19,7 @@ import os from google import auth as google_auth from google.auth import jwt as google_auth_jwt import grpc -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from tests.interop import methods from tests.interop import resources @@ -106,9 +106,9 @@ def _stub(args): else: channel = grpc.insecure_channel(target) if args.test_case == "unimplemented_service": - return test_pb2.UnimplementedServiceStub(channel) + return test_pb2_grpc.UnimplementedServiceStub(channel) else: - return test_pb2.TestServiceStub(channel) + return test_pb2_grpc.TestServiceStub(channel) def _test_case_from_arg(test_case_arg): diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 71493bfec6..5b84001aab 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -33,7 +33,7 @@ from tests.unit.framework.common import test_constants import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2 import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2 -import tests.protoc_plugin.protos.service.test_service_pb2 as service_pb2 +import tests.protoc_plugin.protos.service.test_service_pb2_grpc as service_pb2_grpc # Identifiers of entities we expect to find in the generated module. STUB_IDENTIFIER = 'TestServiceStub' @@ -138,7 +138,7 @@ def _CreateService(): """ servicer_methods = _ServicerMethods() - class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): def UnaryCall(self, request, context): return servicer_methods.UnaryCall(request, context) @@ -157,11 +157,12 @@ def _CreateService(): server = grpc.server( futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), + server) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) return _Service(servicer_methods, server, stub) @@ -173,16 +174,17 @@ def _CreateIncompleteService(): servicer_methods implements none of the methods required of it. """ - class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): pass server = grpc.server( futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), + server) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + stub = getattr(service_pb2_grpc, STUB_IDENTIFIER)(channel) return _Service(None, server, stub) @@ -223,10 +225,11 @@ class PythonPluginTest(unittest.TestCase): def testImportAttributes(self): # check that we can access the generated module and its members. - self.assertIsNotNone(getattr(service_pb2, STUB_IDENTIFIER, None)) - self.assertIsNotNone(getattr(service_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(service_pb2_grpc, STUB_IDENTIFIER, None)) self.assertIsNotNone( - getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) + getattr(service_pb2_grpc, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone( + getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) def testUpDown(self): service = _CreateService() diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index 1aeb62a7c5..7868cdbfb3 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -12,22 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections +import abc from concurrent import futures import contextlib -import distutils.spawn -import errno import importlib import os -import os.path +from os import path import pkgutil +import platform import shutil -import subprocess import sys import tempfile -import threading import unittest -import platform + +import six import grpc from grpc_tools import protoc @@ -37,292 +35,285 @@ _MESSAGES_IMPORT = b'import "messages.proto";' _SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;' _COMMON_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing;' +_RELATIVE_PROTO_PATH = 'relative_proto_path' +_RELATIVE_PYTHON_OUT = 'relative_python_out' + @contextlib.contextmanager -def _system_path(path): +def _system_path(path_insertion): old_system_path = sys.path[:] - sys.path = sys.path[0:1] + path + sys.path[1:] + sys.path = sys.path[0:1] + path_insertion + sys.path[1:] yield sys.path = old_system_path -class DummySplitServicer(object): +# NOTE(nathaniel): https://twitter.com/exoplaneteer/status/677259364256747520 +# Life lesson "just always default to idempotence" reinforced. +def _create_directory_tree(root, path_components_sequence): + created = set() + for path_components in path_components_sequence: + thus_far = '' + for path_component in path_components: + relative_path = path.join(thus_far, path_component) + if relative_path not in created: + os.makedirs(path.join(root, relative_path)) + created.add(relative_path) + thus_far = path.join(thus_far, path_component) + + +def _massage_proto_content(proto_content, test_name_bytes, + messages_proto_relative_file_name_bytes): + package_substitution = (b'package grpc_protoc_plugin.invocation_testing.' + + test_name_bytes + b';') + common_namespace_substituted = proto_content.replace(_COMMON_NAMESPACE, + package_substitution) + split_namespace_substituted = common_namespace_substituted.replace( + _SPLIT_NAMESPACE, package_substitution) + message_import_replaced = split_namespace_substituted.replace( + _MESSAGES_IMPORT, + b'import "' + messages_proto_relative_file_name_bytes + b'";') + return message_import_replaced + + +def _packagify(directory): + for subdirectory, _, _ in os.walk(directory): + init_file_name = path.join(subdirectory, '__init__.py') + with open(init_file_name, 'wb') as init_file: + init_file.write(b'') - def __init__(self, request_class, response_class): - self.request_class = request_class - self.response_class = response_class + +class _Servicer(object): + + def __init__(self, response_class): + self._response_class = response_class def Call(self, request, context): - return self.response_class() + return self._response_class() -class SeparateTestMixin(object): +def _protoc(proto_path, python_out, grpc_python_out_flag, grpc_python_out, + absolute_proto_file_names): + args = [ + '', + '--proto_path={}'.format(proto_path), + ] + if python_out is not None: + args.append('--python_out={}'.format(python_out)) + if grpc_python_out is not None: + args.append('--grpc_python_out={}:{}'.format(grpc_python_out_flag, + grpc_python_out)) + args.extend(absolute_proto_file_names) + return protoc.main(args) - def testImportAttributes(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - pb2.Request - pb2.Response - if self.should_find_services_in_pb2: - pb2.TestServiceServicer - else: - with self.assertRaises(AttributeError): - pb2.TestServiceServicer - - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - pb2_grpc.TestServiceServicer - with self.assertRaises(AttributeError): - pb2_grpc.Request - with self.assertRaises(AttributeError): - pb2_grpc.Response - - def testCall(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - pb2_grpc.add_TestServiceServicer_to_server( - DummySplitServicer(pb2.Request, pb2.Response), server) - port = server.add_insecure_port('[::]:0') - server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = pb2_grpc.TestServiceStub(channel) - request = pb2.Request() - expected_response = pb2.Response() - response = stub.Call(request) - self.assertEqual(expected_response, response) - - -class CommonTestMixin(object): - - def testImportAttributes(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - pb2.Request - pb2.Response - if self.should_find_services_in_pb2: - pb2.TestServiceServicer - else: - with self.assertRaises(AttributeError): - pb2.TestServiceServicer - - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - pb2_grpc.TestServiceServicer - with self.assertRaises(AttributeError): - pb2_grpc.Request - with self.assertRaises(AttributeError): - pb2_grpc.Response - - def testCall(self): - with _system_path([self.python_out_directory]): - pb2 = importlib.import_module(self.pb2_import) - with _system_path([self.grpc_python_out_directory]): - pb2_grpc = importlib.import_module(self.pb2_grpc_import) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) - pb2_grpc.add_TestServiceServicer_to_server( - DummySplitServicer(pb2.Request, pb2.Response), server) - port = server.add_insecure_port('[::]:0') - server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - stub = pb2_grpc.TestServiceStub(channel) - request = pb2.Request() - expected_response = pb2.Response() - response = stub.Call(request) - self.assertEqual(expected_response, response) - - -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SameSeparateTest(unittest.TestCase, SeparateTestMixin): - def setUp(self): - same_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing', 'same.proto') - self.directory = tempfile.mkdtemp(suffix='same_separate', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = os.path.join(self.directory, - 'grpc_python_out') - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - os.makedirs(self.grpc_python_out_directory) - same_proto_file = os.path.join(self.proto_directory, - 'same_separate.proto') - open(same_proto_file, 'wb').write( - same_proto_contents.replace( - _COMMON_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.same_separate;')) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out=grpc_2_0:{}'.format( - self.grpc_python_out_directory), - same_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.grpc_python_out_directory, '__init__.py'), - 'w').write('') - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'same_separate_pb2' - self.pb2_grpc_import = 'same_separate_pb2_grpc' - self.should_find_services_in_pb2 = False +class _Mid2016ProtocStyle(object): - def tearDown(self): - shutil.rmtree(self.directory) + def name(self): + return 'Mid2016ProtocStyle' + def grpc_in_pb2_expected(self): + return True -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SameCommonTest(unittest.TestCase, CommonTestMixin): + def protoc(self, proto_path, python_out, absolute_proto_file_names): + return (_protoc(proto_path, python_out, 'grpc_1_0', python_out, + absolute_proto_file_names),) - def setUp(self): - same_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing', 'same.proto') - self.directory = tempfile.mkdtemp(suffix='same_common', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = self.python_out_directory - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - same_proto_file = os.path.join(self.proto_directory, - 'same_common.proto') - open(same_proto_file, 'wb').write( - same_proto_contents.replace( - _COMMON_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.same_common;')) - - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out={}'.format(self.grpc_python_out_directory), - same_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'same_common_pb2' - self.pb2_grpc_import = 'same_common_pb2_grpc' - self.should_find_services_in_pb2 = True - def tearDown(self): - shutil.rmtree(self.directory) +class _SingleProtocExecutionProtocStyle(object): + def name(self): + return 'SingleProtocExecutionProtocStyle' -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SplitCommonTest(unittest.TestCase, CommonTestMixin): + def grpc_in_pb2_expected(self): + return False - def setUp(self): - services_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_services', - 'services.proto') - messages_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_messages', - 'messages.proto') - self.directory = tempfile.mkdtemp(suffix='split_common', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = self.python_out_directory - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - services_proto_file = os.path.join(self.proto_directory, - 'split_common_services.proto') - messages_proto_file = os.path.join(self.proto_directory, - 'split_common_messages.proto') - open(services_proto_file, 'wb').write( - services_proto_contents.replace( - _MESSAGES_IMPORT, b'import "split_common_messages.proto";') - .replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_common;')) - open(messages_proto_file, 'wb').write( - messages_proto_contents.replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_common;')) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out={}'.format(self.grpc_python_out_directory), - services_proto_file, - messages_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'split_common_messages_pb2' - self.pb2_grpc_import = 'split_common_services_pb2_grpc' - self.should_find_services_in_pb2 = False + def protoc(self, proto_path, python_out, absolute_proto_file_names): + return (_protoc(proto_path, python_out, 'grpc_2_0', python_out, + absolute_proto_file_names),) + + +class _ProtoBeforeGrpcProtocStyle(object): + + def name(self): + return 'ProtoBeforeGrpcProtocStyle' + + def grpc_in_pb2_expected(self): + return False + + def protoc(self, proto_path, python_out, absolute_proto_file_names): + pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, + absolute_proto_file_names) + pb2_grpc_protoc_exit_code = _protoc( + proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) + return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code, - def tearDown(self): - shutil.rmtree(self.directory) +class _GrpcBeforeProtoProtocStyle(object): -@unittest.skipIf(platform.python_implementation() == "PyPy", - "Skip test if run with PyPy") -class SplitSeparateTest(unittest.TestCase, SeparateTestMixin): + def name(self): + return 'GrpcBeforeProtoProtocStyle' + + def grpc_in_pb2_expected(self): + return False + + def protoc(self, proto_path, python_out, absolute_proto_file_names): + pb2_grpc_protoc_exit_code = _protoc( + proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) + pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, + absolute_proto_file_names) + return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code, + + +_PROTOC_STYLES = (_Mid2016ProtocStyle(), _SingleProtocExecutionProtocStyle(), + _ProtoBeforeGrpcProtocStyle(), _GrpcBeforeProtoProtocStyle(),) + + +@unittest.skipIf(platform.python_implementation() == 'PyPy', + 'Skip test if run with PyPy!') +class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): def setUp(self): - services_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_services', - 'services.proto') - messages_proto_contents = pkgutil.get_data( - 'tests.protoc_plugin.protos.invocation_testing.split_messages', - 'messages.proto') - self.directory = tempfile.mkdtemp(suffix='split_separate', dir='.') - self.proto_directory = os.path.join(self.directory, 'proto_path') - self.python_out_directory = os.path.join(self.directory, 'python_out') - self.grpc_python_out_directory = os.path.join(self.directory, - 'grpc_python_out') - os.makedirs(self.proto_directory) - os.makedirs(self.python_out_directory) - os.makedirs(self.grpc_python_out_directory) - services_proto_file = os.path.join(self.proto_directory, - 'split_separate_services.proto') - messages_proto_file = os.path.join(self.proto_directory, - 'split_separate_messages.proto') - open(services_proto_file, 'wb').write( - services_proto_contents.replace( - _MESSAGES_IMPORT, b'import "split_separate_messages.proto";') - .replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_separate;' - )) - open(messages_proto_file, 'wb').write( - messages_proto_contents.replace( - _SPLIT_NAMESPACE, - b'package grpc_protoc_plugin.invocation_testing.split_separate;' - )) - protoc_result = protoc.main([ - '', - '--proto_path={}'.format(self.proto_directory), - '--python_out={}'.format(self.python_out_directory), - '--grpc_python_out=grpc_2_0:{}'.format( - self.grpc_python_out_directory), - services_proto_file, - messages_proto_file, - ]) - if protoc_result != 0: - raise Exception("unexpected protoc error") - open(os.path.join(self.python_out_directory, '__init__.py'), - 'w').write('') - self.pb2_import = 'split_separate_messages_pb2' - self.pb2_grpc_import = 'split_separate_services_pb2_grpc' - self.should_find_services_in_pb2 = False + self._directory = tempfile.mkdtemp(suffix=self.NAME, dir='.') + self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH) + self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT) + + os.makedirs(self._proto_path) + os.makedirs(self._python_out) + + proto_directories_and_names = { + (self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.MESSAGES_PROTO_FILE_NAME,), + (self.SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.SERVICES_PROTO_FILE_NAME,), + } + messages_proto_relative_file_name_forward_slashes = '/'.join( + self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES + ( + self.MESSAGES_PROTO_FILE_NAME,)) + _create_directory_tree(self._proto_path, ( + relative_proto_directory_names + for relative_proto_directory_names, _ in proto_directories_and_names + )) + self._absolute_proto_file_names = set() + for relative_directory_names, file_name in proto_directories_and_names: + absolute_proto_file_name = path.join( + self._proto_path, *relative_directory_names + (file_name,)) + raw_proto_content = pkgutil.get_data( + 'tests.protoc_plugin.protos.invocation_testing', + path.join(*relative_directory_names + (file_name,))) + massaged_proto_content = _massage_proto_content( + raw_proto_content, + self.NAME.encode(), + messages_proto_relative_file_name_forward_slashes.encode()) + with open(absolute_proto_file_name, 'wb') as proto_file: + proto_file.write(massaged_proto_content) + self._absolute_proto_file_names.add(absolute_proto_file_name) def tearDown(self): - shutil.rmtree(self.directory) + shutil.rmtree(self._directory) + + def _protoc(self): + protoc_exit_codes = self.PROTOC_STYLE.protoc( + self._proto_path, self._python_out, self._absolute_proto_file_names) + for protoc_exit_code in protoc_exit_codes: + self.assertEqual(0, protoc_exit_code) + + _packagify(self._python_out) + + generated_modules = {} + expected_generated_full_module_names = { + self.EXPECTED_MESSAGES_PB2, + self.EXPECTED_SERVICES_PB2, + self.EXPECTED_SERVICES_PB2_GRPC, + } + with _system_path([self._python_out]): + for full_module_name in expected_generated_full_module_names: + module = importlib.import_module(full_module_name) + generated_modules[full_module_name] = module + + self._messages_pb2 = generated_modules[self.EXPECTED_MESSAGES_PB2] + self._services_pb2 = generated_modules[self.EXPECTED_SERVICES_PB2] + self._services_pb2_grpc = generated_modules[ + self.EXPECTED_SERVICES_PB2_GRPC] + + def _services_modules(self): + if self.PROTOC_STYLE.grpc_in_pb2_expected(): + return self._services_pb2, self._services_pb2_grpc, + else: + return self._services_pb2_grpc, + + def test_imported_attributes(self): + self._protoc() + + self._messages_pb2.Request + self._messages_pb2.Response + self._services_pb2.DESCRIPTOR.services_by_name['TestService'] + for services_module in self._services_modules(): + services_module.TestServiceStub + services_module.TestServiceServicer + services_module.add_TestServiceServicer_to_server + + def test_call(self): + self._protoc() + + for services_module in self._services_modules(): + server = grpc.server( + futures.ThreadPoolExecutor( + max_workers=test_constants.POOL_SIZE)) + services_module.add_TestServiceServicer_to_server( + _Servicer(self._messages_pb2.Response), server) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:{}'.format(port)) + stub = services_module.TestServiceStub(channel) + response = stub.Call(self._messages_pb2.Request()) + self.assertEqual(self._messages_pb2.Response(), response) + + +def _create_test_case_class(split_proto, protoc_style): + attributes = {} + + name = '{}{}'.format('SplitProto' if split_proto else 'SameProto', + protoc_style.name()) + attributes['NAME'] = name + + if split_proto: + attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( + 'split_messages', 'sub',) + attributes['MESSAGES_PROTO_FILE_NAME'] = 'messages.proto' + attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( + 'split_services',) + attributes['SERVICES_PROTO_FILE_NAME'] = 'services.proto' + attributes['EXPECTED_MESSAGES_PB2'] = 'split_messages.sub.messages_pb2' + attributes['EXPECTED_SERVICES_PB2'] = 'split_services.services_pb2' + attributes['EXPECTED_SERVICES_PB2_GRPC'] = ( + 'split_services.services_pb2_grpc') + else: + attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = () + attributes['MESSAGES_PROTO_FILE_NAME'] = 'same.proto' + attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = () + attributes['SERVICES_PROTO_FILE_NAME'] = 'same.proto' + attributes['EXPECTED_MESSAGES_PB2'] = 'same_pb2' + attributes['EXPECTED_SERVICES_PB2'] = 'same_pb2' + attributes['EXPECTED_SERVICES_PB2_GRPC'] = 'same_pb2_grpc' + + attributes['PROTOC_STYLE'] = protoc_style + + attributes['__module__'] = _Test.__module__ + + return type('{}Test'.format(name), (_Test,), attributes) + + +def _create_test_case_classes(): + for split_proto in (False, True,): + for protoc_style in _PROTOC_STYLES: + yield _create_test_case_class(split_proto, protoc_style) + + +def load_tests(loader, tests, pattern): + tests = tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in _create_test_case_classes()) + return unittest.TestSuite(tests=tests) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index 83f21ecbbb..424b153ff8 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -12,19 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import argparse import contextlib -import distutils.spawn -import errno -import itertools +import importlib import os -import pkg_resources +from os import path +import pkgutil import shutil -import subprocess import sys import tempfile import threading -import time import unittest from six import moves @@ -33,12 +29,22 @@ from grpc.beta import implementations from grpc.beta import interfaces from grpc.framework.foundation import future from grpc.framework.interfaces.face import face +from grpc_tools import protoc from tests.unit.framework.common import test_constants -import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 -import tests.protoc_plugin.protos.requests.r.test_requests_pb2 as request_pb2 -import tests.protoc_plugin.protos.responses.test_responses_pb2 as response_pb2 -import tests.protoc_plugin.protos.service.test_service_pb2 as service_pb2 +_RELATIVE_PROTO_PATH = 'relative_proto_path' +_RELATIVE_PYTHON_OUT = 'relative_python_out' + +_PROTO_FILES_PATH_COMPONENTS = ( + ('beta_grpc_plugin_test', 'payload', 'test_payload.proto',), + ('beta_grpc_plugin_test', 'requests', 'r', 'test_requests.proto',), + ('beta_grpc_plugin_test', 'responses', 'test_responses.proto',), + ('beta_grpc_plugin_test', 'service', 'test_service.proto',),) + +_PAYLOAD_PB2 = 'beta_grpc_plugin_test.payload.test_payload_pb2' +_REQUESTS_PB2 = 'beta_grpc_plugin_test.requests.r.test_requests_pb2' +_RESPONSES_PB2 = 'beta_grpc_plugin_test.responses.test_responses_pb2' +_SERVICE_PB2 = 'beta_grpc_plugin_test.service.test_service_pb2' # Identifiers of entities we expect to find in the generated module. SERVICER_IDENTIFIER = 'BetaTestServiceServicer' @@ -47,12 +53,50 @@ SERVER_FACTORY_IDENTIFIER = 'beta_create_TestService_server' STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub' +@contextlib.contextmanager +def _system_path(path_insertion): + old_system_path = sys.path[:] + sys.path = sys.path[0:1] + path_insertion + sys.path[1:] + yield + sys.path = old_system_path + + +def _create_directory_tree(root, path_components_sequence): + created = set() + for path_components in path_components_sequence: + thus_far = '' + for path_component in path_components: + relative_path = path.join(thus_far, path_component) + if relative_path not in created: + os.makedirs(path.join(root, relative_path)) + created.add(relative_path) + thus_far = path.join(thus_far, path_component) + + +def _massage_proto_content(raw_proto_content): + imports_substituted = raw_proto_content.replace( + b'import "tests/protoc_plugin/protos/', + b'import "beta_grpc_plugin_test/') + package_statement_substituted = imports_substituted.replace( + b'package grpc_protoc_plugin;', b'package beta_grpc_protoc_plugin;') + return package_statement_substituted + + +def _packagify(directory): + for subdirectory, _, _ in os.walk(directory): + init_file_name = path.join(subdirectory, '__init__.py') + with open(init_file_name, 'wb') as init_file: + init_file.write(b'') + + class _ServicerMethods(object): - def __init__(self): + def __init__(self, payload_pb2, responses_pb2): self._condition = threading.Condition() self._paused = False self._fail = False + self._payload_pb2 = payload_pb2 + self._responses_pb2 = responses_pb2 @contextlib.contextmanager def pause(self): # pylint: disable=invalid-name @@ -79,22 +123,22 @@ class _ServicerMethods(object): self._condition.wait() def UnaryCall(self, request, unused_rpc_context): - response = response_pb2.SimpleResponse() - response.payload.payload_type = payload_pb2.COMPRESSABLE + response = self._responses_pb2.SimpleResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * request.response_size self._control() return response def StreamingOutputCall(self, request, unused_rpc_context): for parameter in request.response_parameters: - response = response_pb2.StreamingOutputCallResponse() - response.payload.payload_type = payload_pb2.COMPRESSABLE + response = self._responses_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response def StreamingInputCall(self, request_iter, unused_rpc_context): - response = response_pb2.StreamingInputCallResponse() + response = self._responses_pb2.StreamingInputCallResponse() aggregated_payload_size = 0 for request in request_iter: aggregated_payload_size += len(request.payload.payload_compressable) @@ -105,8 +149,8 @@ class _ServicerMethods(object): def FullDuplexCall(self, request_iter, unused_rpc_context): for request in request_iter: for parameter in request.response_parameters: - response = response_pb2.StreamingOutputCallResponse() - response.payload.payload_type = payload_pb2.COMPRESSABLE + response = self._responses_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() yield response @@ -115,8 +159,8 @@ class _ServicerMethods(object): responses = [] for request in request_iter: for parameter in request.response_parameters: - response = response_pb2.StreamingOutputCallResponse() - response.payload.payload_type = payload_pb2.COMPRESSABLE + response = self._responses_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE response.payload.payload_compressable = 'a' * parameter.size self._control() responses.append(response) @@ -125,7 +169,7 @@ class _ServicerMethods(object): @contextlib.contextmanager -def _CreateService(): +def _CreateService(payload_pb2, responses_pb2, service_pb2): """Provides a servicer backend and a stub. The servicer is just the implementation of the actual servicer passed to the @@ -136,7 +180,7 @@ def _CreateService(): the service bound to the stub and and stub is the stub on which to invoke RPCs. """ - servicer_methods = _ServicerMethods() + servicer_methods = _ServicerMethods(payload_pb2, responses_pb2) class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): @@ -161,12 +205,12 @@ def _CreateService(): server.start() channel = implementations.insecure_channel('localhost', port) stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel) - yield (servicer_methods, stub) + yield servicer_methods, stub, server.stop(0) @contextlib.contextmanager -def _CreateIncompleteService(): +def _CreateIncompleteService(service_pb2): """Provides a servicer backend that fails to implement methods and its stub. The servicer is just the implementation of the actual servicer passed to the @@ -192,16 +236,16 @@ def _CreateIncompleteService(): server.stop(0) -def _streaming_input_request_iterator(): +def _streaming_input_request_iterator(payload_pb2, requests_pb2): for _ in range(3): - request = request_pb2.StreamingInputCallRequest() + request = requests_pb2.StreamingInputCallRequest() request.payload.payload_type = payload_pb2.COMPRESSABLE request.payload.payload_compressable = 'a' yield request -def _streaming_output_request(): - request = request_pb2.StreamingOutputCallRequest() +def _streaming_output_request(requests_pb2): + request = requests_pb2.StreamingOutputCallRequest() sizes = [1, 2, 3] request.response_parameters.add(size=sizes[0], interval_us=0) request.response_parameters.add(size=sizes[1], interval_us=0) @@ -209,11 +253,11 @@ def _streaming_output_request(): return request -def _full_duplex_request_iterator(): - request = request_pb2.StreamingOutputCallRequest() +def _full_duplex_request_iterator(requests_pb2): + request = requests_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - request = request_pb2.StreamingOutputCallRequest() + request = requests_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=2, interval_us=0) request.response_parameters.add(size=3, interval_us=0) yield request @@ -227,22 +271,78 @@ class PythonPluginTest(unittest.TestCase): methods and does not exist for response-streaming methods. """ + def setUp(self): + self._directory = tempfile.mkdtemp(dir='.') + self._proto_path = path.join(self._directory, _RELATIVE_PROTO_PATH) + self._python_out = path.join(self._directory, _RELATIVE_PYTHON_OUT) + + os.makedirs(self._proto_path) + os.makedirs(self._python_out) + + directories_path_components = { + proto_file_path_components[:-1] + for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS + } + _create_directory_tree(self._proto_path, directories_path_components) + self._proto_file_names = set() + for proto_file_path_components in _PROTO_FILES_PATH_COMPONENTS: + raw_proto_content = pkgutil.get_data( + 'tests.protoc_plugin.protos', + path.join(*proto_file_path_components[1:])) + massaged_proto_content = _massage_proto_content(raw_proto_content) + proto_file_name = path.join(self._proto_path, + *proto_file_path_components) + with open(proto_file_name, 'wb') as proto_file: + proto_file.write(massaged_proto_content) + self._proto_file_names.add(proto_file_name) + + def tearDown(self): + shutil.rmtree(self._directory) + + def _protoc(self): + args = [ + '', + '--proto_path={}'.format(self._proto_path), + '--python_out={}'.format(self._python_out), + '--grpc_python_out=grpc_1_0:{}'.format(self._python_out), + ] + list(self._proto_file_names) + protoc_exit_code = protoc.main(args) + self.assertEqual(0, protoc_exit_code) + + _packagify(self._python_out) + + with _system_path([ + self._python_out, + ]): + self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2) + self._requests_pb2 = importlib.import_module(_REQUESTS_PB2) + self._responses_pb2 = importlib.import_module(_RESPONSES_PB2) + self._service_pb2 = importlib.import_module(_SERVICE_PB2) + def testImportAttributes(self): + self._protoc() + # check that we can access the generated module and its members. - self.assertIsNotNone(getattr(service_pb2, SERVICER_IDENTIFIER, None)) - self.assertIsNotNone(getattr(service_pb2, STUB_IDENTIFIER, None)) self.assertIsNotNone( - getattr(service_pb2, SERVER_FACTORY_IDENTIFIER, None)) + getattr(self._service_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone(getattr(self._service_pb2, STUB_IDENTIFIER, None)) self.assertIsNotNone( - getattr(service_pb2, STUB_FACTORY_IDENTIFIER, None)) + getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None)) + self.assertIsNotNone( + getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None)) def testUpDown(self): - with _CreateService(): - request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2): + self._requests_pb2.SimpleRequest(response_size=13) def testIncompleteServicer(self): - with _CreateIncompleteService() as (_, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateIncompleteService(self._service_pb2) as (_, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) try: stub.UnaryCall(request, test_constants.LONG_TIMEOUT) except face.AbortionError as error: @@ -250,15 +350,21 @@ class PythonPluginTest(unittest.TestCase): error.code) def testUnaryCall(self): - with _CreateService() as (methods, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT) expected_response = methods.UnaryCall(request, 'not a real context!') self.assertEqual(expected_response, response) def testUnaryCallFuture(self): - with _CreateService() as (methods, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) # Check that the call does not block waiting for the server to respond. with methods.pause(): response_future = stub.UnaryCall.future( @@ -268,8 +374,11 @@ class PythonPluginTest(unittest.TestCase): self.assertEqual(expected_response, response) def testUnaryCallFutureExpired(self): - with _CreateService() as (methods, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) with methods.pause(): response_future = stub.UnaryCall.future( request, test_constants.SHORT_TIMEOUT) @@ -277,24 +386,33 @@ class PythonPluginTest(unittest.TestCase): response_future.result() def testUnaryCallFutureCancelled(self): - with _CreateService() as (methods, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) with methods.pause(): response_future = stub.UnaryCall.future(request, 1) response_future.cancel() self.assertTrue(response_future.cancelled()) def testUnaryCallFutureFailed(self): - with _CreateService() as (methods, stub): - request = request_pb2.SimpleRequest(response_size=13) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = self._requests_pb2.SimpleRequest(response_size=13) with methods.fail(): response_future = stub.UnaryCall.future( request, test_constants.LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testStreamingOutputCall(self): - with _CreateService() as (methods, stub): - request = _streaming_output_request() + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = _streaming_output_request(self._requests_pb2) responses = stub.StreamingOutputCall(request, test_constants.LONG_TIMEOUT) expected_responses = methods.StreamingOutputCall( @@ -304,8 +422,11 @@ class PythonPluginTest(unittest.TestCase): self.assertEqual(expected_response, response) def testStreamingOutputCallExpired(self): - with _CreateService() as (methods, stub): - request = _streaming_output_request() + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = _streaming_output_request(self._requests_pb2) with methods.pause(): responses = stub.StreamingOutputCall( request, test_constants.SHORT_TIMEOUT) @@ -313,8 +434,11 @@ class PythonPluginTest(unittest.TestCase): list(responses) def testStreamingOutputCallCancelled(self): - with _CreateService() as (methods, stub): - request = _streaming_output_request() + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = _streaming_output_request(self._requests_pb2) responses = stub.StreamingOutputCall(request, test_constants.LONG_TIMEOUT) next(responses) @@ -323,8 +447,11 @@ class PythonPluginTest(unittest.TestCase): next(responses) def testStreamingOutputCallFailed(self): - with _CreateService() as (methods, stub): - request = _streaming_output_request() + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request = _streaming_output_request(self._requests_pb2) with methods.fail(): responses = stub.StreamingOutputCall(request, 1) self.assertIsNotNone(responses) @@ -332,30 +459,46 @@ class PythonPluginTest(unittest.TestCase): next(responses) def testStreamingInputCall(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): response = stub.StreamingInputCall( - _streaming_input_request_iterator(), + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), test_constants.LONG_TIMEOUT) expected_response = methods.StreamingInputCall( - _streaming_input_request_iterator(), 'not a real RpcContext!') + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), + 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallFuture(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.pause(): response_future = stub.StreamingInputCall.future( - _streaming_input_request_iterator(), + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), test_constants.LONG_TIMEOUT) response = response_future.result() expected_response = methods.StreamingInputCall( - _streaming_input_request_iterator(), 'not a real RpcContext!') + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), + 'not a real RpcContext!') self.assertEqual(expected_response, response) def testStreamingInputCallFutureExpired(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.pause(): response_future = stub.StreamingInputCall.future( - _streaming_input_request_iterator(), + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), test_constants.SHORT_TIMEOUT) with self.assertRaises(face.ExpirationError): response_future.result() @@ -363,10 +506,14 @@ class PythonPluginTest(unittest.TestCase): face.ExpirationError) def testStreamingInputCallFutureCancelled(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.pause(): response_future = stub.StreamingInputCall.future( - _streaming_input_request_iterator(), + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), test_constants.LONG_TIMEOUT) response_future.cancel() self.assertTrue(response_future.cancelled()) @@ -374,26 +521,38 @@ class PythonPluginTest(unittest.TestCase): response_future.result() def testStreamingInputCallFutureFailed(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.fail(): response_future = stub.StreamingInputCall.future( - _streaming_input_request_iterator(), + _streaming_input_request_iterator(self._payload_pb2, + self._requests_pb2), test_constants.LONG_TIMEOUT) self.assertIsNotNone(response_future.exception()) def testFullDuplexCall(self): - with _CreateService() as (methods, stub): - responses = stub.FullDuplexCall(_full_duplex_request_iterator(), - test_constants.LONG_TIMEOUT) + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + responses = stub.FullDuplexCall( + _full_duplex_request_iterator(self._requests_pb2), + test_constants.LONG_TIMEOUT) expected_responses = methods.FullDuplexCall( - _full_duplex_request_iterator(), 'not a real RpcContext!') + _full_duplex_request_iterator(self._requests_pb2), + 'not a real RpcContext!') for expected_response, response in moves.zip_longest( expected_responses, responses): self.assertEqual(expected_response, response) def testFullDuplexCallExpired(self): - request_iterator = _full_duplex_request_iterator() - with _CreateService() as (methods, stub): + self._protoc() + + request_iterator = _full_duplex_request_iterator(self._requests_pb2) + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.pause(): responses = stub.FullDuplexCall(request_iterator, test_constants.SHORT_TIMEOUT) @@ -401,8 +560,11 @@ class PythonPluginTest(unittest.TestCase): list(responses) def testFullDuplexCallCancelled(self): - with _CreateService() as (methods, stub): - request_iterator = _full_duplex_request_iterator() + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): + request_iterator = _full_duplex_request_iterator(self._requests_pb2) responses = stub.FullDuplexCall(request_iterator, test_constants.LONG_TIMEOUT) next(responses) @@ -411,8 +573,11 @@ class PythonPluginTest(unittest.TestCase): next(responses) def testFullDuplexCallFailed(self): - request_iterator = _full_duplex_request_iterator() - with _CreateService() as (methods, stub): + self._protoc() + + request_iterator = _full_duplex_request_iterator(self._requests_pb2) + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with methods.fail(): responses = stub.FullDuplexCall(request_iterator, test_constants.LONG_TIMEOUT) @@ -421,13 +586,16 @@ class PythonPluginTest(unittest.TestCase): next(responses) def testHalfDuplexCall(self): - with _CreateService() as (methods, stub): + self._protoc() + + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): def half_duplex_request_iterator(): - request = request_pb2.StreamingOutputCallRequest() + request = self._requests_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request - request = request_pb2.StreamingOutputCallRequest() + request = self._requests_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=2, interval_us=0) request.response_parameters.add(size=3, interval_us=0) yield request @@ -441,6 +609,8 @@ class PythonPluginTest(unittest.TestCase): self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): + self._protoc() + condition = threading.Condition() wait_cell = [False] @@ -455,14 +625,15 @@ class PythonPluginTest(unittest.TestCase): condition.notify_all() def half_duplex_request_iterator(): - request = request_pb2.StreamingOutputCallRequest() + request = self._requests_pb2.StreamingOutputCallRequest() request.response_parameters.add(size=1, interval_us=0) yield request with condition: while wait_cell[0]: condition.wait() - with _CreateService() as (methods, stub): + with _CreateService(self._payload_pb2, self._responses_pb2, + self._service_pb2) as (methods, stub): with wait(): responses = stub.HalfDuplexCall(half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT) diff --git a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto index 1b780c69ba..1b780c69ba 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/messages.proto +++ b/src/python/grpcio_tests/tests/protoc_plugin/protos/invocation_testing/split_messages/sub/messages.proto diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 5f4df79c5b..17fa61ea36 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -22,7 +22,7 @@ from six.moves import queue import grpc from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import services_pb2_grpc from tests.unit import resources from tests.unit import test_common @@ -58,7 +58,7 @@ class BenchmarkClient: if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2.BenchmarkServiceStub(channel) + self._stub = services_pb2_grpc.BenchmarkServiceStub(channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index 05101fdc6d..bb07844491 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -13,10 +13,10 @@ # limitations under the License. from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import services_pb2_grpc -class BenchmarkServer(services_pb2.BenchmarkServiceServicer): +class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -29,7 +29,7 @@ class BenchmarkServer(services_pb2.BenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer): +class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index 299ce75e79..a86743fa5a 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -95,9 +95,6 @@ class ReflectionServicerTest(unittest.TestCase): )),) self.assertSequenceEqual(expected_responses, responses) - @unittest.skip( - 'TODO(mmx): enable when (pure) python protobuf issue is fixed' - '(see https://github.com/google/protobuf/issues/2882)') def testFileContainingExtension(self): requests = (reflection_pb2.ServerReflectionRequest( file_containing_extension=reflection_pb2.ExtensionRequest( diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index d5ff0064fd..40caa3926a 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -20,7 +20,7 @@ import threading import grpc from six.moves import queue from src.proto.grpc.testing import metrics_pb2_grpc -from src.proto.grpc.testing import test_pb2 +from src.proto.grpc.testing import test_pb2_grpc from tests.interop import methods from tests.interop import resources @@ -133,7 +133,7 @@ def run_test(args): for _ in xrange(args.num_channels_per_server): channel = _get_channel(test_server_target, args) for _ in xrange(args.num_stubs_per_channel): - stub = test_pb2.TestServiceStub(channel) + stub = test_pb2_grpc.TestServiceStub(channel) runner = test_runner.TestRunner(stub, test_cases, hist, exception_queue, stop_event) runners.append(runner) diff --git a/src/python/grpcio_tests/tests/stress/metrics_server.py b/src/python/grpcio_tests/tests/stress/metrics_server.py index 11ab6c3f4e..33a74b4a38 100644 --- a/src/python/grpcio_tests/tests/stress/metrics_server.py +++ b/src/python/grpcio_tests/tests/stress/metrics_server.py @@ -16,11 +16,12 @@ import time from src.proto.grpc.testing import metrics_pb2 +from src.proto.grpc.testing import metrics_pb2_grpc GAUGE_NAME = 'python_overall_qps' -class MetricsServer(metrics_pb2.MetricsServiceServicer): +class MetricsServer(metrics_pb2_grpc.MetricsServiceServicer): def __init__(self, histogram): self._start_time = time.time() diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py new file mode 100644 index 0000000000..06f09c8cb4 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -0,0 +1,66 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""An example gRPC Python-using server-side application.""" + +import grpc + +# requests_pb2 is a semantic dependency of this module. +from tests.testing import _application_common +from tests.testing.proto import requests_pb2 # pylint: disable=unused-import +from tests.testing.proto import services_pb2 +from tests.testing.proto import services_pb2_grpc + + +class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): + """Services RPCs.""" + + def UnUn(self, request, context): + if _application_common.UNARY_UNARY_REQUEST == request: + return _application_common.UNARY_UNARY_RESPONSE + else: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details('Something is wrong with your request!') + return services_pb2.Down() + + def UnStre(self, request, context): + if _application_common.UNARY_STREAM_REQUEST != request: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details('Something is wrong with your request!') + return + yield services_pb2.Strange() + + def StreUn(self, request_iterator, context): + context.send_initial_metadata(( + ('server_application_metadata_key', 'Hi there!',),)) + for request in request_iterator: + if request != _application_common.STREAM_UNARY_REQUEST: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details('Something is wrong with your request!') + return services_pb2.Strange() + elif not context.is_active(): + return services_pb2.Strange() + else: + return _application_common.STREAM_UNARY_RESPONSE + + def StreStre(self, request_iterator, context): + for request in request_iterator: + if request != _application_common.STREAM_STREAM_REQUEST: + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details('Something is wrong with your request!') + return + elif not context.is_active(): + return + else: + yield _application_common.STREAM_STREAM_RESPONSE + yield _application_common.STREAM_STREAM_RESPONSE diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py new file mode 100644 index 0000000000..7897bcce01 --- /dev/null +++ b/src/python/grpcio_tests/tests/testing/_server_test.py @@ -0,0 +1,169 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import unittest + +import grpc +import grpc_testing + +from tests.testing import _application_common +from tests.testing import _application_testing_common +from tests.testing import _server_application +from tests.testing.proto import services_pb2 + + +# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip. +@unittest.skipIf( + services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None, + 'Fix protobuf issue 3452!') +class FirstServiceServicerTest(unittest.TestCase): + + def setUp(self): + self._real_time = grpc_testing.strict_real_time() + self._fake_time = grpc_testing.strict_fake_time(time.time()) + servicer = _server_application.FirstServiceServicer() + descriptors_to_servicers = { + _application_testing_common.FIRST_SERVICE: servicer + } + self._real_time_server = grpc_testing.server_from_dictionary( + descriptors_to_servicers, self._real_time) + self._fake_time_server = grpc_testing.server_from_dictionary( + descriptors_to_servicers, self._fake_time) + + def test_successful_unary_unary(self): + rpc = self._real_time_server.invoke_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN, (), + _application_common.UNARY_UNARY_REQUEST, None) + initial_metadata = rpc.initial_metadata() + response, trailing_metadata, code, details = rpc.termination() + + self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response) + self.assertIs(code, grpc.StatusCode.OK) + + def test_successful_unary_stream(self): + rpc = self._real_time_server.invoke_unary_stream( + _application_testing_common.FIRST_SERVICE_UNSTRE, (), + _application_common.UNARY_STREAM_REQUEST, None) + initial_metadata = rpc.initial_metadata() + trailing_metadata, code, details = rpc.termination() + + self.assertIs(code, grpc.StatusCode.OK) + + def test_successful_stream_unary(self): + rpc = self._real_time_server.invoke_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN, (), None) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.requests_closed() + initial_metadata = rpc.initial_metadata() + response, trailing_metadata, code, details = rpc.termination() + + self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response) + self.assertIs(code, grpc.StatusCode.OK) + + def test_successful_stream_stream(self): + rpc = self._real_time_server.invoke_stream_stream( + _application_testing_common.FIRST_SERVICE_STRESTRE, (), None) + rpc.send_request(_application_common.STREAM_STREAM_REQUEST) + initial_metadata = rpc.initial_metadata() + responses = [ + rpc.take_response(), + rpc.take_response(), + ] + rpc.send_request(_application_common.STREAM_STREAM_REQUEST) + rpc.send_request(_application_common.STREAM_STREAM_REQUEST) + responses.extend([ + rpc.take_response(), + rpc.take_response(), + rpc.take_response(), + rpc.take_response(), + ]) + rpc.requests_closed() + trailing_metadata, code, details = rpc.termination() + + for response in responses: + self.assertEqual(_application_common.STREAM_STREAM_RESPONSE, + response) + self.assertIs(code, grpc.StatusCode.OK) + + def test_server_rpc_idempotence(self): + rpc = self._real_time_server.invoke_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN, (), + _application_common.UNARY_UNARY_REQUEST, None) + first_initial_metadata = rpc.initial_metadata() + second_initial_metadata = rpc.initial_metadata() + third_initial_metadata = rpc.initial_metadata() + first_termination = rpc.termination() + second_termination = rpc.termination() + third_termination = rpc.termination() + + for later_initial_metadata in (second_initial_metadata, + third_initial_metadata,): + self.assertEqual(first_initial_metadata, later_initial_metadata) + response = first_termination[0] + terminal_metadata = first_termination[1] + code = first_termination[2] + details = first_termination[3] + for later_termination in (second_termination, third_termination,): + self.assertEqual(response, later_termination[0]) + self.assertEqual(terminal_metadata, later_termination[1]) + self.assertIs(code, later_termination[2]) + self.assertEqual(details, later_termination[3]) + self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response) + self.assertIs(code, grpc.StatusCode.OK) + + def test_misbehaving_client_unary_unary(self): + rpc = self._real_time_server.invoke_unary_unary( + _application_testing_common.FIRST_SERVICE_UNUN, (), + _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None) + initial_metadata = rpc.initial_metadata() + response, trailing_metadata, code, details = rpc.termination() + + self.assertIsNot(code, grpc.StatusCode.OK) + + def test_infinite_request_stream_real_time(self): + rpc = self._real_time_server.invoke_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN, (), + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + initial_metadata = rpc.initial_metadata() + self._real_time.sleep_for( + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + response, trailing_metadata, code, details = rpc.termination() + + self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) + + def test_infinite_request_stream_fake_time(self): + rpc = self._fake_time_server.invoke_stream_unary( + _application_testing_common.FIRST_SERVICE_STREUN, (), + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + initial_metadata = rpc.initial_metadata() + self._fake_time.sleep_for( + _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2) + rpc.send_request(_application_common.STREAM_UNARY_REQUEST) + response, trailing_metadata, code, details = rpc.termination() + + self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index c10719b86f..8512d5b96f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -1,15 +1,21 @@ [ + "_sanity._sanity_test.SanityTest", "health_check._health_servicer_test.HealthServicerTest", "interop._insecure_intraop_test.InsecureIntraopTest", "interop._secure_intraop_test.SecureIntraopTest", "protoc_plugin._python_plugin_test.PythonPluginTest", - "protoc_plugin._split_definitions_test.SameCommonTest", - "protoc_plugin._split_definitions_test.SameSeparateTest", - "protoc_plugin._split_definitions_test.SplitCommonTest", - "protoc_plugin._split_definitions_test.SplitSeparateTest", + "protoc_plugin._split_definitions_test.SameProtoGrpcBeforeProtoProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoMid2016ProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoProtoBeforeGrpcProtocStyleTest", + "protoc_plugin._split_definitions_test.SameProtoSingleProtocExecutionProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoGrpcBeforeProtoProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoMid2016ProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoProtoBeforeGrpcProtocStyleTest", + "protoc_plugin._split_definitions_test.SplitProtoSingleProtocExecutionProtocStyleTest", "protoc_plugin.beta_python_plugin_test.PythonPluginTest", "reflection._reflection_servicer_test.ReflectionServicerTest", "testing._client_test.ClientTest", + "testing._server_test.FirstServiceServicerTest", "testing._time_test.StrictFakeTimeTest", "testing._time_test.StrictRealTimeTest", "unit._api_test.AllTest", @@ -25,6 +31,8 @@ "unit._credentials_test.CredentialsTest", "unit._cython._cancel_many_calls_test.CancelManyCallsTest", "unit._cython._channel_test.ChannelTest", + "unit._cython._no_messages_server_completion_queue_per_call_test.Test", + "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", @@ -38,7 +46,6 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", - "unit._sanity._sanity_test.Sanity", "unit._thread_cleanup_test.CleanupThreadTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py new file mode 100644 index 0000000000..ac66d1db3d --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -0,0 +1,118 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common utilities for tests of the Cython layer of gRPC Python.""" + +import collections +import threading + +from grpc._cython import cygrpc + +RPC_COUNT = 4000 + +INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) +EMPTY_FLAGS = 0 + +INVOCATION_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'client-md-key', b'client-md-key'), + cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),)) + +INITIAL_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'), + cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),)) + +TRAILING_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'), + cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),)) + + +class QueueDriver(object): + + def __init__(self, condition, completion_queue): + self._condition = condition + self._completion_queue = completion_queue + self._due = collections.defaultdict(int) + self._events = collections.defaultdict(list) + + def add_due(self, tags): + if not self._due: + + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events[event.tag].append(event) + self._due[event.tag] -= 1 + self._condition.notify_all() + if self._due[event.tag] <= 0: + self._due.pop(event.tag) + if not self._due: + return + + thread = threading.Thread(target=in_thread) + thread.start() + for tag in tags: + self._due[tag] += 1 + + def event_with_tag(self, tag): + with self._condition: + while True: + if self._events[tag]: + return self._events[tag].pop(0) + else: + self._condition.wait() + + +def execute_many_times(behavior): + return tuple(behavior() for _ in range(RPC_COUNT)) + + +class OperationResult( + collections.namedtuple('OperationResult', ( + 'start_batch_result', 'completion_type', 'success',))): + pass + + +SUCCESSFUL_OPERATION_RESULT = OperationResult( + cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True) + + +class RpcTest(object): + + def setUp(self): + self.server_completion_queue = cygrpc.CompletionQueue() + self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server.register_completion_queue(self.server_completion_queue) + port = self.server.add_http2_port(b'[::]:0') + self.server.start() + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), + cygrpc.ChannelArgs([])) + + self._server_shutdown_tag = 'server_shutdown_tag' + self.server_condition = threading.Condition() + self.server_driver = QueueDriver(self.server_condition, + self.server_completion_queue) + with self.server_condition: + self.server_driver.add_due({ + self._server_shutdown_tag, + }) + + self.client_condition = threading.Condition() + self.client_completion_queue = cygrpc.CompletionQueue() + self.client_driver = QueueDriver(self.client_condition, + self.client_completion_queue) + + def tearDown(self): + self.server.shutdown(self.server_completion_queue, + self._server_shutdown_tag) + self.server.cancel_all_calls() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py new file mode 100644 index 0000000000..14cc66675c --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -0,0 +1,131 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +from tests.unit._cython import _common + + +class Test(_common.RpcTest, unittest.TestCase): + + def _do_rpcs(self): + server_call_condition = threading.Condition() + server_call_completion_queue = cygrpc.CompletionQueue() + server_call_driver = _common.QueueDriver(server_call_condition, + server_call_completion_queue) + + server_request_call_tag = 'server_request_call_tag' + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + + with self.server_condition: + server_request_call_start_batch_result = self.server.request_call( + server_call_completion_queue, self.server_completion_queue, + server_request_call_tag) + self.server_driver.add_due({ + server_request_call_tag, + }) + + client_call = self.channel.create_call( + None, _common.EMPTY_FLAGS, self.client_completion_queue, + b'/twinkies', None, _common.INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with self.client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata( + _common.EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_complete_rpc_start_batch_result = client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.operation_send_close_from_client( + _common.EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client( + _common.EMPTY_FLAGS), + ]), client_complete_rpc_tag) + self.client_driver.add_due({ + client_receive_initial_metadata_tag, + client_complete_rpc_tag, + }) + + server_request_call_event = self.server_driver.event_with_tag( + server_request_call_tag) + + with server_call_condition: + server_send_initial_metadata_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_send_initial_metadata( + _common.INITIAL_METADATA, _common.EMPTY_FLAGS), + ], server_send_initial_metadata_tag)) + server_call_driver.add_due({ + server_send_initial_metadata_tag, + }) + server_send_initial_metadata_event = server_call_driver.event_with_tag( + server_send_initial_metadata_tag) + + with server_call_condition: + server_complete_rpc_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_receive_close_on_server( + _common.EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.TRAILING_METADATA, cygrpc.StatusCode.ok, + b'test details', _common.EMPTY_FLAGS), + ], server_complete_rpc_tag)) + server_call_driver.add_due({ + server_complete_rpc_tag, + }) + server_complete_rpc_event = server_call_driver.event_with_tag( + server_complete_rpc_tag) + + client_receive_initial_metadata_event = self.client_driver.event_with_tag( + client_receive_initial_metadata_tag) + client_complete_rpc_event = self.client_driver.event_with_tag( + client_complete_rpc_tag) + + return (_common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.type, + server_complete_rpc_event.success),) + + def test_rpcs(self): + expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * + 5] * _common.RPC_COUNT + actuallys = _common.execute_many_times(self._do_rpcs) + self.assertSequenceEqual(expecteds, actuallys) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py new file mode 100644 index 0000000000..1e44bcc4dc --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -0,0 +1,126 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +from tests.unit._cython import _common + + +class Test(_common.RpcTest, unittest.TestCase): + + def _do_rpcs(self): + server_request_call_tag = 'server_request_call_tag' + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + + with self.server_condition: + server_request_call_start_batch_result = self.server.request_call( + self.server_completion_queue, self.server_completion_queue, + server_request_call_tag) + self.server_driver.add_due({ + server_request_call_tag, + }) + + client_call = self.channel.create_call( + None, _common.EMPTY_FLAGS, self.client_completion_queue, + b'/twinkies', None, _common.INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with self.client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata( + _common.EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_complete_rpc_start_batch_result = client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.operation_send_close_from_client( + _common.EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client( + _common.EMPTY_FLAGS), + ]), client_complete_rpc_tag) + self.client_driver.add_due({ + client_receive_initial_metadata_tag, + client_complete_rpc_tag, + }) + + server_request_call_event = self.server_driver.event_with_tag( + server_request_call_tag) + + with self.server_condition: + server_send_initial_metadata_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_send_initial_metadata( + _common.INITIAL_METADATA, _common.EMPTY_FLAGS), + ], server_send_initial_metadata_tag)) + self.server_driver.add_due({ + server_send_initial_metadata_tag, + }) + server_send_initial_metadata_event = self.server_driver.event_with_tag( + server_send_initial_metadata_tag) + + with self.server_condition: + server_complete_rpc_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_receive_close_on_server( + _common.EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.TRAILING_METADATA, cygrpc.StatusCode.ok, + b'test details', _common.EMPTY_FLAGS), + ], server_complete_rpc_tag)) + self.server_driver.add_due({ + server_complete_rpc_tag, + }) + server_complete_rpc_event = self.server_driver.event_with_tag( + server_complete_rpc_tag) + + client_receive_initial_metadata_event = self.client_driver.event_with_tag( + client_receive_initial_metadata_tag) + client_complete_rpc_event = self.client_driver.event_with_tag( + client_complete_rpc_tag) + + return (_common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.type, + server_complete_rpc_event.success),) + + def test_rpcs(self): + expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * + 5] * _common.RPC_COUNT + actuallys = _common.execute_many_times(self._do_rpcs) + self.assertSequenceEqual(expecteds, actuallys) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py b/src/python/grpcio_tests/tests/unit/_sanity/__init__.py deleted file mode 100644 index 5772620b60..0000000000 --- a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. |