diff options
author | 2016-06-21 14:38:00 -0700 | |
---|---|---|
committer | 2016-06-21 14:38:00 -0700 | |
commit | 4bcd135824dcf6e68d6827af757accf7e8297bdb (patch) | |
tree | 4b7d0c25e31709d360b543930b32da1bf7a4a91d /src/python | |
parent | 773a8825bcb9cae06af18263b112b8f8fd96f10d (diff) | |
parent | 5988716d9d6e33cd59631865527d73d3caa87387 (diff) |
Merge branch 'master' into fixes
Diffstat (limited to 'src/python')
34 files changed, 1385 insertions, 1867 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 5ba5a4e1fd..d07a7a721d 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -29,8 +29,6 @@ """gRPC's Python API.""" -__import__('pkg_resources').declare_namespace(__name__) - import abc import enum @@ -212,14 +210,14 @@ class ChannelConnectivity(enum.Enum): READY: The channel is ready to conduct RPCs. TRANSIENT_FAILURE: The channel has seen a failure from which it expects to recover. - FATAL_FAILURE: The channel has seen a failure from which it cannot recover. + SHUTDOWN: The channel has seen a failure from which it cannot recover. """ IDLE = (_cygrpc.ConnectivityState.idle, 'idle') CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting') READY = (_cygrpc.ConnectivityState.ready, 'ready') TRANSIENT_FAILURE = ( _cygrpc.ConnectivityState.transient_failure, 'transient failure') - FATAL_FAILURE = (_cygrpc.ConnectivityState.fatal_failure, 'fatal failure') + SHUTDOWN = (_cygrpc.ConnectivityState.shutdown, 'shutdown') @enum.unique @@ -900,6 +898,102 @@ class Server(six.with_metaclass(abc.ABCMeta)): ################################# Functions ################################ +def unary_unary_rpc_method_handler( + behavior, request_deserializer=None, response_serializer=None): + """Creates an RpcMethodHandler for a unary-unary RPC method. + + Args: + behavior: The implementation of an RPC method as a callable behavior taking + a single request value and returning a single response value. + request_deserializer: An optional request deserialization behavior. + response_serializer: An optional response serialization behavior. + + Returns: + An RpcMethodHandler for a unary-unary RPC method constructed from the given + parameters. + """ + from grpc import _utilities + return _utilities.RpcMethodHandler( + False, False, request_deserializer, response_serializer, behavior, None, + None, None) + + +def unary_stream_rpc_method_handler( + behavior, request_deserializer=None, response_serializer=None): + """Creates an RpcMethodHandler for a unary-stream RPC method. + + Args: + behavior: The implementation of an RPC method as a callable behavior taking + a single request value and returning an iterator of response values. + request_deserializer: An optional request deserialization behavior. + response_serializer: An optional response serialization behavior. + + Returns: + An RpcMethodHandler for a unary-stream RPC method constructed from the + given parameters. + """ + from grpc import _utilities + return _utilities.RpcMethodHandler( + False, True, request_deserializer, response_serializer, None, behavior, + None, None) + + +def stream_unary_rpc_method_handler( + behavior, request_deserializer=None, response_serializer=None): + """Creates an RpcMethodHandler for a stream-unary RPC method. + + Args: + behavior: The implementation of an RPC method as a callable behavior taking + an iterator of request values and returning a single response value. + request_deserializer: An optional request deserialization behavior. + response_serializer: An optional response serialization behavior. + + Returns: + An RpcMethodHandler for a stream-unary RPC method constructed from the + given parameters. + """ + from grpc import _utilities + return _utilities.RpcMethodHandler( + True, False, request_deserializer, response_serializer, None, None, + behavior, None) + + +def stream_stream_rpc_method_handler( + behavior, request_deserializer=None, response_serializer=None): + """Creates an RpcMethodHandler for a stream-stream RPC method. + + Args: + behavior: The implementation of an RPC method as a callable behavior taking + an iterator of request values and returning an iterator of response + values. + request_deserializer: An optional request deserialization behavior. + response_serializer: An optional response serialization behavior. + + Returns: + An RpcMethodHandler for a stream-stream RPC method constructed from the + given parameters. + """ + from grpc import _utilities + return _utilities.RpcMethodHandler( + True, True, request_deserializer, response_serializer, None, None, None, + behavior) + + +def method_handlers_generic_handler(service, method_handlers): + """Creates a grpc.GenericRpcHandler from RpcMethodHandlers. + + Args: + service: A service name to be used for the given method handlers. + method_handlers: A dictionary from method name to RpcMethodHandler + implementing the named method. + + Returns: + A GenericRpcHandler constructed from the given parameters. + """ + from grpc import _utilities + return _utilities.DictionaryGenericHandler(service, method_handlers) + + def ssl_channel_credentials( root_certificates=None, private_key=None, certificate_chain=None): """Creates a ChannelCredentials for use with an SSL-enabled Channel. @@ -1059,7 +1153,7 @@ def insecure_channel(target, options=None): A Channel to the target through which RPCs may be conducted. """ from grpc import _channel - return _channel.Channel(target, None, options) + return _channel.Channel(target, options, None) def secure_channel(target, credentials, options=None): @@ -1075,7 +1169,7 @@ def secure_channel(target, credentials, options=None): A Channel to the target through which RPCs may be conducted. """ from grpc import _channel - return _channel.Channel(target, credentials, options) + return _channel.Channel(target, options, credentials._credentials) def server(generic_rpc_handlers, thread_pool, options=None): @@ -1097,3 +1191,49 @@ def server(generic_rpc_handlers, thread_pool, options=None): """ from grpc import _server return _server.Server(generic_rpc_handlers, thread_pool) + + +################################### __all__ ################################# + + +__all__ = ( + 'FutureTimeoutError', + 'FutureCancelledError', + 'Future', + 'ChannelConnectivity', + 'StatusCode', + 'RpcError', + 'RpcContext', + 'Call', + 'ChannelCredentials', + 'CallCredentials', + 'AuthMetadataContext', + 'AuthMetadataPluginCallback', + 'AuthMetadataPlugin', + 'ServerCredentials', + 'UnaryUnaryMultiCallable', + 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', + 'StreamStreamMultiCallable', + 'Channel', + 'ServicerContext', + 'RpcMethodHandler', + 'HandlerCallDetails', + 'GenericRpcHandler', + 'Server', + 'unary_unary_rpc_method_handler', + 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', + 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', + 'ssl_channel_credentials', + 'metadata_call_credentials', + 'access_token_call_credentials', + 'composite_call_credentials', + 'composite_channel_credentials', + 'ssl_server_credentials', + 'channel_ready_future', + 'insecure_channel', + 'secure_channel', + 'server', +) diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py index f8405949d4..b7cc6fbbb5 100644 --- a/src/python/grpcio/grpc/_adapter/_types.py +++ b/src/python/grpcio/grpc/_adapter/_types.py @@ -114,7 +114,7 @@ class ConnectivityState(enum.IntEnum): CONNECTING = cygrpc.ConnectivityState.connecting READY = cygrpc.ConnectivityState.ready TRANSIENT_FAILURE = cygrpc.ConnectivityState.transient_failure - FATAL_FAILURE = cygrpc.ConnectivityState.fatal_failure + FATAL_FAILURE = cygrpc.ConnectivityState.shutdown class Status(collections.namedtuple( diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d9315d2e6c..7cdd542de2 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -814,6 +814,13 @@ def _options(options): class Channel(grpc.Channel): def __init__(self, target, options, credentials): + """Constructor. + + Args: + target: The target to which to connect. + options: Configuration options for the channel. + credentials: A cygrpc.ChannelCredentials or None. + """ self._channel = cygrpc.Channel(target, _options(options), credentials) self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index b8688a0149..f351bea9e3 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -30,6 +30,8 @@ """Shared implementation.""" import logging +import threading +import time import six @@ -44,8 +46,8 @@ CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { cygrpc.ConnectivityState.ready: grpc.ChannelConnectivity.READY, cygrpc.ConnectivityState.transient_failure: grpc.ChannelConnectivity.TRANSIENT_FAILURE, - cygrpc.ConnectivityState.fatal_failure: - grpc.ChannelConnectivity.FATAL_FAILURE, + cygrpc.ConnectivityState.shutdown: + grpc.ChannelConnectivity.SHUTDOWN, } CYGRPC_STATUS_CODE_TO_STATUS_CODE = { @@ -110,3 +112,43 @@ def fully_qualified_method(group, method): group = _encode(group) method = _encode(method) return b'/' + group + b'/' + method + + +class CleanupThread(threading.Thread): + """A threading.Thread subclass supporting custom behavior on join(). + + On Python Interpreter exit, Python will attempt to join outstanding threads + prior to garbage collection. We may need to do additional cleanup, and + we accomplish this by overriding the join() method. + """ + + def __init__(self, behavior, group=None, target=None, name=None, + args=(), kwargs={}): + """Constructor. + + Args: + behavior (function): Function called on join() with a single + argument, timeout, indicating the maximum duration of + `behavior`, or None indicating `behavior` has no deadline. + `behavior` must be idempotent. + group (None): should be None. Reseved for future extensions + when ThreadGroup is implemented. + target (function): The function to invoke when this thread is + run. Defaults to None. + name (str): The name of this thread. Defaults to None. + args (tuple[object]): A tuple of arguments to pass to `target`. + kwargs (dict[str,object]): A dictionary of keyword arguments to + pass to `target`. + """ + super(CleanupThread, self).__init__(group=group, target=target, + name=name, args=args, kwargs=kwargs) + self._behavior = behavior + + def join(self, timeout=None): + start_time = time.time() + self._behavior(timeout) + end_time = time.time() + if timeout is not None: + timeout -= end_time - start_time + timeout = max(timeout, 0) + super(CleanupThread, self).join(timeout) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi index a67c963684..01089c3dc0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -31,9 +31,6 @@ cdef class CompletionQueue: cdef grpc_completion_queue *c_completion_queue - cdef object pluck_condition - cdef int num_plucking - cdef int num_polling cdef bint is_shutting_down cdef bint is_shutdown diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index cdae39d519..90266516fe 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -32,6 +32,8 @@ cimport cpython import threading import time +cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 + cdef class CompletionQueue: @@ -40,9 +42,6 @@ cdef class CompletionQueue: self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False - self.pluck_condition = threading.Condition() - self.num_plucking = 0 - self.num_polling = 0 cdef _interpret_event(self, grpc_event event): cdef OperationTag tag = None @@ -83,45 +82,27 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout cdef gpr_timespec c_deadline with nogil: + c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - if deadline is not None: - c_deadline = deadline.c_time - cdef grpc_event event - - # Poll within a critical section to detect contention - with self.pluck_condition: - assert self.num_plucking == 0, 'cannot simultaneously pluck and poll' - self.num_polling += 1 - with nogil: - event = grpc_completion_queue_next( - self.c_completion_queue, c_deadline, NULL) - with self.pluck_condition: - self.num_polling -= 1 - return self._interpret_event(event) - - def pluck(self, OperationTag tag, Timespec deadline=None): - # Plucking a 'None' tag is equivalent to passing control to GRPC core until - # the deadline. - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) - if deadline is not None: - c_deadline = deadline.c_time - cdef grpc_event event - - # Pluck within a critical section to detect contention - with self.pluck_condition: - assert self.num_polling == 0, 'cannot simultaneously pluck and poll' - assert self.num_plucking < GRPC_MAX_COMPLETION_QUEUE_PLUCKERS, ( - 'cannot pluck more than {} times simultaneously'.format( - GRPC_MAX_COMPLETION_QUEUE_PLUCKERS)) - self.num_plucking += 1 - with nogil: - event = grpc_completion_queue_pluck( - self.c_completion_queue, <cpython.PyObject *>tag, c_deadline, NULL) - with self.pluck_condition: - self.num_plucking -= 1 + if deadline is not None: + c_deadline = deadline.c_time + + while True: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + event = grpc_completion_queue_next( + self.c_completion_queue, c_timeout, NULL) + if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: + break; + + # Handle any signals + with gil: + cpython.PyErr_CheckSignals() return self._interpret_event(event) def shutdown(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 05b8886df7..168b9751aa 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -80,6 +80,12 @@ cdef extern from "grpc/_cython/loader.h": gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock) nogil + gpr_timespec gpr_time_from_millis(int64_t ms, gpr_clock_type type) nogil + + gpr_timespec gpr_time_add(gpr_timespec a, gpr_timespec b) nogil + + int gpr_time_cmp(gpr_timespec a, gpr_timespec b) nogil + ctypedef enum grpc_status_code: GRPC_STATUS_OK GRPC_STATUS_CANCELLED diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 2e52953c0a..0055d0d3a2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -33,7 +33,7 @@ class ConnectivityState: connecting = GRPC_CHANNEL_CONNECTING ready = GRPC_CHANNEL_READY transient_failure = GRPC_CHANNEL_TRANSIENT_FAILURE - fatal_failure = GRPC_CHANNEL_SHUTDOWN + shutdown = GRPC_CHANNEL_SHUTDOWN class ChannelArgKey: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index c8a73e65d6..42afeb8498 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -99,7 +99,7 @@ cdef class Server: with nogil: grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work - self.backup_shutdown_queue.pluck(None, Timespec(None)) + self.backup_shutdown_queue.poll(Timespec(None)) def add_http2_port(self, address, ServerCredentials server_credentials=None): diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 5c49f6cf3e..8437e74ba0 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -260,6 +260,8 @@ gpr_avl_unref_type gpr_avl_unref_import; gpr_avl_add_type gpr_avl_add_import; gpr_avl_remove_type gpr_avl_remove_import; gpr_avl_get_type gpr_avl_get_import; +gpr_avl_maybe_get_type gpr_avl_maybe_get_import; +gpr_avl_is_empty_type gpr_avl_is_empty_import; gpr_cmdline_create_type gpr_cmdline_create_import; gpr_cmdline_add_int_type gpr_cmdline_add_int_import; gpr_cmdline_add_flag_type gpr_cmdline_add_flag_import; @@ -533,6 +535,8 @@ void pygrpc_load_imports(HMODULE library) { gpr_avl_add_import = (gpr_avl_add_type) GetProcAddress(library, "gpr_avl_add"); gpr_avl_remove_import = (gpr_avl_remove_type) GetProcAddress(library, "gpr_avl_remove"); gpr_avl_get_import = (gpr_avl_get_type) GetProcAddress(library, "gpr_avl_get"); + gpr_avl_maybe_get_import = (gpr_avl_maybe_get_type) GetProcAddress(library, "gpr_avl_maybe_get"); + gpr_avl_is_empty_import = (gpr_avl_is_empty_type) GetProcAddress(library, "gpr_avl_is_empty"); gpr_cmdline_create_import = (gpr_cmdline_create_type) GetProcAddress(library, "gpr_cmdline_create"); gpr_cmdline_add_int_import = (gpr_cmdline_add_int_type) GetProcAddress(library, "gpr_cmdline_add_int"); gpr_cmdline_add_flag_import = (gpr_cmdline_add_flag_type) GetProcAddress(library, "gpr_cmdline_add_flag"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index 16bb5cdfab..d52e8591b3 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -482,7 +482,7 @@ extern grpc_byte_buffer_reader_readall_type grpc_byte_buffer_reader_readall_impo typedef grpc_byte_buffer *(*grpc_raw_byte_buffer_from_reader_type)(grpc_byte_buffer_reader *reader); extern grpc_raw_byte_buffer_from_reader_type grpc_raw_byte_buffer_from_reader_import; #define grpc_raw_byte_buffer_from_reader grpc_raw_byte_buffer_from_reader_import -typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...); +typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(4, 5); extern gpr_log_type gpr_log_import; #define gpr_log gpr_log_import typedef void(*gpr_log_message_type)(const char *file, int line, gpr_log_severity severity, const char *message); @@ -731,6 +731,12 @@ extern gpr_avl_remove_type gpr_avl_remove_import; typedef void *(*gpr_avl_get_type)(gpr_avl avl, void *key); extern gpr_avl_get_type gpr_avl_get_import; #define gpr_avl_get gpr_avl_get_import +typedef int(*gpr_avl_maybe_get_type)(gpr_avl avl, void *key, void **value); +extern gpr_avl_maybe_get_type gpr_avl_maybe_get_import; +#define gpr_avl_maybe_get gpr_avl_maybe_get_import +typedef int(*gpr_avl_is_empty_type)(gpr_avl avl); +extern gpr_avl_is_empty_type gpr_avl_is_empty_import; +#define gpr_avl_is_empty gpr_avl_is_empty_import typedef gpr_cmdline *(*gpr_cmdline_create_type)(const char *description); extern gpr_cmdline_create_type gpr_cmdline_create_import; #define gpr_cmdline_create gpr_cmdline_create_import @@ -821,7 +827,7 @@ extern gpr_format_message_type gpr_format_message_import; typedef char *(*gpr_strdup_type)(const char *src); extern gpr_strdup_type gpr_strdup_import; #define gpr_strdup gpr_strdup_import -typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...); +typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(2, 3); extern gpr_asprintf_type gpr_asprintf_import; #define gpr_asprintf gpr_asprintf_import typedef const char *(*gpr_subprocess_binary_extension_type)(); diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index f4f6720497..bf20f15f72 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -60,6 +60,8 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 _EMPTY_METADATA = cygrpc.Metadata(()) +_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 + def _serialized_request(request_event): return request_event.batch_operations[0].received_message.bytes() @@ -254,7 +256,7 @@ class _Context(grpc.ServicerContext): else: if self._state.initial_metadata_allowed: operation = cygrpc.operation_send_initial_metadata( - cygrpc.Metadata(initial_metadata), _EMPTY_FLAGS) + _common.metadata(initial_metadata), _EMPTY_FLAGS) self._rpc_event.operation_call.start_batch( cygrpc.Operations((operation,)), _send_initial_metadata(self._state)) @@ -342,10 +344,9 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CLOSED: details = '"{}" requires exactly one request message.'.format( rpc_event.request_call_details.method) - # TODO(5992#issuecomment-220761992): really, what status code? _abort( state, rpc_event.operation_call, - cygrpc.StatusCode.unavailable, details) + cygrpc.StatusCode.unimplemented, details) return None elif state.client is _CANCELLED: return None @@ -670,17 +671,6 @@ def _serve(state): return -def _start(state): - with state.lock: - if state.stage is not _ServerStage.STOPPED: - raise ValueError('Cannot start already-started server!') - state.server.start() - state.stage = _ServerStage.STARTED - _request_call(state) - thread = threading.Thread(target=_serve, args=(state,)) - thread.start() - - def _stop(state, grace): with state.lock: if state.stage is _ServerStage.STOPPED: @@ -719,6 +709,24 @@ def _stop(state, grace): return shutdown_event +def _start(state): + with state.lock: + if state.stage is not _ServerStage.STOPPED: + raise ValueError('Cannot start already-started server!') + state.server.start() + state.stage = _ServerStage.STARTED + _request_call(state) + def cleanup_server(timeout): + if timeout is None: + _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() + else: + _stop(state, timeout).wait() + + thread = _common.CleanupThread( + cleanup_server, target=_serve, args=(state,)) + thread.start() + + class Server(grpc.Server): def __init__(self, generic_handlers, thread_pool): diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index a4ca9b7282..4850967fbc 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -29,16 +29,41 @@ """Internal utilities for gRPC Python.""" +import collections import threading import time +import six + import grpc +from grpc import _common from grpc.framework.foundation import callable_util _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( 'Exception calling connectivity future "done" callback!') +class RpcMethodHandler( + collections.namedtuple( + '_RpcMethodHandler', + ('request_streaming', 'response_streaming', 'request_deserializer', + 'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary', + 'stream_stream',)), + grpc.RpcMethodHandler): + pass + + +class DictionaryGenericHandler(grpc.GenericRpcHandler): + + def __init__(self, service, method_handlers): + self._method_handlers = { + _common.fully_qualified_method(service, method): method_handler + for method, method_handler in six.iteritems(method_handlers)} + + def service(self, handler_call_details): + return self._method_handlers.get(handler_call_details.method) + + class _ChannelReadyFuture(grpc.Future): def __init__(self, channel): @@ -144,4 +169,3 @@ def channel_ready_future(channel): ready_future = _ChannelReadyFuture(channel) ready_future.start() return ready_future - diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py index 4343b6c4b5..90f6bbbfcc 100644 --- a/src/python/grpcio/grpc/beta/interfaces.py +++ b/src/python/grpcio/grpc/beta/interfaces.py @@ -36,6 +36,9 @@ import six import grpc ChannelConnectivity = grpc.ChannelConnectivity +# FATAL_FAILURE was a Beta-API name for SHUTDOWN +ChannelConnectivity.FATAL_FAILURE = ChannelConnectivity.SHUTDOWN + StatusCode = grpc.StatusCode diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 1a04021c27..839c555f05 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -45,7 +45,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/env_windows.c', 'src/core/lib/support/histogram.c', 'src/core/lib/support/host_port.c', - 'src/core/lib/support/load_file.c', 'src/core/lib/support/log.c', 'src/core/lib/support/log_android.c', 'src/core/lib/support/log_linux.c', @@ -94,6 +93,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/endpoint.c', 'src/core/lib/iomgr/endpoint_pair_posix.c', 'src/core/lib/iomgr/endpoint_pair_windows.c', + 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', @@ -103,6 +103,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/iomgr.c', 'src/core/lib/iomgr/iomgr_posix.c', 'src/core/lib/iomgr/iomgr_windows.c', + 'src/core/lib/iomgr/load_file.c', 'src/core/lib/iomgr/polling_entity.c', 'src/core/lib/iomgr/pollset_set_windows.c', 'src/core/lib/iomgr/pollset_windows.c', @@ -161,6 +162,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/transport/transport.c', 'src/core/lib/transport/transport_op_string.c', 'src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c', + 'src/core/ext/transport/chttp2/transport/bin_decoder.c', 'src/core/ext/transport/chttp2/transport/bin_encoder.c', 'src/core/ext/transport/chttp2/transport/chttp2_plugin.c', 'src/core/ext/transport/chttp2/transport/chttp2_transport.c', @@ -204,6 +206,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/security/transport/secure_endpoint.c', 'src/core/lib/security/transport/security_connector.c', 'src/core/lib/security/transport/server_auth_filter.c', + 'src/core/lib/security/transport/tsi_error.c', 'src/core/lib/security/util/b64.c', 'src/core/lib/security/util/json_util.c', 'src/core/lib/surface/init_secure.c', diff --git a/src/python/grpcio/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/_python_plugin_test.py new file mode 100644 index 0000000000..1c9cbb0d0c --- /dev/null +++ b/src/python/grpcio/tests/protoc_plugin/_python_plugin_test.py @@ -0,0 +1,583 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import collections +from concurrent import futures +import contextlib +import distutils.spawn +import errno +import os +import shutil +import subprocess +import sys +import tempfile +import threading +import unittest + +from six import moves + +import grpc +from tests.unit.framework.common import test_constants + +# Identifiers of entities we expect to find in the generated module. +STUB_IDENTIFIER = 'TestServiceStub' +SERVICER_IDENTIFIER = 'TestServiceServicer' +ADD_SERVICER_TO_SERVER_IDENTIFIER = 'add_TestServiceServicer_to_server' + + +class _ServicerMethods(object): + + def __init__(self, response_pb2, payload_pb2): + self._condition = threading.Condition() + self._paused = False + self._fail = False + self._response_pb2 = response_pb2 + self._payload_pb2 = payload_pb2 + + @contextlib.contextmanager + def pause(self): # pylint: disable=invalid-name + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): # pylint: disable=invalid-name + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False + + def _control(self): # pylint: disable=invalid-name + with self._condition: + if self._fail: + raise ValueError() + while self._paused: + self._condition.wait() + + def UnaryCall(self, request, unused_rpc_context): + response = self._response_pb2.SimpleResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * request.response_size + self._control() + return response + + def StreamingOutputCall(self, request, unused_rpc_context): + for parameter in request.response_parameters: + response = self._response_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def StreamingInputCall(self, request_iter, unused_rpc_context): + response = self._response_pb2.StreamingInputCallResponse() + aggregated_payload_size = 0 + for request in request_iter: + aggregated_payload_size += len(request.payload.payload_compressable) + response.aggregated_payload_size = aggregated_payload_size + self._control() + return response + + def FullDuplexCall(self, request_iter, unused_rpc_context): + for request in request_iter: + for parameter in request.response_parameters: + response = self._response_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + yield response + + def HalfDuplexCall(self, request_iter, unused_rpc_context): + responses = [] + for request in request_iter: + for parameter in request.response_parameters: + response = self._response_pb2.StreamingOutputCallResponse() + response.payload.payload_type = self._payload_pb2.COMPRESSABLE + response.payload.payload_compressable = 'a' * parameter.size + self._control() + responses.append(response) + for response in responses: + yield response + + +class _Service( + collections.namedtuple( + '_Service', ('servicer_methods', 'server', 'stub',))): + """A live and running service. + + Attributes: + servicer_methods: The _ServicerMethods servicing RPCs. + server: The grpc.Server servicing RPCs. + stub: A stub on which to invoke RPCs. + """ + + +def _CreateService(service_pb2, response_pb2, payload_pb2): + """Provides a servicer backend and a stub. + + Args: + service_pb2: The service_pb2 module generated by this test. + response_pb2: The response_pb2 module generated by this test. + payload_pb2: The payload_pb2 module generated by this test. + + Returns: + A _Service with which to test RPCs. + """ + servicer_methods = _ServicerMethods(response_pb2, payload_pb2) + + class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + + def UnaryCall(self, request, context): + return servicer_methods.UnaryCall(request, context) + + def StreamingOutputCall(self, request, context): + return servicer_methods.StreamingOutputCall(request, context) + + def StreamingInputCall(self, request_iter, context): + return servicer_methods.StreamingInputCall(request_iter, context) + + def FullDuplexCall(self, request_iter, context): + return servicer_methods.FullDuplexCall(request_iter, context) + + def HalfDuplexCall(self, request_iter, context): + return servicer_methods.HalfDuplexCall(request_iter, context) + + server = grpc.server( + (), futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:{}'.format(port)) + stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + return _Service(servicer_methods, server, stub) + + +def _CreateIncompleteService(service_pb2): + """Provides a servicer backend that fails to implement methods and its stub. + + Args: + service_pb2: The service_pb2 module generated by this test. + + Returns: + A _Service with which to test RPCs. The returned _Service's + servicer_methods implements none of the methods required of it. + """ + + class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)): + pass + + server = grpc.server( + (), futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + getattr(service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:{}'.format(port)) + stub = getattr(service_pb2, STUB_IDENTIFIER)(channel) + return _Service(None, server, stub) + + +def _streaming_input_request_iterator(request_pb2, payload_pb2): + for _ in range(3): + request = request_pb2.StreamingInputCallRequest() + request.payload.payload_type = payload_pb2.COMPRESSABLE + request.payload.payload_compressable = 'a' + yield request + + +def _streaming_output_request(request_pb2): + request = request_pb2.StreamingOutputCallRequest() + sizes = [1, 2, 3] + request.response_parameters.add(size=sizes[0], interval_us=0) + request.response_parameters.add(size=sizes[1], interval_us=0) + request.response_parameters.add(size=sizes[2], interval_us=0) + return request + + +def _full_duplex_request_iterator(request_pb2): + request = request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + + +class PythonPluginTest(unittest.TestCase): + """Test case for the gRPC Python protoc-plugin. + + While reading these tests, remember that the futures API + (`stub.method.future()`) only gives futures for the *response-unary* + methods and does not exist for response-streaming methods. + """ + + def setUp(self): + # Assume that the appropriate protoc and grpc_python_plugins are on the + # path. + protoc_command = 'protoc' + protoc_plugin_filename = distutils.spawn.find_executable( + 'grpc_python_plugin') + if not os.path.isfile(protoc_command): + # Assume that if we haven't built protoc that it's on the system. + protoc_command = 'protoc' + + # Ensure that the output directory exists. + self.outdir = tempfile.mkdtemp() + + # Find all proto files + paths = [] + root_dir = os.path.dirname(os.path.realpath(__file__)) + proto_dir = os.path.join(root_dir, 'protos') + for walk_root, _, filenames in os.walk(proto_dir): + for filename in filenames: + if filename.endswith('.proto'): + path = os.path.join(walk_root, filename) + paths.append(path) + + # Invoke protoc with the plugin. + cmd = [ + protoc_command, + '--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename, + '-I %s' % root_dir, + '--python_out=%s' % self.outdir, + '--python-grpc_out=%s' % self.outdir + ] + paths + subprocess.check_call(' '.join(cmd), shell=True, env=os.environ, + cwd=os.path.dirname(os.path.realpath(__file__))) + + # Generated proto directories dont include __init__.py, but + # these are needed for python package resolution + for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')): + path = os.path.join(walk_root, '__init__.py') + open(path, 'a').close() + + sys.path.insert(0, self.outdir) + + import protos.payload.test_payload_pb2 as payload_pb2 + import protos.requests.r.test_requests_pb2 as request_pb2 + import protos.responses.test_responses_pb2 as response_pb2 + import protos.service.test_service_pb2 as service_pb2 + self._payload_pb2 = payload_pb2 + self._request_pb2 = request_pb2 + self._response_pb2 = response_pb2 + self._service_pb2 = service_pb2 + + def tearDown(self): + try: + shutil.rmtree(self.outdir) + except OSError as exc: + if exc.errno != errno.ENOENT: + raise + sys.path.remove(self.outdir) + + def testImportAttributes(self): + # check that we can access the generated module and its members. + self.assertIsNotNone( + getattr(self._service_pb2, STUB_IDENTIFIER, None)) + self.assertIsNotNone( + getattr(self._service_pb2, SERVICER_IDENTIFIER, None)) + self.assertIsNotNone( + getattr(self._service_pb2, ADD_SERVICER_TO_SERVER_IDENTIFIER, None)) + + def testUpDown(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + self.assertIsNotNone(service.servicer_methods) + self.assertIsNotNone(service.server) + self.assertIsNotNone(service.stub) + + def testIncompleteServicer(self): + service = _CreateIncompleteService(self._service_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + with self.assertRaises(grpc.RpcError) as exception_context: + service.stub.UnaryCall(request) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + + def testUnaryCall(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + response = service.stub.UnaryCall(request) + expected_response = service.servicer_methods.UnaryCall( + request, 'not a real context!') + self.assertEqual(expected_response, response) + + def testUnaryCallFuture(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + # Check that the call does not block waiting for the server to respond. + with service.servicer_methods.pause(): + response_future = service.stub.UnaryCall.future(request) + response = response_future.result() + expected_response = service.servicer_methods.UnaryCall( + request, 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testUnaryCallFutureExpired(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + with service.servicer_methods.pause(): + response_future = service.stub.UnaryCall.future( + request, timeout=test_constants.SHORT_TIMEOUT) + with self.assertRaises(grpc.RpcError) as exception_context: + response_future.result() + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + + def testUnaryCallFutureCancelled(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + with service.servicer_methods.pause(): + response_future = service.stub.UnaryCall.future(request) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED) + + def testUnaryCallFutureFailed(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = self._request_pb2.SimpleRequest(response_size=13) + with service.servicer_methods.fail(): + response_future = service.stub.UnaryCall.future(request) + self.assertIsNotNone(response_future.exception()) + self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) + + def testStreamingOutputCall(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = _streaming_output_request(self._request_pb2) + responses = service.stub.StreamingOutputCall(request) + expected_responses = service.servicer_methods.StreamingOutputCall( + request, 'not a real RpcContext!') + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + def testStreamingOutputCallExpired(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = _streaming_output_request(self._request_pb2) + with service.servicer_methods.pause(): + responses = service.stub.StreamingOutputCall( + request, timeout=test_constants.SHORT_TIMEOUT) + with self.assertRaises(grpc.RpcError) as exception_context: + list(responses) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + + def testStreamingOutputCallCancelled(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = _streaming_output_request(self._request_pb2) + responses = service.stub.StreamingOutputCall(request) + next(responses) + responses.cancel() + with self.assertRaises(grpc.RpcError) as exception_context: + next(responses) + self.assertIs(responses.code(), grpc.StatusCode.CANCELLED) + + def testStreamingOutputCallFailed(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request = _streaming_output_request(self._request_pb2) + with service.servicer_methods.fail(): + responses = service.stub.StreamingOutputCall(request) + self.assertIsNotNone(responses) + with self.assertRaises(grpc.RpcError) as exception_context: + next(responses) + self.assertIs(exception_context.exception.code(), grpc.StatusCode.UNKNOWN) + + def testStreamingInputCall(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + response = service.stub.StreamingInputCall( + _streaming_input_request_iterator( + self._request_pb2, self._payload_pb2)) + expected_response = service.servicer_methods.StreamingInputCall( + _streaming_input_request_iterator(self._request_pb2, self._payload_pb2), + 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallFuture(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.pause(): + response_future = service.stub.StreamingInputCall.future( + _streaming_input_request_iterator( + self._request_pb2, self._payload_pb2)) + response = response_future.result() + expected_response = service.servicer_methods.StreamingInputCall( + _streaming_input_request_iterator(self._request_pb2, self._payload_pb2), + 'not a real RpcContext!') + self.assertEqual(expected_response, response) + + def testStreamingInputCallFutureExpired(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.pause(): + response_future = service.stub.StreamingInputCall.future( + _streaming_input_request_iterator( + self._request_pb2, self._payload_pb2), + timeout=test_constants.SHORT_TIMEOUT) + with self.assertRaises(grpc.RpcError) as exception_context: + response_future.result() + self.assertIsInstance(response_future.exception(), grpc.RpcError) + self.assertIs( + response_future.exception().code(), grpc.StatusCode.DEADLINE_EXCEEDED) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + + def testStreamingInputCallFutureCancelled(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.pause(): + response_future = service.stub.StreamingInputCall.future( + _streaming_input_request_iterator( + self._request_pb2, self._payload_pb2)) + response_future.cancel() + self.assertTrue(response_future.cancelled()) + with self.assertRaises(grpc.FutureCancelledError): + response_future.result() + + def testStreamingInputCallFutureFailed(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.fail(): + response_future = service.stub.StreamingInputCall.future( + _streaming_input_request_iterator( + self._request_pb2, self._payload_pb2)) + self.assertIsNotNone(response_future.exception()) + self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) + + def testFullDuplexCall(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + responses = service.stub.FullDuplexCall( + _full_duplex_request_iterator(self._request_pb2)) + expected_responses = service.servicer_methods.FullDuplexCall( + _full_duplex_request_iterator(self._request_pb2), + 'not a real RpcContext!') + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + def testFullDuplexCallExpired(self): + request_iterator = _full_duplex_request_iterator(self._request_pb2) + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.pause(): + responses = service.stub.FullDuplexCall( + request_iterator, timeout=test_constants.SHORT_TIMEOUT) + with self.assertRaises(grpc.RpcError) as exception_context: + list(responses) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + + def testFullDuplexCallCancelled(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + request_iterator = _full_duplex_request_iterator(self._request_pb2) + responses = service.stub.FullDuplexCall(request_iterator) + next(responses) + responses.cancel() + with self.assertRaises(grpc.RpcError) as exception_context: + next(responses) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.CANCELLED) + + def testFullDuplexCallFailed(self): + request_iterator = _full_duplex_request_iterator(self._request_pb2) + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with service.servicer_methods.fail(): + responses = service.stub.FullDuplexCall(request_iterator) + with self.assertRaises(grpc.RpcError) as exception_context: + next(responses) + self.assertIs(exception_context.exception.code(), grpc.StatusCode.UNKNOWN) + + def testHalfDuplexCall(self): + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + def half_duplex_request_iterator(): + request = self._request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + request = self._request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=2, interval_us=0) + request.response_parameters.add(size=3, interval_us=0) + yield request + responses = service.stub.HalfDuplexCall(half_duplex_request_iterator()) + expected_responses = service.servicer_methods.HalfDuplexCall( + half_duplex_request_iterator(), 'not a real RpcContext!') + for expected_response, response in moves.zip_longest( + expected_responses, responses): + self.assertEqual(expected_response, response) + + def testHalfDuplexCallWedged(self): + condition = threading.Condition() + wait_cell = [False] + @contextlib.contextmanager + def wait(): # pylint: disable=invalid-name + # Where's Python 3's 'nonlocal' statement when you need it? + with condition: + wait_cell[0] = True + yield + with condition: + wait_cell[0] = False + condition.notify_all() + def half_duplex_request_iterator(): + request = self._request_pb2.StreamingOutputCallRequest() + request.response_parameters.add(size=1, interval_us=0) + yield request + with condition: + while wait_cell[0]: + condition.wait() + service = _CreateService( + self._service_pb2, self._response_pb2, self._payload_pb2) + with wait(): + responses = service.stub.HalfDuplexCall( + half_duplex_request_iterator(), timeout=test_constants.SHORT_TIMEOUT) + # half-duplex waits for the client to send all info + with self.assertRaises(grpc.RpcError) as exception_context: + next(responses) + self.assertIs( + exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index 1b100bb168..080281415d 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -30,11 +30,13 @@ """Defines test client behaviors (UNARY/STREAMING) (SYNC/ASYNC).""" import abc +import threading import time from concurrent import futures from six.moves import queue +import grpc from grpc.beta import implementations from grpc.framework.interfaces.face import face from src.proto.grpc.testing import messages_pb2 @@ -62,6 +64,13 @@ class BenchmarkClient: else: channel = implementations.insecure_channel(host, port) + connected_event = threading.Event() + def wait_for_ready(connectivity): + if connectivity == grpc.ChannelConnectivity.READY: + connected_event.set() + channel.subscribe(wait_for_ready, try_to_connect=True) + connected_event.wait() + if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False self._stub = services_pb2.beta_create_BenchmarkService_stub(channel) diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 53b2998b78..8e509621a8 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -1,10 +1,9 @@ [ + "_api_test.AllTest", + "_api_test.ChannelConnectivityTest", + "_api_test.ChannelTest", "_auth_test.AccessTokenCallCredentialsTest", "_auth_test.GoogleCallCredentialsTest", - "_base_interface_test.AsyncEasyTest", - "_base_interface_test.AsyncPeasyTest", - "_base_interface_test.SyncEasyTest", - "_base_interface_test.SyncPeasyTest", "_beta_features_test.BetaFeaturesTest", "_beta_features_test.ContextManagementAndLifecycleTest", "_cancel_many_calls_test.CancelManyCallsTest", @@ -12,22 +11,7 @@ "_channel_ready_future_test.ChannelReadyFutureTest", "_channel_test.ChannelTest", "_connectivity_channel_test.ChannelConnectivityTest", - "_core_over_links_base_interface_test.AsyncEasyTest", - "_core_over_links_base_interface_test.AsyncPeasyTest", - "_core_over_links_base_interface_test.SyncEasyTest", - "_core_over_links_base_interface_test.SyncPeasyTest", - "_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", - "_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", - "_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", - "_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", - "_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", - "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", + "_connectivity_channel_test.ConnectivityStatesTest", "_empty_message_test.EmptyMessageTest", "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", @@ -39,22 +23,15 @@ "_implementations_test.CallCredentialsTest", "_implementations_test.ChannelCredentialsTest", "_insecure_interop_test.InsecureInteropTest", - "_intermediary_low_test.CancellationTest", - "_intermediary_low_test.EchoTest", - "_intermediary_low_test.ExpirationTest", - "_intermediary_low_test.LonelyClientTest", - "_later_test.LaterTest", "_logging_pool_test.LoggingPoolTest", - "_lonely_invocation_link_test.LonelyInvocationLinkTest", - "_low_test.HangingServerShutdown", - "_low_test.InsecureServerInsecureClient", + "_metadata_test.MetadataTest", "_not_found_test.NotFoundTest", + "_python_plugin_test.PythonPluginTest", "_read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", "_rpc_test.RPCTest", "_sanity_test.Sanity", "_secure_interop_test.SecureInteropTest", - "_transmission_test.RoundTripTest", - "_transmission_test.TransmissionTest", + "_thread_cleanup_test.CleanupThreadTest", "_utilities_test.ChannelConnectivityTest", "beta_python_plugin_test.PythonPluginTest", "cygrpc_test.InsecureServerInsecureClient", diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py deleted file mode 100644 index 09ebdeff33..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py +++ /dev/null @@ -1,429 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests for the old '_low'.""" - -import threading -import time -import unittest - -import six -from six.moves import queue - -from grpc._adapter import _intermediary_low as _low - -_STREAM_LENGTH = 300 -_TIMEOUT = 5 -_AFTER_DELAY = 2 -_FUTURE = time.time() + 60 * 60 * 24 -_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200 -_BYTE_SEQUENCE_SEQUENCE = tuple( - bytes(bytearray((row + column) % 256 for column in range(row))) - for row in range(_STREAM_LENGTH)) - - -class LonelyClientTest(unittest.TestCase): - - def testLonelyClient(self): - host = 'nosuchhostexists' - port = 54321 - method = 'test method' - deadline = time.time() + _TIMEOUT - after_deadline = deadline + _AFTER_DELAY - metadata_tag = object() - finish_tag = object() - - completion_queue = _low.CompletionQueue() - channel = _low.Channel('%s:%d' % (host, port), None) - client_call = _low.Call(channel, completion_queue, method, host, deadline) - - client_call.invoke(completion_queue, metadata_tag, finish_tag) - first_event = completion_queue.get(after_deadline) - self.assertIsNotNone(first_event) - second_event = completion_queue.get(after_deadline) - self.assertIsNotNone(second_event) - kinds = [event.kind for event in (first_event, second_event)] - six.assertCountEqual(self, - (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH), - kinds) - - self.assertIsNone(completion_queue.get(after_deadline)) - - completion_queue.stop() - stop_event = completion_queue.get(_FUTURE) - self.assertEqual(_low.Event.Kind.STOP, stop_event.kind) - - del client_call - del channel - del completion_queue - - -def _drive_completion_queue(completion_queue, event_queue): - while True: - event = completion_queue.get(_FUTURE) - if event.kind is _low.Event.Kind.STOP: - break - event_queue.put(event) - - -class EchoTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def _perform_echo_test(self, test_data): - method = 'test method' - details = 'test details' - server_leading_metadata_key = 'my_server_leading_key' - server_leading_metadata_value = 'my_server_leading_value' - server_trailing_metadata_key = 'my_server_trailing_key' - server_trailing_metadata_value = 'my_server_trailing_value' - client_metadata_key = 'my_client_key' - client_metadata_value = 'my_client_value' - server_leading_binary_metadata_key = 'my_server_leading_key-bin' - server_leading_binary_metadata_value = b'\0'*2047 - server_trailing_binary_metadata_key = 'my_server_trailing_key-bin' - server_trailing_binary_metadata_value = b'\0'*2047 - client_binary_metadata_key = 'my_client_key-bin' - client_binary_metadata_value = b'\0'*2047 - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - complete_tag = object() - service_tag = object() - read_tag = object() - status_tag = object() - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - client_call.add_metadata(client_metadata_key, client_metadata_value) - client_call.add_metadata(client_binary_metadata_key, - client_binary_metadata_value) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - self.assertIsNotNone(service_accepted) - self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) - self.assertIs(service_accepted.tag, service_tag) - self.assertEqual(method.encode(), service_accepted.service_acceptance.method) - self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host) - self.assertIsNotNone(service_accepted.service_acceptance.call) - metadata = dict(service_accepted.metadata) - self.assertIn(client_metadata_key.encode(), metadata) - self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()]) - self.assertIn(client_binary_metadata_key.encode(), metadata) - self.assertEqual(client_binary_metadata_value, - metadata[client_binary_metadata_key.encode()]) - server_call = service_accepted.service_acceptance.call - server_call.accept(self.server_completion_queue, finish_tag) - server_call.add_metadata(server_leading_metadata_key, - server_leading_metadata_value) - server_call.add_metadata(server_leading_binary_metadata_key, - server_leading_binary_metadata_value) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) - self.assertEqual(metadata_tag, metadata_accepted.tag) - metadata = dict(metadata_accepted.metadata) - self.assertIn(server_leading_metadata_key.encode(), metadata) - self.assertEqual(server_leading_metadata_value.encode(), - metadata[server_leading_metadata_key.encode()]) - self.assertIn(server_leading_binary_metadata_key.encode(), metadata) - self.assertEqual(server_leading_binary_metadata_value, - metadata[server_leading_binary_metadata_key.encode()]) - - for datum in test_data: - client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) - write_accepted = self.client_events.get() - self.assertIsNotNone(write_accepted) - self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED) - self.assertIs(write_accepted.tag, write_tag) - self.assertIs(write_accepted.write_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind) - self.assertEqual(write_tag, write_accepted.tag) - self.assertTrue(write_accepted.write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNotNone(read_accepted.bytes) - client_data.append(read_accepted.bytes) - - client_call.complete(complete_tag) - complete_accepted = self.client_events.get() - self.assertIsNotNone(complete_accepted) - self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED) - self.assertIs(complete_accepted.tag, complete_tag) - self.assertIs(complete_accepted.complete_accepted, True) - - server_call.read(read_tag) - read_accepted = self.server_events.get() - self.assertIsNotNone(read_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - - server_call.add_metadata(server_trailing_metadata_key, - server_trailing_metadata_value) - server_call.add_metadata(server_trailing_binary_metadata_key, - server_trailing_binary_metadata_value) - - server_call.status(_low.Status(_low.Code.OK, details), status_tag) - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED: - status_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - status_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(status_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind) - self.assertEqual(status_tag, status_accepted.tag) - self.assertTrue(status_accepted.complete_accepted) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(finish_tag, rpc_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status) - - client_call.read(read_tag) - client_terminal_event_one = self.client_events.get() - client_terminal_event_two = self.client_events.get() - if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = client_terminal_event_one - finish_accepted = client_terminal_event_two - else: - read_accepted = client_terminal_event_two - finish_accepted = client_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(finish_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertEqual(read_tag, read_accepted.tag) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) - self.assertEqual(finish_tag, finish_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status) - metadata = dict(finish_accepted.metadata) - self.assertIn(server_trailing_metadata_key.encode(), metadata) - self.assertEqual(server_trailing_metadata_value.encode(), - metadata[server_trailing_metadata_key.encode()]) - self.assertIn(server_trailing_binary_metadata_key.encode(), metadata) - self.assertEqual(server_trailing_binary_metadata_value, - metadata[server_trailing_binary_metadata_key.encode()]) - self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), - set((server_trailing_metadata_key.encode(), - server_trailing_binary_metadata_key.encode(),))) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - def testNoEcho(self): - self._perform_echo_test(()) - - def testOneByteEcho(self): - self._perform_echo_test([b'\x07']) - - def testOneManyByteEcho(self): - self._perform_echo_test([_BYTE_SEQUENCE]) - - def testManyOneByteEchoes(self): - self._perform_echo_test( - [_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))]) - - def testManyManyByteEchoes(self): - self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) - - -class CancellationTest(unittest.TestCase): - - def setUp(self): - self.host = 'localhost' - - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue) - port = self.server.add_http2_addr('[::]:0') - self.server.start() - self.server_events = queue.Queue() - self.server_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.server_completion_queue, self.server_events)) - self.server_completion_queue_thread.start() - - self.client_completion_queue = _low.CompletionQueue() - self.channel = _low.Channel('%s:%d' % (self.host, port), None) - self.client_events = queue.Queue() - self.client_completion_queue_thread = threading.Thread( - target=_drive_completion_queue, - args=(self.client_completion_queue, self.client_events)) - self.client_completion_queue_thread.start() - - def tearDown(self): - self.server.stop() - self.server.cancel_all_calls() - self.server_completion_queue.stop() - self.client_completion_queue.stop() - self.server_completion_queue_thread.join() - self.client_completion_queue_thread.join() - del self.server - - def testCancellation(self): - method = 'test method' - deadline = _FUTURE - metadata_tag = object() - finish_tag = object() - write_tag = object() - service_tag = object() - read_tag = object() - test_data = _BYTE_SEQUENCE_SEQUENCE - - server_data = [] - client_data = [] - - client_call = _low.Call(self.channel, self.client_completion_queue, - method, self.host, deadline) - - client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) - - self.server.service(service_tag) - service_accepted = self.server_events.get() - server_call = service_accepted.service_acceptance.call - - server_call.accept(self.server_completion_queue, finish_tag) - server_call.premetadata() - - metadata_accepted = self.client_events.get() - self.assertIsNotNone(metadata_accepted) - - for datum in test_data: - client_call.write(datum, write_tag, 0) - write_accepted = self.client_events.get() - - server_call.read(read_tag) - read_accepted = self.server_events.get() - server_data.append(read_accepted.bytes) - - server_call.write(read_accepted.bytes, write_tag, 0) - write_accepted = self.server_events.get() - self.assertIsNotNone(write_accepted) - - client_call.read(read_tag) - read_accepted = self.client_events.get() - client_data.append(read_accepted.bytes) - - client_call.cancel() - # cancel() is idempotent. - client_call.cancel() - client_call.cancel() - client_call.cancel() - - server_call.read(read_tag) - - server_terminal_event_one = self.server_events.get() - server_terminal_event_two = self.server_events.get() - if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED: - read_accepted = server_terminal_event_one - rpc_accepted = server_terminal_event_two - else: - read_accepted = server_terminal_event_two - rpc_accepted = server_terminal_event_one - self.assertIsNotNone(read_accepted) - self.assertIsNotNone(rpc_accepted) - self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind) - self.assertIsNone(read_accepted.bytes) - self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status) - - finish_event = self.client_events.get() - self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'), - finish_event.status) - - self.assertSequenceEqual(test_data, server_data) - self.assertSequenceEqual(test_data, client_data) - - -class ExpirationTest(unittest.TestCase): - - @unittest.skip('TODO(nathaniel): Expiration test!') - def testExpiration(self): - pass - - -if __name__ == '__main__': - unittest.main(verbosity=2) - diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py deleted file mode 100644 index e09a1f2564..0000000000 --- a/src/python/grpcio/tests/unit/_adapter/_low_test.py +++ /dev/null @@ -1,319 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import threading -import time -import unittest - -from grpc import _grpcio_metadata -from grpc._adapter import _types -from grpc._adapter import _low -from tests.unit import test_common - - -def wait_for_events(completion_queues, deadline): - """ - Args: - completion_queues: list of completion queues to wait for events on - deadline: absolute deadline to wait until - - Returns: - a sequence of events of length len(completion_queues). - """ - - results = [None] * len(completion_queues) - lock = threading.Lock() - threads = [] - def set_ith_result(i, completion_queue): - result = completion_queue.next(deadline) - with lock: - results[i] = result - for i, completion_queue in enumerate(completion_queues): - thread = threading.Thread(target=set_ith_result, - args=[i, completion_queue]) - thread.start() - threads.append(thread) - for thread in threads: - thread.join() - return results - - -class InsecureServerInsecureClient(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - while (self.client_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - self.server_completion_queue.shutdown() - while (self.server_completion_queue.next(float('+inf')).type != - _types.EventType.QUEUE_SHUTDOWN): - pass - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testEcho(self): - deadline = time.time() + 5 - event_time_tolerance = 2 - deadline_tolerance = 0.25 - client_metadata_ascii_key = 'key' - client_metadata_ascii_value = 'val' - client_metadata_bin_key = 'key-bin' - client_metadata_bin_value = b'\0'*1000 - server_initial_metadata_key = 'init_me_me_me' - server_initial_metadata_value = 'whodawha?' - server_trailing_metadata_key = 'california_is_in_a_drought' - server_trailing_metadata_value = 'zomg it is' - server_status_code = _types.StatusCode.OK - server_status_details = 'our work is never over' - request = 'blarghaflargh' - response = 'his name is robert paulson' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - self.assertEqual(_types.CallError.OK, request_call_result) - - client_call_tag = object() - client_call = self.client_channel.create_call( - self.client_completion_queue, method, host, deadline) - client_initial_metadata = [ - (client_metadata_ascii_key, client_metadata_ascii_value), - (client_metadata_bin_key, client_metadata_bin_value) - ] - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata(client_initial_metadata), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - self.assertEqual(_types.CallError.OK, client_start_batch_result) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertEqual(client_no_event, None) - self.assertEqual(_types.EventType.OP_COMPLETE, request_event.type) - self.assertIsInstance(request_event.call, _low.Call) - self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(1, len(request_event.results)) - received_initial_metadata = request_event.results[0].initial_metadata - # Check that our metadata were transmitted - self.assertTrue(test_common.metadata_transmitted(client_initial_metadata, - received_initial_metadata)) - # Check that Python's user agent string is a part of the full user agent - # string - received_initial_metadata_dict = dict(received_initial_metadata) - self.assertIn(b'user-agent', received_initial_metadata_dict) - self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(), - received_initial_metadata_dict[b'user-agent']) - self.assertEqual(method.encode(), request_event.call_details.method) - self.assertEqual(host.encode(), request_event.call_details.host) - self.assertLess(abs(deadline - request_event.call_details.deadline), - deadline_tolerance) - - # Check that the channel is connected, and that both it and the call have - # the proper target and peer; do this after the first flurry of messages to - # avoid the possibility that connection was delayed by the core until the - # first message was sent. - self.assertEqual(_types.ConnectivityState.READY, - self.client_channel.check_connectivity_state(False)) - self.assertIsNotNone(self.client_channel.target()) - self.assertIsNotNone(client_call.peer()) - - server_call_tag = object() - server_call = request_event.call - server_initial_metadata = [ - (server_initial_metadata_key, server_initial_metadata_value) - ] - server_trailing_metadata = [ - (server_trailing_metadata_key, server_trailing_metadata_value) - ] - server_start_batch_result = server_call.start_batch([ - _types.OpArgs.send_initial_metadata(server_initial_metadata), - _types.OpArgs.recv_message(), - _types.OpArgs.send_message(response, 0), - _types.OpArgs.recv_close_on_server(), - _types.OpArgs.send_status_from_server( - server_trailing_metadata, server_status_code, server_status_details) - ], server_call_tag) - self.assertEqual(_types.CallError.OK, server_start_batch_result) - - client_event, server_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - self.assertEqual(6, len(client_event.results)) - found_client_op_types = set() - for client_result in client_event.results: - # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == _types.OpType.RECV_INITIAL_METADATA: - self.assertTrue( - test_common.metadata_transmitted(server_initial_metadata, - client_result.initial_metadata)) - elif client_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(response.encode(), client_result.message) - elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT: - self.assertTrue( - test_common.metadata_transmitted(server_trailing_metadata, - client_result.trailing_metadata)) - self.assertEqual(server_status_details.encode(), client_result.status.details) - self.assertEqual(server_status_code, client_result.status.code) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.SEND_MESSAGE, - _types.OpType.SEND_CLOSE_FROM_CLIENT, - _types.OpType.RECV_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.RECV_STATUS_ON_CLIENT - ]), found_client_op_types) - - self.assertEqual(5, len(server_event.results)) - found_server_op_types = set() - for server_result in server_event.results: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(request.encode(), server_result.message) - elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER: - self.assertFalse(server_result.cancelled) - self.assertEqual(set([ - _types.OpType.SEND_INITIAL_METADATA, - _types.OpType.RECV_MESSAGE, - _types.OpType.SEND_MESSAGE, - _types.OpType.RECV_CLOSE_ON_SERVER, - _types.OpType.SEND_STATUS_FROM_SERVER - ]), found_server_op_types) - - del client_call - del server_call - - -class HangingServerShutdown(unittest.TestCase): - - def setUp(self): - self.server_completion_queue = _low.CompletionQueue() - self.server = _low.Server(self.server_completion_queue, []) - self.port = self.server.add_http2_port('[::]:0') - self.client_completion_queue = _low.CompletionQueue() - self.client_channel = _low.Channel('localhost:%d'%self.port, []) - - self.server.start() - - def tearDown(self): - self.server.shutdown() - del self.client_channel - - self.client_completion_queue.shutdown() - self.server_completion_queue.shutdown() - while True: - client_event, server_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - float("+inf")) - if (client_event.type == _types.EventType.QUEUE_SHUTDOWN and - server_event.type == _types.EventType.QUEUE_SHUTDOWN): - break - - del self.client_completion_queue - del self.server_completion_queue - del self.server - - def testHangingServerCall(self): - deadline = time.time() + 5 - deadline_tolerance = 0.25 - event_time_tolerance = 2 - cancel_all_calls_time_tolerance = 0.5 - request = 'blarghaflargh' - method = 'twinkies' - host = 'hostess' - server_request_tag = object() - request_call_result = self.server.request_call(self.server_completion_queue, - server_request_tag) - - client_call_tag = object() - client_call = self.client_channel.create_call(self.client_completion_queue, - method, host, deadline) - client_start_batch_result = client_call.start_batch([ - _types.OpArgs.send_initial_metadata([]), - _types.OpArgs.send_message(request, 0), - _types.OpArgs.send_close_from_client(), - _types.OpArgs.recv_initial_metadata(), - _types.OpArgs.recv_message(), - _types.OpArgs.recv_status_on_client() - ], client_call_tag) - - client_no_event, request_event, = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - - # Now try to shutdown the server and expect that we see server shutdown - # almost immediately after calling cancel_all_calls. - - # First attempt to cancel all calls before shutting down, and expect - # our state machine to catch the erroneous API use. - with self.assertRaises(RuntimeError): - self.server.cancel_all_calls() - - shutdown_tag = object() - self.server.shutdown(shutdown_tag) - pre_cancel_timestamp = time.time() - self.server.cancel_all_calls() - finish_shutdown_timestamp = None - client_call_event, server_shutdown_event = wait_for_events( - [self.client_completion_queue, self.server_completion_queue], - time.time() + event_time_tolerance) - self.assertIs(shutdown_tag, server_shutdown_event.tag) - self.assertGreater(pre_cancel_timestamp + cancel_all_calls_time_tolerance, - time.time()) - - del client_call - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_api_test.py b/src/python/grpcio/tests/unit/_api_test.py new file mode 100644 index 0000000000..2fe89499f5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_api_test.py @@ -0,0 +1,111 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test of gRPC Python's application-layer API.""" + +import unittest + +import six + +import grpc + +from tests.unit import _from_grpc_import_star + + +class AllTest(unittest.TestCase): + + def testAll(self): + expected_grpc_code_elements = ( + 'FutureTimeoutError', + 'FutureCancelledError', + 'Future', + 'ChannelConnectivity', + 'StatusCode', + 'RpcError', + 'RpcContext', + 'Call', + 'ChannelCredentials', + 'CallCredentials', + 'AuthMetadataContext', + 'AuthMetadataPluginCallback', + 'AuthMetadataPlugin', + 'ServerCredentials', + 'UnaryUnaryMultiCallable', + 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', + 'StreamStreamMultiCallable', + 'Channel', + 'ServicerContext', + 'RpcMethodHandler', + 'HandlerCallDetails', + 'GenericRpcHandler', + 'Server', + 'unary_unary_rpc_method_handler', + 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', + 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', + 'ssl_channel_credentials', + 'metadata_call_credentials', + 'access_token_call_credentials', + 'composite_call_credentials', + 'composite_channel_credentials', + 'ssl_server_credentials', + 'channel_ready_future', + 'insecure_channel', + 'secure_channel', + 'server', + ) + + six.assertCountEqual( + self, expected_grpc_code_elements, + _from_grpc_import_star.GRPC_ELEMENTS) + + +class ChannelConnectivityTest(unittest.TestCase): + + def testChannelConnectivity(self): + self.assertSequenceEqual( + (grpc.ChannelConnectivity.IDLE, + grpc.ChannelConnectivity.CONNECTING, + grpc.ChannelConnectivity.READY, + grpc.ChannelConnectivity.TRANSIENT_FAILURE, + grpc.ChannelConnectivity.SHUTDOWN,), + tuple(grpc.ChannelConnectivity)) + + +class ChannelTest(unittest.TestCase): + + def test_secure_channel(self): + channel_credentials = grpc.ssl_channel_credentials() + channel = grpc.secure_channel('google.com:443', channel_credentials) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py b/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py deleted file mode 100644 index 2b8981c752..0000000000 --- a/src/python/grpcio/tests/unit/_core_over_links_base_interface_test.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Base interface compliance of the core-over-gRPC-links stack.""" - -import collections -import logging -import random -import time -import unittest - -import six - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.core import implementations -from grpc.framework.interfaces.base import utilities -from tests.unit import test_common as grpc_test_common -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.base import test_cases -from tests.unit.framework.interfaces.base import test_interfaces - - -class _SerializationBehaviors( - collections.namedtuple( - '_SerializationBehaviors', - ('request_serializers', 'request_deserializers', 'response_serializers', - 'response_deserializers',))): - pass - - -class _Links( - collections.namedtuple( - '_Links', - ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link', - 'service_end_link'))): - pass - - -def _serialization_behaviors_from_serializations(serializations): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), serialization in six.iteritems(serializations): - request_serializers[group, method] = serialization.serialize_request - request_deserializers[group, method] = serialization.deserialize_request - response_serializers[group, method] = serialization.serialize_response - response_deserializers[group, method] = serialization.deserialize_response - return _SerializationBehaviors( - request_serializers, request_deserializers, response_serializers, - response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate(self, serializations, servicer): - serialization_behaviors = _serialization_behaviors_from_serializations( - serializations) - invocation_end_link = implementations.invocation_end_link() - service_end_link = implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - service_grpc_link = service.service_link( - serialization_behaviors.request_deserializers, - serialization_behaviors.response_serializers) - port = service_grpc_link.add_port('[::]:0', None) - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_grpc_link = invocation.invocation_link( - channel, b'localhost', None, - serialization_behaviors.request_serializers, - serialization_behaviors.response_deserializers) - - invocation_end_link.join_link(invocation_grpc_link) - invocation_grpc_link.join_link(invocation_end_link) - service_end_link.join_link(service_grpc_link) - service_grpc_link.join_link(service_end_link) - invocation_grpc_link.start() - service_grpc_link.start() - return invocation_end_link, service_end_link, ( - invocation_grpc_link, service_grpc_link) - - def destantiate(self, memo): - invocation_grpc_link, service_grpc_link = memo - invocation_grpc_link.stop() - service_grpc_link.begin_stop() - service_grpc_link.end_stop() - - def invocation_initial_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def service_initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def invocation_completion(self): - return utilities.completion(None, None, None) - - def service_completion(self): - return utilities.completion( - grpc_test_common.SERVICE_TERMINAL_METADATA, - beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS) - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - def completion_transmitted(self, original_completion, transmitted_completion): - if (original_completion.terminal_metadata is not None and - not grpc_test_common.metadata_transmitted( - original_completion.terminal_metadata, - transmitted_completion.terminal_metadata)): - return False - elif original_completion.code is not transmitted_completion.code: - return False - elif original_completion.message != transmitted_completion.message: - return False - else: - return True - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py deleted file mode 100644 index 50b9a5a824..0000000000 --- a/src/python/grpcio/tests/unit/_crust_over_core_over_links_face_interface_test.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Face compliance of the crust-over-core-over-gRPC-links stack.""" - -import collections -import unittest - -import six - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.core import implementations as core_implementations -from grpc.framework.crust import implementations as crust_implementations -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.links import utilities -from tests.unit import test_common as grpc_test_common -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces - - -class _SerializationBehaviors( - collections.namedtuple( - '_SerializationBehaviors', - ('request_serializers', 'request_deserializers', 'response_serializers', - 'response_deserializers',))): - pass - - -def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors( - request_serializers, request_deserializers, response_serializers, - response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate( - self, methods, method_implementations, multi_method_implementation): - pool = logging_pool.pool(test_constants.POOL_SIZE) - servicer = crust_implementations.servicer( - method_implementations, multi_method_implementation, pool) - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - invocation_end_link = core_implementations.invocation_end_link() - service_end_link = core_implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - service_grpc_link = service.service_link( - serialization_behaviors.request_deserializers, - serialization_behaviors.response_serializers) - port = service_grpc_link.add_port('[::]:0', None) - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_grpc_link = invocation.invocation_link( - channel, b'localhost', None, - serialization_behaviors.request_serializers, - serialization_behaviors.response_deserializers) - - invocation_end_link.join_link(invocation_grpc_link) - invocation_grpc_link.join_link(invocation_end_link) - service_grpc_link.join_link(service_end_link) - service_end_link.join_link(service_grpc_link) - service_end_link.start() - invocation_end_link.start() - invocation_grpc_link.start() - service_grpc_link.start() - - generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - group = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods)} - dynamic_stub = crust_implementations.dynamic_stub( - invocation_end_link, group, cardinalities, pool) - - return generic_stub, {group: dynamic_stub}, ( - invocation_end_link, invocation_grpc_link, service_grpc_link, - service_end_link, pool) - - def destantiate(self, memo): - (invocation_end_link, invocation_grpc_link, service_grpc_link, - service_end_link, pool) = memo - invocation_end_link.stop(0).wait() - invocation_grpc_link.stop() - service_grpc_link.begin_stop() - service_end_link.stop(0).wait() - service_grpc_link.end_stop() - invocation_end_link.join_link(utilities.NULL_LINK) - invocation_grpc_link.join_link(utilities.NULL_LINK) - service_grpc_link.join_link(utilities.NULL_LINK) - service_end_link.join_link(utilities.NULL_LINK) - pool.shutdown(wait=True) - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return beta_interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_from_grpc_import_star.py b/src/python/grpcio/tests/unit/_from_grpc_import_star.py new file mode 100644 index 0000000000..78d2fb7dc5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_from_grpc_import_star.py @@ -0,0 +1,38 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +_BEFORE_IMPORT = tuple(globals()) + +from grpc import * + +_AFTER_IMPORT = tuple(globals()) + +GRPC_ELEMENTS = tuple( + element for element in _AFTER_IMPORT + if element not in _BEFORE_IMPORT and element != '_BEFORE_IMPORT') diff --git a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py b/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py deleted file mode 100644 index 890755f81c..0000000000 --- a/src/python/grpcio/tests/unit/_links/_lonely_invocation_link_test.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""A test of invocation-side code unconnected to an RPC server.""" - -import unittest - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc.framework.interfaces.links import links -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.links import test_cases -from tests.unit.framework.interfaces.links import test_utilities - -_NULL_BEHAVIOR = lambda unused_argument: None - - -class LonelyInvocationLinkTest(unittest.TestCase): - - def testUpAndDown(self): - channel = _intermediary_low.Channel('nonexistent:54321', None) - invocation_link = invocation.invocation_link( - channel, 'nonexistent', None, {}, {}) - - invocation_link.start() - invocation_link.stop() - - def _test_lonely_invocation_with_termination(self, termination): - test_operation_id = object() - test_group = 'test package.Test Service' - test_method = 'test method' - invocation_link_mate = test_utilities.RecordingLink() - - channel = _intermediary_low.Channel('nonexistent:54321', None) - invocation_link = invocation.invocation_link( - channel, 'nonexistent', None, {}, {}) - invocation_link.join_link(invocation_link_mate) - invocation_link.start() - - ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.SHORT_TIMEOUT, 1, None, - None, None, None, None, termination, None) - invocation_link.accept_ticket(ticket) - invocation_link_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - - self.assertIsNot( - invocation_link_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - - def testLonelyInvocationLinkWithCommencementTicket(self): - self._test_lonely_invocation_with_termination(None) - - def testLonelyInvocationLinkWithEntireTicket(self): - self._test_lonely_invocation_with_termination( - links.Ticket.Termination.COMPLETION) - - -if __name__ == '__main__': - unittest.main() diff --git a/src/python/grpcio/tests/unit/_links/_transmission_test.py b/src/python/grpcio/tests/unit/_links/_transmission_test.py deleted file mode 100644 index 1f6edd18ca..0000000000 --- a/src/python/grpcio/tests/unit/_links/_transmission_test.py +++ /dev/null @@ -1,239 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests transmission of tickets across gRPC-on-the-wire.""" - -import unittest - -from grpc._adapter import _intermediary_low -from grpc._links import invocation -from grpc._links import service -from grpc.beta import interfaces as beta_interfaces -from grpc.framework.interfaces.links import links -from tests.unit import test_common -from tests.unit._links import _proto_scenarios -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.links import test_cases -from tests.unit.framework.interfaces.links import test_utilities - -_IDENTITY = lambda x: x - - -class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): - - def create_transmitting_links(self): - service_link = service.service_link( - {self.group_and_method(): self.deserialize_request}, - {self.group_and_method(): self.serialize_response}) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, 'localhost', None, - {self.group_and_method(): self.serialize_request}, - {self.group_and_method(): self.deserialize_response}) - invocation_link.start() - return invocation_link, service_link - - def destroy_transmitting_links(self, invocation_side_link, service_side_link): - invocation_side_link.stop() - service_side_link.begin_stop() - service_side_link.end_stop() - - def create_invocation_initial_metadata(self): - return ( - ('first_invocation_initial_metadata_key', 'just a string value'), - ('second_invocation_initial_metadata_key', '0123456789'), - ('third_invocation_initial_metadata_key-bin', '\x00\x57' * 100), - ) - - def create_invocation_terminal_metadata(self): - return None - - def create_service_initial_metadata(self): - return ( - ('first_service_initial_metadata_key', 'just another string value'), - ('second_service_initial_metadata_key', '9876543210'), - ('third_service_initial_metadata_key-bin', '\x00\x59\x02' * 100), - ) - - def create_service_terminal_metadata(self): - return ( - ('first_service_terminal_metadata_key', 'yet another string value'), - ('second_service_terminal_metadata_key', 'abcdefghij'), - ('third_service_terminal_metadata_key-bin', '\x00\x37' * 100), - ) - - def create_invocation_completion(self): - return None, None - - def create_service_completion(self): - return ( - beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!') - - def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): - self.assertTrue( - test_common.metadata_transmitted( - original_metadata, transmitted_metadata), - '%s erroneously transmitted as %s' % ( - original_metadata, transmitted_metadata)) - - -class RoundTripTest(unittest.TestCase): - - def testZeroMessageRoundTrip(self): - test_operation_id = object() - test_group = 'test package.Test Group' - test_method = 'test method' - identity_transformation = {(test_group, test_method): _IDENTITY} - test_code = beta_interfaces.StatusCode.OK - test_message = 'a test message' - - service_link = service.service_link( - identity_transformation, identity_transformation) - service_mate = test_utilities.RecordingLink() - service_link.join_link(service_mate) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, None, None, identity_transformation, identity_transformation) - invocation_mate = test_utilities.RecordingLink() - invocation_link.join_link(invocation_mate) - invocation_link.start() - - invocation_ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, links.Ticket.Termination.COMPLETION, None) - invocation_link.accept_ticket(invocation_ticket) - service_mate.block_until_tickets_satisfy(test_cases.terminated) - - service_ticket = links.Ticket( - service_mate.tickets()[-1].operation_id, 0, None, None, None, None, - None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION, None) - service_link.accept_ticket(service_ticket) - invocation_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - service_link.begin_stop() - service_link.end_stop() - - self.assertIs( - service_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - self.assertIs( - invocation_mate.tickets()[-1].termination, - links.Ticket.Termination.COMPLETION) - self.assertIs(invocation_mate.tickets()[-1].code, test_code) - self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode()) - - def _perform_scenario_test(self, scenario): - test_operation_id = object() - test_group, test_method = scenario.group_and_method() - test_code = beta_interfaces.StatusCode.OK - test_message = 'a scenario test message' - - service_link = service.service_link( - {(test_group, test_method): scenario.deserialize_request}, - {(test_group, test_method): scenario.serialize_response}) - service_mate = test_utilities.RecordingLink() - service_link.join_link(service_mate) - port = service_link.add_port('[::]:0', None) - service_link.start() - channel = _intermediary_low.Channel('localhost:%d' % port, None) - invocation_link = invocation.invocation_link( - channel, 'localhost', None, - {(test_group, test_method): scenario.serialize_request}, - {(test_group, test_method): scenario.deserialize_response}) - invocation_mate = test_utilities.RecordingLink() - invocation_link.join_link(invocation_mate) - invocation_link.start() - - invocation_ticket = links.Ticket( - test_operation_id, 0, test_group, test_method, - links.Ticket.Subscription.FULL, test_constants.LONG_TIMEOUT, None, None, - None, None, None, None, None, None) - invocation_link.accept_ticket(invocation_ticket) - requests = scenario.requests() - for request_index, request in enumerate(requests): - request_ticket = links.Ticket( - test_operation_id, 1 + request_index, None, None, None, None, 1, None, - request, None, None, None, None, None) - invocation_link.accept_ticket(request_ticket) - service_mate.block_until_tickets_satisfy( - test_cases.at_least_n_payloads_received_predicate(1 + request_index)) - response_ticket = links.Ticket( - service_mate.tickets()[0].operation_id, request_index, None, None, - None, None, 1, None, scenario.response_for_request(request), None, - None, None, None, None) - service_link.accept_ticket(response_ticket) - invocation_mate.block_until_tickets_satisfy( - test_cases.at_least_n_payloads_received_predicate(1 + request_index)) - request_count = len(requests) - invocation_completion_ticket = links.Ticket( - test_operation_id, request_count + 1, None, None, None, None, None, - None, None, None, None, None, links.Ticket.Termination.COMPLETION, - None) - invocation_link.accept_ticket(invocation_completion_ticket) - service_mate.block_until_tickets_satisfy(test_cases.terminated) - service_completion_ticket = links.Ticket( - service_mate.tickets()[0].operation_id, request_count, None, None, None, - None, None, None, None, None, test_code, test_message, - links.Ticket.Termination.COMPLETION, None) - service_link.accept_ticket(service_completion_ticket) - invocation_mate.block_until_tickets_satisfy(test_cases.terminated) - - invocation_link.stop() - service_link.begin_stop() - service_link.end_stop() - - observed_requests = tuple( - ticket.payload for ticket in service_mate.tickets() - if ticket.payload is not None) - observed_responses = tuple( - ticket.payload for ticket in invocation_mate.tickets() - if ticket.payload is not None) - self.assertTrue(scenario.verify_requests(observed_requests)) - self.assertTrue(scenario.verify_responses(observed_responses)) - - def testEmptyScenario(self): - self._perform_scenario_test(_proto_scenarios.EmptyScenario()) - - def testBidirectionallyUnaryScenario(self): - self._perform_scenario_test(_proto_scenarios.BidirectionallyUnaryScenario()) - - def testBidirectionallyStreamingScenario(self): - self._perform_scenario_test( - _proto_scenarios.BidirectionallyStreamingScenario()) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py new file mode 100644 index 0000000000..77b3901261 --- /dev/null +++ b/src/python/grpcio/tests/unit/_metadata_test.py @@ -0,0 +1,216 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests server and client side metadata API.""" + +import unittest +import weakref + +import grpc +from grpc import _grpcio_metadata +from grpc.framework.foundation import logging_pool + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_CHANNEL_ARGS = (('grpc.primary_user_agent', 'primary-agent'), + ('grpc.secondary_user_agent', 'secondary-agent')) + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x00' + +_UNARY_UNARY = b'/test/UnaryUnary' +_UNARY_STREAM = b'/test/UnaryStream' +_STREAM_UNARY = b'/test/StreamUnary' +_STREAM_STREAM = b'/test/StreamStream' + +_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__) + +_CLIENT_METADATA = ( + (b'client-md-key', b'client-md-key'), + (b'client-md-key-bin', b'\x00\x01') +) + +_SERVER_INITIAL_METADATA = ( + (b'server-initial-md-key', b'server-initial-md-value'), + (b'server-initial-md-key-bin', b'\x00\x02') +) + +_SERVER_TRAILING_METADATA = ( + (b'server-trailing-md-key', b'server-trailing-md-value'), + (b'server-trailing-md-key-bin', b'\x00\x03') +) + + +def user_agent(metadata): + for key, val in metadata: + if key == b'user-agent': + return val.decode('ascii') + raise KeyError('No user agent!') + + +def validate_client_metadata(test, servicer_context): + test.assertTrue(test_common.metadata_transmitted( + _CLIENT_METADATA, servicer_context.invocation_metadata())) + test.assertTrue(user_agent(servicer_context.invocation_metadata()) + .startswith('primary-agent ' + _USER_AGENT)) + test.assertTrue(user_agent(servicer_context.invocation_metadata()) + .endswith('secondary-agent')) + + +def handle_unary_unary(test, request, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + return _RESPONSE + + +def handle_unary_stream(test, request, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + for _ in range(test_constants.STREAM_LENGTH): + yield _RESPONSE + + +def handle_stream_unary(test, request_iterator, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(issue:#6891) We should be able to remove this loop + for request in request_iterator: + pass + return _RESPONSE + + +def handle_stream_stream(test, request_iterator, servicer_context): + validate_client_metadata(test, servicer_context) + servicer_context.send_initial_metadata(_SERVER_INITIAL_METADATA) + servicer_context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield + for request in request_iterator: + yield _RESPONSE + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, test, request_streaming, response_streaming): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + self.stream_stream = lambda x, y: handle_stream_stream(test, x, y) + elif self.request_streaming: + self.stream_unary = lambda x, y: handle_stream_unary(test, x, y) + elif self.response_streaming: + self.unary_stream = lambda x, y: handle_unary_stream(test, x, y) + else: + self.unary_unary = lambda x, y: handle_unary_unary(test, x, y) + + +class _GenericHandler(grpc.GenericRpcHandler): + + def __init__(self, test): + self._test = test + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(self._test, False, False) + elif handler_call_details.method == _UNARY_STREAM: + return _MethodHandler(self._test, False, True) + elif handler_call_details.method == _STREAM_UNARY: + return _MethodHandler(self._test, True, False) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(self._test, True, True) + else: + return None + + +class MetadataTest(unittest.TestCase): + + def setUp(self): + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server((_GenericHandler(weakref.proxy(self)),), + self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + self._channel = grpc.insecure_channel('localhost:%d' % port, + options=_CHANNEL_ARGS) + + def tearDown(self): + self._server.stop(0) + + def testUnaryUnary(self): + multi_callable = self._channel.unary_unary(_UNARY_UNARY) + unused_response, call = multi_callable( + _REQUEST, metadata=_CLIENT_METADATA, with_call=True) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testUnaryStream(self): + multi_callable = self._channel.unary_stream(_UNARY_STREAM) + call = multi_callable(_REQUEST, metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + for _ in call: + pass + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testStreamUnary(self): + multi_callable = self._channel.stream_unary(_STREAM_UNARY) + unused_response, call = multi_callable( + [_REQUEST] * test_constants.STREAM_LENGTH, + metadata=_CLIENT_METADATA, with_call=True) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + def testStreamStream(self): + multi_callable = self._channel.stream_stream(_STREAM_STREAM) + call = multi_callable([_REQUEST] * test_constants.STREAM_LENGTH, + metadata=_CLIENT_METADATA) + self.assertTrue(test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + for _ in call: + pass + self.assertTrue(test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py index b33bff490c..8407593c86 100644 --- a/src/python/grpcio/tests/unit/_rpc_test.py +++ b/src/python/grpcio/tests/unit/_rpc_test.py @@ -29,8 +29,6 @@ """Test of gRPC Python's application-layer API.""" -from __future__ import division - import itertools import threading import unittest @@ -193,13 +191,6 @@ class RPCTest(unittest.TestCase): self._channel = grpc.insecure_channel('localhost:%d' % port) - # TODO(nathaniel): Why is this necessary, and only in some development - # environments? - def tearDown(self): - del self._channel - del self._server - del self._server_pool - def testUnrecognizedMethod(self): request = b'abc' diff --git a/src/python/grpcio/tests/unit/_thread_cleanup_test.py b/src/python/grpcio/tests/unit/_thread_cleanup_test.py new file mode 100644 index 0000000000..3e4f317edc --- /dev/null +++ b/src/python/grpcio/tests/unit/_thread_cleanup_test.py @@ -0,0 +1,117 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests for CleanupThread.""" + +import threading +import time +import unittest + +from grpc import _common + +_SHORT_TIME = 0.5 +_LONG_TIME = 2.0 +_EPSILON = 0.1 + + +def cleanup(timeout): + if timeout is not None: + time.sleep(timeout) + else: + time.sleep(_LONG_TIME) + + +def slow_cleanup(timeout): + # Don't respect timeout + time.sleep(_LONG_TIME) + + +class CleanupThreadTest(unittest.TestCase): + + def testTargetInvocation(self): + event = threading.Event() + def target(arg1, arg2, arg3=None): + self.assertEqual('arg1', arg1) + self.assertEqual('arg2', arg2) + self.assertEqual('arg3', arg3) + event.set() + + cleanup_thread = _common.CleanupThread(behavior=lambda x: None, + target=target, name='test-name', + args=('arg1', 'arg2'), kwargs={'arg3': 'arg3'}) + cleanup_thread.start() + cleanup_thread.join() + self.assertEqual(cleanup_thread.name, 'test-name') + self.assertTrue(event.is_set()) + + def testJoinNoTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join() + end_time = time.time() + self.assertAlmostEqual(_LONG_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_SHORT_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeoutSlowBehavior(self): + cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_LONG_TIME, end_time - start_time, delta=_EPSILON) + + def testJoinTimeoutSlowTarget(self): + event = threading.Event() + def target(): + event.wait(_LONG_TIME) + cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(_SHORT_TIME) + end_time = time.time() + self.assertAlmostEqual(_SHORT_TIME, end_time - start_time, delta=_EPSILON) + event.set() + + def testJoinZeroTimeout(self): + cleanup_thread = _common.CleanupThread(behavior=cleanup) + cleanup_thread.start() + start_time = time.time() + cleanup_thread.join(0) + end_time = time.time() + self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py index 5dc8720639..488f7d7141 100644 --- a/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py +++ b/src/python/grpcio/tests/unit/beta/_connectivity_channel_test.py @@ -187,5 +187,15 @@ class ChannelConnectivityTest(unittest.TestCase): server_completion_queue_thread.join() +class ConnectivityStatesTest(unittest.TestCase): + + def testBetaConnectivityStates(self): + self.assertIsNotNone(interfaces.ChannelConnectivity.IDLE) + self.assertIsNotNone(interfaces.ChannelConnectivity.CONNECTING) + self.assertIsNotNone(interfaces.ChannelConnectivity.READY) + self.assertIsNotNone(interfaces.ChannelConnectivity.TRANSIENT_FAILURE) + self.assertIsNotNone(interfaces.ChannelConnectivity.FATAL_FAILURE) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/beta/test_utilities.py b/src/python/grpcio/tests/unit/beta/test_utilities.py index 66b5f72897..8ccad04e05 100644 --- a/src/python/grpcio/tests/unit/beta/test_utilities.py +++ b/src/python/grpcio/tests/unit/beta/test_utilities.py @@ -50,6 +50,6 @@ def not_really_secure_channel( """ target = '%s:%d' % (host, port) channel = grpc.secure_channel( - target, ((b'grpc.ssl_target_name_override', server_host_override,),), - channel_credentials._credentials) + target, channel_credentials, + ((b'grpc.ssl_target_name_override', server_host_override,),)) return implementations.Channel(channel) diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py deleted file mode 100644 index 43457be362..0000000000 --- a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests Face interface compliance of the crust-over-core stack.""" - -import collections -import unittest - -import six - -from grpc.framework.core import implementations as core_implementations -from grpc.framework.crust import implementations as crust_implementations -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.links import utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces -from tests.unit.framework.interfaces.links import test_utilities - - -class _Implementation(test_interfaces.Implementation): - - def instantiate( - self, methods, method_implementations, multi_method_implementation): - pool = logging_pool.pool(test_constants.POOL_SIZE) - servicer = crust_implementations.servicer( - method_implementations, multi_method_implementation, pool) - - service_end_link = core_implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - invocation_end_link = core_implementations.invocation_end_link() - invocation_end_link.join_link(service_end_link) - service_end_link.join_link(invocation_end_link) - service_end_link.start() - invocation_end_link.start() - - generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - group = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods)} - dynamic_stub = crust_implementations.dynamic_stub( - invocation_end_link, group, cardinalities, pool) - - return generic_stub, {group: dynamic_stub}, ( - invocation_end_link, service_end_link, pool) - - def destantiate(self, memo): - invocation_end_link, service_end_link, pool = memo - invocation_end_link.stop(0).wait() - service_end_link.stop(0).wait() - invocation_end_link.join_link(utilities.NULL_LINK) - service_end_link.join_link(utilities.NULL_LINK) - pool.shutdown(wait=True) - - def invocation_metadata(self): - return object() - - def initial_metadata(self): - return object() - - def terminal_metadata(self): - return object() - - def code(self): - return object() - - def details(self): - return object() - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is transmitted_metadata - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py deleted file mode 100644 index 1310292306..0000000000 --- a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests the RPC Framework Core's implementation of the Base interface.""" - -import logging -import random -import time -import unittest - -from grpc.framework.core import implementations -from grpc.framework.interfaces.base import utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.base import test_cases -from tests.unit.framework.interfaces.base import test_interfaces - - -class _Implementation(test_interfaces.Implementation): - - def __init__(self): - self._invocation_initial_metadata = object() - self._service_initial_metadata = object() - self._invocation_terminal_metadata = object() - self._service_terminal_metadata = object() - - def instantiate(self, serializations, servicer): - invocation = implementations.invocation_end_link() - service = implementations.service_end_link( - servicer, test_constants.DEFAULT_TIMEOUT, - test_constants.MAXIMUM_TIMEOUT) - invocation.join_link(service) - service.join_link(invocation) - return invocation, service, None - - def destantiate(self, memo): - pass - - def invocation_initial_metadata(self): - return self._invocation_initial_metadata - - def service_initial_metadata(self): - return self._service_initial_metadata - - def invocation_completion(self): - return utilities.completion(self._invocation_terminal_metadata, None, None) - - def service_completion(self): - return utilities.completion(self._service_terminal_metadata, None, None) - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return transmitted_metadata is original_metadata - - def completion_transmitted(self, original_completion, transmitted_completion): - return ( - (original_completion.terminal_metadata is - transmitted_completion.terminal_metadata) and - original_completion.code is transmitted_completion.code and - original_completion.message is transmitted_completion.message - ) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py deleted file mode 100644 index 6c2459e185..0000000000 --- a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Tests of the later module.""" - -import threading -import time -import unittest - -from grpc.framework.foundation import later - -TICK = 0.1 - - -class LaterTest(unittest.TestCase): - - def test_simple_delay(self): - lock = threading.Lock() - cell = [0] - return_value = object() - - def computation(): - with lock: - cell[0] += 1 - return return_value - computation_future = later.later(TICK * 2, computation) - - self.assertFalse(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - time.sleep(TICK) - self.assertFalse(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - with lock: - self.assertEqual(0, cell[0]) - time.sleep(TICK * 2) - self.assertTrue(computation_future.done()) - self.assertFalse(computation_future.cancelled()) - with lock: - self.assertEqual(1, cell[0]) - self.assertEqual(return_value, computation_future.result()) - - def test_callback(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback = [None] - def computation(): - with lock: - cell[0] += 1 - computation_future = later.later(TICK * 2, computation) - def callback(outcome): - with lock: - callback_called[0] = True - future_passed_to_callback[0] = outcome - computation_future.add_done_callback(callback) - time.sleep(TICK) - with lock: - self.assertFalse(callback_called[0]) - time.sleep(TICK * 2) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].done()) - - callback_called[0] = False - future_passed_to_callback[0] = None - - computation_future.add_done_callback(callback) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].done()) - - def test_cancel(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback = [None] - def computation(): - with lock: - cell[0] += 1 - computation_future = later.later(TICK * 2, computation) - def callback(outcome): - with lock: - callback_called[0] = True - future_passed_to_callback[0] = outcome - computation_future.add_done_callback(callback) - time.sleep(TICK) - with lock: - self.assertFalse(callback_called[0]) - computation_future.cancel() - self.assertTrue(computation_future.cancelled()) - self.assertFalse(computation_future.running()) - self.assertTrue(computation_future.done()) - with lock: - self.assertTrue(callback_called[0]) - self.assertTrue(future_passed_to_callback[0].cancelled()) - - def test_result(self): - lock = threading.Lock() - cell = [0] - callback_called = [False] - future_passed_to_callback_cell = [None] - return_value = object() - - def computation(): - with lock: - cell[0] += 1 - return return_value - computation_future = later.later(TICK * 2, computation) - - def callback(future_passed_to_callback): - with lock: - callback_called[0] = True - future_passed_to_callback_cell[0] = future_passed_to_callback - computation_future.add_done_callback(callback) - returned_value = computation_future.result() - self.assertEqual(return_value, returned_value) - - # The callback may not yet have been called! Sleep a tick. - time.sleep(TICK) - with lock: - self.assertTrue(callback_called[0]) - self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) - -if __name__ == '__main__': - unittest.main(verbosity=2) |