diff options
author | ncteisen <ncteisen@gmail.com> | 2018-01-23 08:58:11 -0800 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-01-23 08:58:11 -0800 |
commit | bd0c019c39afde113f5f66b27b1e6055f2635725 (patch) | |
tree | 5dfa40a2e3158b5651bf42134077d52b6fb7fea2 /src/python | |
parent | 90a00f8db60e5a0bbcdf1f0111b7f3ff60579016 (diff) | |
parent | ac0808b107d73613191b66617a547a201871a845 (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into channel-tracing
Diffstat (limited to 'src/python')
28 files changed, 794 insertions, 798 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index db410d307b..79793a710e 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -35,152 +35,152 @@ class FutureCancelledError(Exception): class Future(six.with_metaclass(abc.ABCMeta)): """A representation of a computation in another control flow. - Computations represented by a Future may be yet to be begun, may be ongoing, - or may have already completed. - """ + Computations represented by a Future may be yet to be begun, + may be ongoing, or may have already completed. + """ @abc.abstractmethod def cancel(self): """Attempts to cancel the computation. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation was canceled. - Returns False under all other circumstances, for example: - 1. computation has begun and could not be canceled. - 2. computation has finished - 3. computation is scheduled for execution and it is impossible to - determine its state without blocking. - """ + Returns: + bool: + Returns True if the computation was canceled. + Returns False under all other circumstances, for example: + 1. computation has begun and could not be canceled. + 2. computation has finished + 3. computation is scheduled for execution and it is impossible + to determine its state without blocking. + """ raise NotImplementedError() @abc.abstractmethod def cancelled(self): """Describes whether the computation was cancelled. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation was cancelled before its result became - available. - False under all other circumstances, for example: - 1. computation was not cancelled. - 2. computation's result is available. - """ + Returns: + bool: + Returns True if the computation was cancelled before its result became + available. + False under all other circumstances, for example: + 1. computation was not cancelled. + 2. computation's result is available. + """ raise NotImplementedError() @abc.abstractmethod def running(self): """Describes whether the computation is taking place. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation is scheduled for execution or currently - executing. - Returns False if the computation already executed or was cancelled. - """ + Returns: + bool: + Returns True if the computation is scheduled for execution or + currently executing. + Returns False if the computation already executed or was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def done(self): """Describes whether the computation has taken place. - This method does not block. + This method does not block. - Returns: - bool: - Returns True if the computation already executed or was cancelled. - Returns False if the computation is scheduled for execution or currently - executing. - This is exactly opposite of the running() method's result. - """ + Returns: + bool: + Returns True if the computation already executed or was cancelled. + Returns False if the computation is scheduled for execution or + currently executing. + This is exactly opposite of the running() method's result. + """ raise NotImplementedError() @abc.abstractmethod def result(self, timeout=None): """Returns the result of the computation or raises its exception. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - finish or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled. If None, the call will block until the + computations's termination. - Returns: - The return value of the computation. + Returns: + The return value of the computation. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - Exception: If the computation raised an exception, this call will raise - the same exception. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will + raise the same exception. + """ raise NotImplementedError() @abc.abstractmethod def exception(self, timeout=None): """Return the exception raised by the computation. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled. If None, the call will block until the + computations's termination. - Returns: - The exception raised by the computation, or None if the computation did - not raise an exception. + Returns: + The exception raised by the computation, or None if the computation + did not raise an exception. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def traceback(self, timeout=None): """Access the traceback of the exception raised by the computation. - This method may return immediately or may block. + This method may return immediately or may block. - Args: - timeout: The length of time in seconds to wait for the computation to - terminate or be cancelled. If None, the call will block until the - computations's termination. + Args: + timeout: The length of time in seconds to wait for the computation + to terminate or be cancelled. If None, the call will block until + the computation's termination. - Returns: - The traceback of the exception raised by the computation, or None if the - computation did not raise an exception. + Returns: + The traceback of the exception raised by the computation, or None + if the computation did not raise an exception. - Raises: - FutureTimeoutError: If a timeout value is passed and the computation does - not terminate within the allotted time. - FutureCancelledError: If the computation was cancelled. - """ + Raises: + FutureTimeoutError: If a timeout value is passed and the computation + does not terminate within the allotted time. + FutureCancelledError: If the computation was cancelled. + """ raise NotImplementedError() @abc.abstractmethod def add_done_callback(self, fn): """Adds a function to be called at completion of the computation. - The callback will be passed this Future object describing the outcome of - the computation. + The callback will be passed this Future object describing the outcome + of the computation. - If the computation has already completed, the callback will be called - immediately. + If the computation has already completed, the callback will be called + immediately. - Args: - fn: A callable taking this Future object as its single parameter. - """ + Args: + fn: A callable taking this Future object as its single parameter. + """ raise NotImplementedError() @@ -191,14 +191,14 @@ class Future(six.with_metaclass(abc.ABCMeta)): class ChannelConnectivity(enum.Enum): """Mirrors grpc_connectivity_state in the gRPC Core. - Attributes: - IDLE: The channel is idle. - CONNECTING: The channel is connecting. - READY: The channel is ready to conduct RPCs. - TRANSIENT_FAILURE: The channel has seen a failure from which it expects to - recover. - SHUTDOWN: The channel has seen a failure from which it cannot recover. - """ + Attributes: + IDLE: The channel is idle. + CONNECTING: The channel is connecting. + READY: The channel is ready to conduct RPCs. + TRANSIENT_FAILURE: The channel has seen a failure from which it expects + to recover. + SHUTDOWN: The channel has seen a failure from which it cannot recover. + """ IDLE = (_cygrpc.ConnectivityState.idle, 'idle') CONNECTING = (_cygrpc.ConnectivityState.connecting, 'connecting') READY = (_cygrpc.ConnectivityState.ready, 'ready') @@ -250,44 +250,44 @@ class RpcContext(six.with_metaclass(abc.ABCMeta)): def is_active(self): """Describes whether the RPC is active or has terminated. - Returns: - bool: - True if RPC is active, False otherwise. - """ + Returns: + bool: + True if RPC is active, False otherwise. + """ raise NotImplementedError() @abc.abstractmethod def time_remaining(self): """Describes the length of allowed time remaining for the RPC. - Returns: - A nonnegative float indicating the length of allowed time in seconds - remaining for the RPC to complete before it is considered to have timed - out, or None if no deadline was specified for the RPC. - """ + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have + timed out, or None if no deadline was specified for the RPC. + """ raise NotImplementedError() @abc.abstractmethod def cancel(self): """Cancels the RPC. - Idempotent and has no effect if the RPC has already terminated. - """ + Idempotent and has no effect if the RPC has already terminated. + """ raise NotImplementedError() @abc.abstractmethod def add_callback(self, callback): """Registers a callback to be called on RPC termination. - Args: - callback: A no-parameter callable to be called on RPC termination. + Args: + callback: A no-parameter callable to be called on RPC termination. - Returns: - bool: - True if the callback was added and will be called later; False if the - callback was not added and will not be called (because the RPC - already terminated or some other reason). - """ + Returns: + bool: + True if the callback was added and will be called later; False if + the callback was not added and will not be called (because the RPC + already terminated or some other reason). + """ raise NotImplementedError() @@ -301,44 +301,44 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): def initial_metadata(self): """Accesses the initial metadata sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The initial :term:`metadata`. - """ + Returns: + The initial :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def trailing_metadata(self): """Accesses the trailing metadata sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The trailing :term:`metadata`. - """ + Returns: + The trailing :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def code(self): """Accesses the status code sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The StatusCode value for the RPC. - """ + Returns: + The StatusCode value for the RPC. + """ raise NotImplementedError() @abc.abstractmethod def details(self): """Accesses the details sent by the server. - This method blocks until the value is available. + This method blocks until the value is available. - Returns: - The details string of the RPC. - """ + Returns: + The details string of the RPC. + """ raise NotImplementedError() @@ -578,9 +578,9 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)): class ServerCredentials(object): """An encapsulation of the data required to open a secure port on a Server. - This class has no supported interface - it exists to define the type of its - instances and its instances exist to be passed to other functions. - """ + This class has no supported interface - it exists to define the type of its + instances and its instances exist to be passed to other functions. + """ def __init__(self, credentials): self._credentials = credentials @@ -611,61 +611,65 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): def __call__(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow + for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC. + Returns: + The response value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod def with_call(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional durating of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional durating of time in seconds to allow for + the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC and a Call value for the RPC. + Returns: + The response value for the RPC and a Call value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod def future(self, request, timeout=None, metadata=None, credentials=None): """Asynchronously invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and a Future. In the event of - RPC completion, the return Call-Future's result value will be the - response message of the RPC. Should the event terminate with non-OK - status, the returned Call-Future's exception value will be an RpcError. - """ + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's result + value will be the response message of the RPC. + Should the event terminate with non-OK status, + the returned Call-Future's exception value will be an RpcError. + """ raise NotImplementedError() @@ -676,19 +680,20 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): def __call__(self, request, timeout=None, metadata=None, credentials=None): """Invokes the underlying RPC. - Args: - request: The request value for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: An optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request: The request value for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: An optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and an iterator of response - values. Drawing response values from the returned Call-iterator may - raise RpcError indicating termination of the RPC with non-OK status. - """ + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of the + RPC with non-OK status. + """ raise NotImplementedError() @@ -703,22 +708,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Synchronously invokes the underlying RPC. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC. + Returns: + The response value for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also implement grpc.Call, affording methods - such as metadata, code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also implement grpc.Call, affording methods + such as metadata, code, and details. + """ raise NotImplementedError() @abc.abstractmethod @@ -729,22 +735,23 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Synchronously invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for + the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - The response value for the RPC and a Call object for the RPC. + Returns: + The response value for the RPC and a Call object for the RPC. - Raises: - RpcError: Indicating that the RPC terminated with non-OK status. The - raised RpcError will also be a Call for the RPC affording the RPC's - metadata, status code, and details. - """ + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ raise NotImplementedError() @abc.abstractmethod @@ -755,20 +762,21 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Asynchronously invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - If None, the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If None, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and a Future. In the event of - RPC completion, the return Call-Future's result value will be the - response message of the RPC. Should the event terminate with non-OK - status, the returned Call-Future's exception value will be an RpcError. - """ + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's result value + will be the response message of the RPC. Should the event terminate + with non-OK status, the returned Call-Future's exception value will + be an RpcError. + """ raise NotImplementedError() @@ -783,19 +791,20 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): credentials=None): """Invokes the underlying RPC on the client. - Args: - request_iterator: An iterator that yields request values for the RPC. - timeout: An optional duration of time in seconds to allow for the RPC. - if not specified the timeout is considered infinite. - metadata: Optional :term:`metadata` to be transmitted to the - service-side of the RPC. - credentials: An optional CallCredentials for the RPC. + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for + the RPC. If not specified, the timeout is considered infinite. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. - Returns: - An object that is both a Call for the RPC and an iterator of response - values. Drawing response values from the returned Call-iterator may - raise RpcError indicating termination of the RPC with non-OK status. - """ + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of the + RPC with non-OK status. + """ raise NotImplementedError() @@ -809,31 +818,31 @@ class Channel(six.with_metaclass(abc.ABCMeta)): def subscribe(self, callback, try_to_connect=False): """Subscribe to this Channel's connectivity state machine. - A Channel may be in any of the states described by ChannelConnectivity. - This method allows application to monitor the state transitions. - The typical use case is to debug or gain better visibility into gRPC - runtime's state. + A Channel may be in any of the states described by ChannelConnectivity. + This method allows application to monitor the state transitions. + The typical use case is to debug or gain better visibility into gRPC + runtime's state. - Args: - callback: A callable to be invoked with ChannelConnectivity argument. - ChannelConnectivity describes current state of the channel. - The callable will be invoked immediately upon subscription and again for - every change to ChannelConnectivity until it is unsubscribed or this - Channel object goes out of scope. - try_to_connect: A boolean indicating whether or not this Channel should - attempt to connect immediately. If set to False, gRPC runtime decides - when to connect. - """ + Args: + callback: A callable to be invoked with ChannelConnectivity argument. + ChannelConnectivity describes current state of the channel. + The callable will be invoked immediately upon subscription + and again for every change to ChannelConnectivity until it + is unsubscribed or this Channel object goes out of scope. + try_to_connect: A boolean indicating whether or not this Channel + should attempt to connect immediately. If set to False, gRPC + runtime decides when to connect. + """ raise NotImplementedError() @abc.abstractmethod def unsubscribe(self, callback): """Unsubscribes a subscribed callback from this Channel's connectivity. - Args: - callback: A callable previously registered with this Channel from having - been passed to its "subscribe" method. - """ + Args: + callback: A callable previously registered with this Channel from + having been passed to its "subscribe" method. + """ raise NotImplementedError() @abc.abstractmethod @@ -843,16 +852,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a UnaryUnaryMultiCallable for a unary-unary method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. - Returns: - A UnaryUnaryMultiCallable value for the named unary-unary method. - """ + Returns: + A UnaryUnaryMultiCallable value for the named unary-unary method. + """ raise NotImplementedError() @abc.abstractmethod @@ -862,16 +872,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a UnaryStreamMultiCallable for a unary-stream method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. - Returns: - A UnaryStreamMultiCallable value for the name unary-stream method. - """ + Returns: + A UnaryStreamMultiCallable value for the name unary-stream method. + """ raise NotImplementedError() @abc.abstractmethod @@ -881,16 +892,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a StreamUnaryMultiCallable for a stream-unary method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None is + passed. - Returns: - A StreamUnaryMultiCallable value for the named stream-unary method. - """ + Returns: + A StreamUnaryMultiCallable value for the named stream-unary method. + """ raise NotImplementedError() @abc.abstractmethod @@ -900,16 +912,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): response_deserializer=None): """Creates a StreamStreamMultiCallable for a stream-stream method. - Args: - method: The name of the RPC method. - request_serializer: Optional behaviour for serializing the request - message. Request goes unserialized in case None is passed. - response_deserializer: Optional behaviour for deserializing the response - message. Response goes undeserialized in case None is passed. + Args: + method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the + response message. Response goes undeserialized in case None + is passed. - Returns: - A StreamStreamMultiCallable value for the named stream-stream method. - """ + Returns: + A StreamStreamMultiCallable value for the named stream-stream method. + """ raise NotImplementedError() @@ -923,79 +936,79 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): def invocation_metadata(self): """Accesses the metadata from the sent by the client. - Returns: - The invocation :term:`metadata`. - """ + Returns: + The invocation :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def peer(self): """Identifies the peer that invoked the RPC being serviced. - Returns: - A string identifying the peer that invoked the RPC being serviced. - The string format is determined by gRPC runtime. - """ + Returns: + A string identifying the peer that invoked the RPC being serviced. + The string format is determined by gRPC runtime. + """ raise NotImplementedError() @abc.abstractmethod def peer_identities(self): """Gets one or more peer identity(s). - Equivalent to - servicer_context.auth_context().get( - servicer_context.peer_identity_key()) + Equivalent to + servicer_context.auth_context().get( + servicer_context.peer_identity_key()) - Returns: - An iterable of the identities, or None if the call is not authenticated. - Each identity is returned as a raw bytes type. - """ + Returns: + An iterable of the identities, or None if the call is not + authenticated. Each identity is returned as a raw bytes type. + """ raise NotImplementedError() @abc.abstractmethod def peer_identity_key(self): """The auth property used to identify the peer. - For example, "x509_common_name" or "x509_subject_alternative_name" are - used to identify an SSL peer. + For example, "x509_common_name" or "x509_subject_alternative_name" are + used to identify an SSL peer. - Returns: - The auth property (string) that indicates the - peer identity, or None if the call is not authenticated. - """ + Returns: + The auth property (string) that indicates the + peer identity, or None if the call is not authenticated. + """ raise NotImplementedError() @abc.abstractmethod def auth_context(self): """Gets the auth context for the call. - Returns: - A map of strings to an iterable of bytes for each auth property. - """ + Returns: + A map of strings to an iterable of bytes for each auth property. + """ raise NotImplementedError() @abc.abstractmethod def send_initial_metadata(self, initial_metadata): """Sends the initial metadata value to the client. - This method need not be called by implementations if they have no - metadata to add to what the gRPC runtime will transmit. + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. - Args: - initial_metadata: The initial :term:`metadata`. - """ + Args: + initial_metadata: The initial :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod def set_trailing_metadata(self, trailing_metadata): """Sends the trailing metadata for the RPC. - This method need not be called by implementations if they have no - metadata to add to what the gRPC runtime will transmit. + This method need not be called by implementations if they have no + metadata to add to what the gRPC runtime will transmit. - Args: - trailing_metadata: The trailing :term:`metadata`. - """ + Args: + trailing_metadata: The trailing :term:`metadata`. + """ raise NotImplementedError() @abc.abstractmethod @@ -1049,44 +1062,45 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): class RpcMethodHandler(six.with_metaclass(abc.ABCMeta)): """An implementation of a single RPC method. - Attributes: - request_streaming: Whether the RPC supports exactly one request message or - any arbitrary number of request messages. - response_streaming: Whether the RPC supports exactly one response message or - any arbitrary number of response messages. - request_deserializer: A callable behavior that accepts a byte string and - returns an object suitable to be passed to this object's business logic, - or None to indicate that this object's business logic should be passed the - raw request bytes. - response_serializer: A callable behavior that accepts an object produced by - this object's business logic and returns a byte string, or None to - indicate that the byte strings produced by this object's business logic - should be transmitted on the wire as they are. - unary_unary: This object's application-specific business logic as a callable - value that takes a request value and a ServicerContext object and returns - a response value. Only non-None if both request_streaming and - response_streaming are False. - unary_stream: This object's application-specific business logic as a - callable value that takes a request value and a ServicerContext object and - returns an iterator of response values. Only non-None if request_streaming - is False and response_streaming is True. - stream_unary: This object's application-specific business logic as a - callable value that takes an iterator of request values and a - ServicerContext object and returns a response value. Only non-None if - request_streaming is True and response_streaming is False. - stream_stream: This object's application-specific business logic as a - callable value that takes an iterator of request values and a - ServicerContext object and returns an iterator of response values. Only - non-None if request_streaming and response_streaming are both True. - """ + Attributes: + request_streaming: Whether the RPC supports exactly one request message + or any arbitrary number of request messages. + response_streaming: Whether the RPC supports exactly one response message + or any arbitrary number of response messages. + request_deserializer: A callable behavior that accepts a byte string and + returns an object suitable to be passed to this object's business + logic, or None to indicate that this object's business logic should be + passed the raw request bytes. + response_serializer: A callable behavior that accepts an object produced + by this object's business logic and returns a byte string, or None to + indicate that the byte strings produced by this object's business logic + should be transmitted on the wire as they are. + unary_unary: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns a response value. Only non-None if both request_streaming + and response_streaming are False. + unary_stream: This object's application-specific business logic as a + callable value that takes a request value and a ServicerContext object + and returns an iterator of response values. Only non-None if + request_streaming is False and response_streaming is True. + stream_unary: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns a response value. Only non-None if + request_streaming is True and response_streaming is False. + stream_stream: This object's application-specific business logic as a + callable value that takes an iterator of request values and a + ServicerContext object and returns an iterator of response values. + Only non-None if request_streaming and response_streaming are both + True. + """ class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)): """Describes an RPC that has just arrived for service. - Attributes: - method: The method name of the RPC. - invocation_metadata: The :term:`metadata` sent by the client. - """ + Attributes: + method: The method name of the RPC. + invocation_metadata: The :term:`metadata` sent by the client. + """ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): @@ -1096,33 +1110,33 @@ class GenericRpcHandler(six.with_metaclass(abc.ABCMeta)): def service(self, handler_call_details): """Returns the handler for servicing the RPC. - Args: - handler_call_details: A HandlerCallDetails describing the RPC. + Args: + handler_call_details: A HandlerCallDetails describing the RPC. - Returns: - An RpcMethodHandler with which the RPC may be serviced if the - implementation chooses to service this RPC, or None otherwise. - """ + Returns: + An RpcMethodHandler with which the RPC may be serviced if the + implementation chooses to service this RPC, or None otherwise. + """ raise NotImplementedError() class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)): """An implementation of RPC methods belonging to a service. - A service handles RPC methods with structured names of the form - '/Service.Name/Service.Method', where 'Service.Name' is the value - returned by service_name(), and 'Service.Method' is the method - name. A service can have multiple method names, but only a single - service name. - """ + A service handles RPC methods with structured names of the form + '/Service.Name/Service.Method', where 'Service.Name' is the value + returned by service_name(), and 'Service.Method' is the method + name. A service can have multiple method names, but only a single + service name. + """ @abc.abstractmethod def service_name(self): """Returns this service's name. - Returns: - The service name. - """ + Returns: + The service name. + """ raise NotImplementedError() @@ -1164,83 +1178,84 @@ class Server(six.with_metaclass(abc.ABCMeta)): def add_generic_rpc_handlers(self, generic_rpc_handlers): """Registers GenericRpcHandlers with this Server. - This method is only safe to call before the server is started. + This method is only safe to call before the server is started. - Args: - generic_rpc_handlers: An iterable of GenericRpcHandlers that will be used - to service RPCs. - """ + Args: + generic_rpc_handlers: An iterable of GenericRpcHandlers that will be + used to service RPCs. + """ raise NotImplementedError() @abc.abstractmethod def add_insecure_port(self, address): """Opens an insecure port for accepting RPCs. - This method may only be called before starting the server. + This method may only be called before starting the server. - Args: - address: The address for which to open a port. - if the port is 0, or not specified in the address, then gRPC runtime - will choose a port. + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC runtime + will choose a port. - Returns: - integer: - An integer port on which server will accept RPC requests. - """ + Returns: + integer: + An integer port on which server will accept RPC requests. + """ raise NotImplementedError() @abc.abstractmethod def add_secure_port(self, address, server_credentials): """Opens a secure port for accepting RPCs. - This method may only be called before starting the server. + This method may only be called before starting the server. - Args: - address: The address for which to open a port. - if the port is 0, or not specified in the address, then gRPC runtime - will choose a port. - server_credentials: A ServerCredentials object. + Args: + address: The address for which to open a port. + if the port is 0, or not specified in the address, then gRPC + runtime will choose a port. + server_credentials: A ServerCredentials object. - Returns: - integer: - An integer port on which server will accept RPC requests. - """ + Returns: + integer: + An integer port on which server will accept RPC requests. + """ raise NotImplementedError() @abc.abstractmethod def start(self): """Starts this Server. - This method may only be called once. (i.e. it is not idempotent). - """ + This method may only be called once. (i.e. it is not idempotent). + """ raise NotImplementedError() @abc.abstractmethod def stop(self, grace): """Stops this Server. - This method immediately stop service of new RPCs in all cases. - If a grace period is specified, this method returns immediately - and all RPCs active at the end of the grace period are aborted. + This method immediately stop service of new RPCs in all cases. + If a grace period is specified, this method returns immediately + and all RPCs active at the end of the grace period are aborted. - If a grace period is not specified, then all existing RPCs are - teriminated immediately and the this method blocks until the last - RPC handler terminates. + If a grace period is not specified, then all existing RPCs are + teriminated immediately and the this method blocks until the last + RPC handler terminates. - This method is idempotent and may be called at any time. Passing a smaller - grace value in subsequentcall will have the effect of stopping the Server - sooner. Passing a larger grace value in subsequent call *will not* have the - effect of stopping the server later (i.e. the most restrictive grace - value is used). + This method is idempotent and may be called at any time. + Passing a smaller grace value in subsequent call will have + the effect of stopping the Server sooner. Passing a larger + grace value in subsequent call *will not* have the effect of + stopping the server later (i.e. the most restrictive grace + value is used). - Args: - grace: A duration of time in seconds or None. + Args: + grace: A duration of time in seconds or None. - Returns: - A threading.Event that will be set when this Server has completely - stopped, i.e. when running RPCs either complete or are aborted and - all handlers have terminated. - """ + Returns: + A threading.Event that will be set when this Server has completely + stopped, i.e. when running RPCs either complete or are aborted and + all handlers have terminated. + """ raise NotImplementedError() @@ -1252,15 +1267,15 @@ def unary_unary_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a unary-unary RPC method. - Args: - behavior: The implementation of an RPC that accepts one request and returns - one response. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts one request + and returns one response. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, False, request_deserializer, response_serializer, behavior, None, @@ -1272,15 +1287,15 @@ def unary_stream_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a unary-stream RPC method. - Args: - behavior: The implementation of an RPC that accepts one request and returns - an iterator of response values. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts one request + and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(False, True, request_deserializer, response_serializer, None, behavior, @@ -1292,15 +1307,15 @@ def stream_unary_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a stream-unary RPC method. - Args: - behavior: The implementation of an RPC that accepts an iterator of request - values and returns a single response value. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns a single response value. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, False, request_deserializer, response_serializer, None, None, @@ -1312,15 +1327,15 @@ def stream_stream_rpc_method_handler(behavior, response_serializer=None): """Creates an RpcMethodHandler for a stream-stream RPC method. - Args: - behavior: The implementation of an RPC that accepts an iterator of request - values and returns an iterator of response values. - request_deserializer: An optional behavior for request deserialization. - response_serializer: An optional behavior for response serialization. + Args: + behavior: The implementation of an RPC that accepts an iterator of + request values and returns an iterator of response values. + request_deserializer: An optional behavior for request deserialization. + response_serializer: An optional behavior for response serialization. - Returns: - An RpcMethodHandler object that is typically used by grpc.Server. - """ + Returns: + An RpcMethodHandler object that is typically used by grpc.Server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.RpcMethodHandler(True, True, request_deserializer, response_serializer, None, None, None, @@ -1330,15 +1345,16 @@ def stream_stream_rpc_method_handler(behavior, def method_handlers_generic_handler(service, method_handlers): """Creates a GenericRpcHandler from RpcMethodHandlers. - Args: - service: The name of the service that is implemented by the method_handlers. - method_handlers: A dictionary that maps method names to corresponding - RpcMethodHandler. + Args: + service: The name of the service that is implemented by the + method_handlers. + method_handlers: A dictionary that maps method names to corresponding + RpcMethodHandler. - Returns: - A GenericRpcHandler. This is typically added to the grpc.Server object - with add_generic_rpc_handlers() before starting the server. - """ + Returns: + A GenericRpcHandler. This is typically added to the grpc.Server object + with add_generic_rpc_handlers() before starting the server. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.DictionaryGenericHandler(service, method_handlers) @@ -1435,20 +1451,20 @@ def ssl_server_credentials(private_key_certificate_chain_pairs, require_client_auth=False): """Creates a ServerCredentials for use with an SSL-enabled Server. - Args: - private_key_certificate_chain_pairs: A list of pairs of the form - [PEM-encoded private key, PEM-encoded certificate chain]. - root_certificates: An optional byte string of PEM-encoded client root - certificates that the server will use to verify client authentication. - If omitted, require_client_auth must also be False. - require_client_auth: A boolean indicating whether or not to require - clients to be authenticated. May only be True if root_certificates - is not None. - - Returns: - A ServerCredentials for use with an SSL-enabled Server. Typically, this - object is an argument to add_secure_port() method during server setup. - """ + Args: + private_key_certificate_chain_pairs: A list of pairs of the form + [PEM-encoded private key, PEM-encoded certificate chain]. + root_certificates: An optional byte string of PEM-encoded client root + certificates that the server will use to verify client authentication. + If omitted, require_client_auth must also be False. + require_client_auth: A boolean indicating whether or not to require + clients to be authenticated. May only be True if root_certificates + is not None. + + Returns: + A ServerCredentials for use with an SSL-enabled Server. Typically, this + object is an argument to add_secure_port() method during server setup. + """ if len(private_key_certificate_chain_pairs) == 0: raise ValueError( 'At least one private key-certificate chain pair is required!') @@ -1522,16 +1538,16 @@ def dynamic_ssl_server_credentials(initial_certificate_configuration, def channel_ready_future(channel): """Creates a Future that tracks when a Channel is ready. - Cancelling the Future does not affect the channel's state machine. - It merely decouples the Future from channel state machine. + Cancelling the Future does not affect the channel's state machine. + It merely decouples the Future from channel state machine. - Args: - channel: A Channel object. + Args: + channel: A Channel object. - Returns: - A Future object that matures when the channel connectivity is - ChannelConnectivity.READY. - """ + Returns: + A Future object that matures when the channel connectivity is + ChannelConnectivity.READY. + """ from grpc import _utilities # pylint: disable=cyclic-import return _utilities.channel_ready_future(channel) @@ -1539,14 +1555,14 @@ def channel_ready_future(channel): def insecure_channel(target, options=None): """Creates an insecure Channel to a server. - Args: - target: The server address - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. + Args: + target: The server address + options: An optional list of key-value pairs (channel args + in gRPC Core runtime) to configure the channel. - Returns: - A Channel object. - """ + Returns: + A Channel object. + """ from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, None) @@ -1554,15 +1570,15 @@ def insecure_channel(target, options=None): def secure_channel(target, credentials, options=None): """Creates a secure Channel to a server. - Args: - target: The server address. - credentials: A ChannelCredentials instance. - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. + Args: + target: The server address. + credentials: A ChannelCredentials instance. + options: An optional list of key-value pairs (channel args + in gRPC Core runtime) to configure the channel. - Returns: - A Channel object. - """ + Returns: + A Channel object. + """ from grpc import _channel # pylint: disable=cyclic-import return _channel.Channel(target, () if options is None else options, credentials._credentials) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 24be042f61..bfc7208310 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -27,7 +27,6 @@ from grpc.framework.foundation import callable_util _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) _EMPTY_FLAGS = 0 -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _UNARY_UNARY_INITIAL_DUE = ( cygrpc.OperationType.send_initial_metadata, @@ -61,11 +60,7 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( def _deadline(timeout): - if timeout is None: - return None, _INFINITE_FUTURE - else: - deadline = time.time() + timeout - return deadline, cygrpc.Timespec(deadline) + return None if timeout is None else time.time() + timeout def _unknown_code_details(unknown_cygrpc_code, details): @@ -420,15 +415,15 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def _start_unary_request(request, timeout, request_serializer): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) serialized_request = _common.serialize(request, request_serializer) if serialized_request is None: state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) - return deadline, deadline_timespec, None, rendezvous + return deadline, None, rendezvous else: - return deadline, deadline_timespec, serialized_request, None + return deadline, serialized_request, None def _end_unary_response_blocking(state, call, with_call, deadline): @@ -453,10 +448,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._response_deserializer = response_deserializer def _prepare(self, request, timeout, metadata): - deadline, deadline_timespec, serialized_request, rendezvous = ( - _start_unary_request(request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = (_start_unary_request( + request, timeout, self._request_serializer)) if serialized_request is None: - return None, None, None, None, rendezvous + return None, None, None, rendezvous else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( @@ -467,18 +462,17 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ) - return state, operations, deadline, deadline_timespec, None + return state, operations, deadline, None def _blocking(self, request, timeout, metadata, credentials): - state, operations, deadline, deadline_timespec, rendezvous = self._prepare( + state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: raise rendezvous else: completion_queue = cygrpc.CompletionQueue() call = self._channel.create_call(None, 0, completion_queue, - self._method, None, - deadline_timespec) + self._method, None, deadline) if credentials is not None: call.set_credentials(credentials._credentials) call_error = call.start_client_batch(operations, None) @@ -498,13 +492,13 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): return _end_unary_response_blocking(state, call, True, deadline) def future(self, request, timeout=None, metadata=None, credentials=None): - state, operations, deadline, deadline_timespec, rendezvous = self._prepare( + state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: return rendezvous else: call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, @@ -530,14 +524,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer = response_deserializer def __call__(self, request, timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec, serialized_request, rendezvous = ( - _start_unary_request(request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = (_start_unary_request( + request, timeout, self._request_serializer)) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, @@ -573,11 +567,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._response_deserializer = response_deserializer def _blocking(self, request_iterator, timeout, metadata, credentials): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) completion_queue = cygrpc.CompletionQueue() call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline_timespec) + self._method, None, deadline) if credentials is not None: call.set_credentials(credentials._credentials) with state.condition: @@ -624,10 +618,10 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) @@ -665,10 +659,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): timeout=None, metadata=None, credentials=None): - deadline, deadline_timespec = _deadline(timeout) + deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) call, drive_call = self._managed_call(None, 0, self._method, None, - deadline_timespec) + deadline) if credentials is not None: call.set_credentials(credentials._credentials) event_handler = _event_handler(state, call, self._response_deserializer) @@ -737,7 +731,8 @@ def _channel_managed_call_management(state): flags: An integer bitfield of call flags. method: The RPC method. host: A host string for the created call. - deadline: A cygrpc.Timespec to be the deadline of the created call. + deadline: A float to be the deadline of the created call or None if the + call is to have an infinite deadline. Returns: A cygrpc.Call with which to conduct an RPC and a function to call if @@ -827,8 +822,8 @@ def _poll_connectivity(state, channel, initial_try_to_connect): completion_queue = cygrpc.CompletionQueue() while True: channel.watch_connectivity_state(connectivity, - cygrpc.Timespec(time.time() + 0.2), - completion_queue, None) + time.time() + 0.2, completion_queue, + None) event = completion_queue.poll() with state.lock: if not state.callbacks_and_connectivities and not state.try_to_connect: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 443d534d7e..efe5f2e0db 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -42,7 +42,7 @@ cdef class Channel: def create_call(self, Call parent, int flags, CompletionQueue queue not None, - method, host, Timespec deadline not None): + method, host, object deadline): if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") cdef grpc_slice method_slice = _slice_from_bytes(method) @@ -56,14 +56,13 @@ cdef class Channel: cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call - with nogil: - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method_slice, host_slice_ptr, - deadline.c_time, NULL) - grpc_slice_unref(method_slice) - if host_slice_ptr: - grpc_slice_unref(host_slice) + operation_call.c_call = grpc_channel_create_call( + self.c_channel, parent_call, flags, + queue.c_completion_queue, method_slice, host_slice_ptr, + _timespec_from_time(deadline), NULL) + grpc_slice_unref(method_slice) + if host_slice_ptr: + grpc_slice_unref(host_slice) return operation_call def check_connectivity_state(self, bint try_to_connect): @@ -75,13 +74,12 @@ cdef class Channel: def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, - Timespec deadline not None, CompletionQueue queue not None, tag): + object deadline, CompletionQueue queue not None, tag): cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) cpython.Py_INCREF(connectivity_tag) - with nogil: - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) + grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, _timespec_from_time(deadline), + queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) def target(self): cdef char *target = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index e259789b35..40496d1124 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -52,17 +52,18 @@ cdef class CompletionQueue: cpython.Py_DECREF(tag) return tag.event(event) - def poll(self, Timespec deadline=None): + def poll(self, deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). cdef gpr_timespec c_increment cdef gpr_timespec c_timeout cdef gpr_timespec c_deadline + if deadline is None: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + else: + c_deadline = _timespec_from_time(deadline) with nogil: c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) - c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - if deadline is not None: - c_deadline = deadline.c_time while True: c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 7b2482d947..297bbadfe0 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -18,11 +18,6 @@ cdef grpc_slice _copy_slice(grpc_slice slice) nogil cdef grpc_slice _slice_from_bytes(bytes value) nogil -cdef class Timespec: - - cdef gpr_timespec c_time - - cdef class CallDetails: cdef grpc_call_details c_details diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index bc2cd0338e..b2343b53d6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -123,74 +123,6 @@ class CompressionLevel: high = GRPC_COMPRESS_LEVEL_HIGH -cdef class Timespec: - - def __cinit__(self, time): - if time is None: - with nogil: - self.c_time = gpr_now(GPR_CLOCK_REALTIME) - return - if isinstance(time, int): - time = float(time) - if isinstance(time, float): - if time == float("+inf"): - with nogil: - self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) - elif time == float("-inf"): - with nogil: - self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) - else: - self.c_time.seconds = time - self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 - self.c_time.clock_type = GPR_CLOCK_REALTIME - elif isinstance(time, Timespec): - self.c_time = (<Timespec>time).c_time - else: - raise TypeError("expected time to be float, int, or Timespec, not {}" - .format(type(time))) - - @property - def seconds(self): - # TODO(atash) ensure that everywhere a Timespec is created that it's - # converted to GPR_CLOCK_REALTIME then and not every time someone wants to - # read values off in Python. - cdef gpr_timespec real_time - with nogil: - real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) - return real_time.seconds - - @property - def nanoseconds(self): - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) - return real_time.nanoseconds - - def __float__(self): - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) - return <double>real_time.seconds + <double>real_time.nanoseconds / 1e9 - - def __richcmp__(Timespec self not None, Timespec other not None, int op): - cdef gpr_timespec self_c_time = self.c_time - cdef gpr_timespec other_c_time = other.c_time - cdef int result = gpr_time_cmp(self_c_time, other_c_time) - if op == 0: # < - return result < 0 - elif op == 2: # == - return result == 0 - elif op == 4: # > - return result > 0 - elif op == 1: # <= - return result <= 0 - elif op == 3: # != - return result != 0 - elif op == 5: # >= - return result >= 0 - else: - raise ValueError('__richcmp__ `op` contract violated') - - cdef class CallDetails: def __cinit__(self): @@ -213,9 +145,7 @@ cdef class CallDetails: @property def deadline(self): - timespec = Timespec(float("-inf")) - timespec.c_time = self.c_details.deadline - return timespec + return _time_from_timespec(self.c_details.deadline) cdef class SslPemKeyCertPair: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index c19beccde6..e5d28a85d5 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -106,7 +106,7 @@ cdef class Server: with nogil: grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work - self.backup_shutdown_queue.poll(Timespec(None)) + self.backup_shutdown_queue.poll(deadline=time.time()) def add_http2_port(self, bytes address, ServerCredentials server_credentials=None): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/time.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/time.pxd.pxi new file mode 100644 index 0000000000..ce67c61eaf --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/time.pxd.pxi @@ -0,0 +1,19 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef gpr_timespec _timespec_from_time(object time) + + +cdef double _time_from_timespec(gpr_timespec timespec) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/time.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/time.pyx.pxi new file mode 100644 index 0000000000..7a668680b8 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/time.pyx.pxi @@ -0,0 +1,30 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef gpr_timespec _timespec_from_time(object time): + cdef gpr_timespec timespec + if time is None: + return gpr_inf_future(GPR_CLOCK_REALTIME) + else: + timespec.seconds = time + timespec.nanoseconds = (time - float(timespec.seconds)) * 1e9 + timespec.clock_type = GPR_CLOCK_REALTIME + return timespec + + +cdef double _time_from_timespec(gpr_timespec timespec): + cdef gpr_timespec real_timespec = gpr_convert_clock_type( + timespec, GPR_CLOCK_REALTIME) + return <double>real_timespec.seconds + <double>real_timespec.nanoseconds / 1e9 diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index b32fa518fc..01e2da6d54 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -25,3 +25,4 @@ include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" include "_cygrpc/tag.pxd.pxi" +include "_cygrpc/time.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 5106394708..d8ac84a317 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -32,6 +32,7 @@ include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" include "_cygrpc/tag.pyx.pxi" +include "_cygrpc/time.pyx.pxi" # # initialize gRPC diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 993c49d4af..6032828c77 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.9.0.dev0""" +__version__ = """1.10.0.dev0""" diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 1cdb2d45b6..56122fee11 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -220,8 +220,7 @@ class _Context(grpc.ServicerContext): return self._state.client is not _CANCELLED and not self._state.statused def time_remaining(self): - return max( - float(self._rpc_event.call_details.deadline) - time.time(), 0) + return max(self._rpc_event.call_details.deadline - time.time(), 0) def cancel(self): self._rpc_event.call.cancel() @@ -278,6 +277,12 @@ class _Context(grpc.ServicerContext): self._state.trailing_metadata = trailing_metadata def abort(self, code, details): + # treat OK like other invalid arguments: fail the RPC + if code == grpc.StatusCode.OK: + logging.error( + 'abort() called with StatusCode.OK; returning UNKNOWN') + code = grpc.StatusCode.UNKNOWN + details = '' with self._state.condition: self._state.code = code self._state.details = _common.encode(details) diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 1e964cebce..17dd792be8 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -15,51 +15,51 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_core_dependencies.py.template`!!! CORE_SOURCE_FILES = [ + 'src/core/lib/gpr/alloc.cc', + 'src/core/lib/gpr/arena.cc', + 'src/core/lib/gpr/atm.cc', + 'src/core/lib/gpr/avl.cc', + 'src/core/lib/gpr/cmdline.cc', + 'src/core/lib/gpr/cpu_iphone.cc', + 'src/core/lib/gpr/cpu_linux.cc', + 'src/core/lib/gpr/cpu_posix.cc', + 'src/core/lib/gpr/cpu_windows.cc', + 'src/core/lib/gpr/env_linux.cc', + 'src/core/lib/gpr/env_posix.cc', + 'src/core/lib/gpr/env_windows.cc', + 'src/core/lib/gpr/fork.cc', + 'src/core/lib/gpr/host_port.cc', + 'src/core/lib/gpr/log.cc', + 'src/core/lib/gpr/log_android.cc', + 'src/core/lib/gpr/log_linux.cc', + 'src/core/lib/gpr/log_posix.cc', + 'src/core/lib/gpr/log_windows.cc', + 'src/core/lib/gpr/mpscq.cc', + 'src/core/lib/gpr/murmur_hash.cc', + 'src/core/lib/gpr/object_registry.cc', + 'src/core/lib/gpr/string.cc', + 'src/core/lib/gpr/string_posix.cc', + 'src/core/lib/gpr/string_util_windows.cc', + 'src/core/lib/gpr/string_windows.cc', + 'src/core/lib/gpr/subprocess_posix.cc', + 'src/core/lib/gpr/subprocess_windows.cc', + 'src/core/lib/gpr/sync.cc', + 'src/core/lib/gpr/sync_posix.cc', + 'src/core/lib/gpr/sync_windows.cc', + 'src/core/lib/gpr/thd.cc', + 'src/core/lib/gpr/thd_posix.cc', + 'src/core/lib/gpr/thd_windows.cc', + 'src/core/lib/gpr/time.cc', + 'src/core/lib/gpr/time_posix.cc', + 'src/core/lib/gpr/time_precise.cc', + 'src/core/lib/gpr/time_windows.cc', + 'src/core/lib/gpr/tls_pthread.cc', + 'src/core/lib/gpr/tmpfile_msys.cc', + 'src/core/lib/gpr/tmpfile_posix.cc', + 'src/core/lib/gpr/tmpfile_windows.cc', + 'src/core/lib/gpr/wrap_memcpy.cc', 'src/core/lib/profiling/basic_timers.cc', 'src/core/lib/profiling/stap_timers.cc', - 'src/core/lib/support/alloc.cc', - 'src/core/lib/support/arena.cc', - 'src/core/lib/support/atm.cc', - 'src/core/lib/support/avl.cc', - 'src/core/lib/support/cmdline.cc', - 'src/core/lib/support/cpu_iphone.cc', - 'src/core/lib/support/cpu_linux.cc', - 'src/core/lib/support/cpu_posix.cc', - 'src/core/lib/support/cpu_windows.cc', - 'src/core/lib/support/env_linux.cc', - 'src/core/lib/support/env_posix.cc', - 'src/core/lib/support/env_windows.cc', - 'src/core/lib/support/fork.cc', - 'src/core/lib/support/host_port.cc', - 'src/core/lib/support/log.cc', - 'src/core/lib/support/log_android.cc', - 'src/core/lib/support/log_linux.cc', - 'src/core/lib/support/log_posix.cc', - 'src/core/lib/support/log_windows.cc', - 'src/core/lib/support/mpscq.cc', - 'src/core/lib/support/murmur_hash.cc', - 'src/core/lib/support/object_registry.cc', - 'src/core/lib/support/string.cc', - 'src/core/lib/support/string_posix.cc', - 'src/core/lib/support/string_util_windows.cc', - 'src/core/lib/support/string_windows.cc', - 'src/core/lib/support/subprocess_posix.cc', - 'src/core/lib/support/subprocess_windows.cc', - 'src/core/lib/support/sync.cc', - 'src/core/lib/support/sync_posix.cc', - 'src/core/lib/support/sync_windows.cc', - 'src/core/lib/support/thd.cc', - 'src/core/lib/support/thd_posix.cc', - 'src/core/lib/support/thd_windows.cc', - 'src/core/lib/support/time.cc', - 'src/core/lib/support/time_posix.cc', - 'src/core/lib/support/time_precise.cc', - 'src/core/lib/support/time_windows.cc', - 'src/core/lib/support/tls_pthread.cc', - 'src/core/lib/support/tmpfile_msys.cc', - 'src/core/lib/support/tmpfile_posix.cc', - 'src/core/lib/support/tmpfile_windows.cc', - 'src/core/lib/support/wrap_memcpy.cc', 'src/core/lib/surface/init.cc', 'src/core/lib/backoff/backoff.cc', 'src/core/lib/channel/channel_args.cc', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 1fac57b03a..a654eb026a 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 5b7e5859bc..d3185c6972 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 0ad9621154..7203d0d321 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 0eb5fbf94d..bf9e55e10e 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index b1b4d7e0c2..2583e42016 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.9.0.dev0' +VERSION = '1.10.0.dev0' diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index b81d6fbc61..2ca1fa82f4 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -20,7 +20,6 @@ from grpc._cython import cygrpc from grpc.framework.foundation import logging_pool from tests.unit.framework.common import test_constants -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -171,9 +170,9 @@ class CancelManyCallsTest(unittest.TestCase): with client_condition: client_calls = [] for index in range(test_constants.RPC_CONCURRENCY): - client_call = channel.create_call( - None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', - None, _INFINITE_FUTURE) + client_call = channel.create_call(None, _EMPTY_FLAGS, + client_completion_queue, + b'/twinkies', None, None) operations = ( cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, _EMPTY_FLAGS), diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 4eeb34b92e..c22c77ddbd 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -31,9 +31,9 @@ def _connectivity_loop(channel, completion_queue): for _ in range(100): connectivity = channel.check_connectivity_state(True) channel.watch_connectivity_state(connectivity, - cygrpc.Timespec(time.time() + 0.2), - completion_queue, None) - completion_queue.poll(deadline=cygrpc.Timespec(float('+inf'))) + time.time() + 0.2, completion_queue, + None) + completion_queue.poll() def _create_loop_destroy(): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index ffd226fa95..d4b01ca38b 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -20,7 +20,6 @@ from grpc._cython import cygrpc RPC_COUNT = 4000 -INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) EMPTY_FLAGS = 0 INVOCATION_METADATA = ( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 4ef4ad33e5..7caa98f72d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -41,9 +41,9 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call( - None, _common.EMPTY_FLAGS, self.client_completion_queue, - b'/twinkies', None, _common.INFINITE_FUTURE) + client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, + self.client_completion_queue, + b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with self.client_condition: diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index 85395c9680..8582a39c01 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -36,9 +36,9 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call( - None, _common.EMPTY_FLAGS, self.client_completion_queue, - b'/twinkies', None, _common.INFINITE_FUTURE) + client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, + self.client_completion_queue, + b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with self.client_condition: diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 82ef25b2a7..ecd23afda7 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -18,7 +18,6 @@ import unittest from grpc._cython import cygrpc -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -156,7 +155,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_call = channel.create_call(None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', - None, _INFINITE_FUTURE) + None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with client_condition: diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 5f9b74ba98..561adf7dff 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -37,21 +37,6 @@ def _metadata_plugin(context, callback): class TypeSmokeTest(unittest.TestCase): - def testTimespec(self): - now = time.time() - now_timespec_a = cygrpc.Timespec(now) - now_timespec_b = cygrpc.Timespec(now) - self.assertAlmostEqual(now, float(now_timespec_a), places=8) - self.assertEqual(now_timespec_a, now_timespec_b) - self.assertLess(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertGreater(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - self.assertGreaterEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - self.assertGreaterEqual(cygrpc.Timespec(now), cygrpc.Timespec(now)) - self.assertLessEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertLessEqual(cygrpc.Timespec(now), cygrpc.Timespec(now)) - self.assertNotEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertNotEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - def testCompletionQueueUpDown(self): completion_queue = cygrpc.CompletionQueue() del completion_queue @@ -147,7 +132,7 @@ class ServerClientMixin(object): try: call_result = call.start_client_batch(operations, tag) self.assertEqual(cygrpc.CallError.ok, call_result) - event = queue.poll(deadline) + event = queue.poll(deadline=deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, event.completion_type) self.assertTrue(event.success) @@ -176,8 +161,6 @@ class ServerClientMixin(object): RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' - cygrpc_deadline = cygrpc.Timespec(DEADLINE) - server_request_tag = object() request_call_result = self.server.request_call( self.server_completion_queue, self.server_completion_queue, @@ -188,7 +171,7 @@ class ServerClientMixin(object): client_call_tag = object() client_call = self.client_channel.create_call( None, 0, self.client_completion_queue, METHOD, self.host_argument, - cygrpc_deadline) + DEADLINE) client_initial_metadata = ( ( CLIENT_METADATA_ASCII_KEY, @@ -210,9 +193,9 @@ class ServerClientMixin(object): ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, cygrpc_deadline) + self.client_completion_queue, DEADLINE) - request_event = self.server_completion_queue.poll(cygrpc_deadline) + request_event = self.server_completion_queue.poll(deadline=DEADLINE) self.assertEqual(cygrpc.CompletionType.operation_complete, request_event.completion_type) self.assertIsInstance(request_event.call, cygrpc.Call) @@ -223,7 +206,7 @@ class ServerClientMixin(object): self.assertEqual(METHOD, request_event.call_details.method) self.assertEqual(self.expected_host, request_event.call_details.host) self.assertLess( - abs(DEADLINE - float(request_event.call_details.deadline)), + abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) server_call_tag = object() @@ -248,7 +231,7 @@ class ServerClientMixin(object): ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) - server_event = self.server_completion_queue.poll(cygrpc_deadline) + server_event = self.server_completion_queue.poll(deadline=DEADLINE) client_event = client_event_future.result() self.assertEqual(6, len(client_event.batch_operations)) @@ -310,7 +293,6 @@ class ServerClientMixin(object): DEADLINE_TOLERANCE = 0.25 METHOD = b'twinkies' - cygrpc_deadline = cygrpc.Timespec(DEADLINE) empty_metadata = () server_request_tag = object() @@ -319,26 +301,26 @@ class ServerClientMixin(object): server_request_tag) client_call = self.client_channel.create_call( None, 0, self.client_completion_queue, METHOD, self.host_argument, - cygrpc_deadline) + DEADLINE) # Prologue def perform_client_operations(operations, description): return self._perform_operations(operations, client_call, self.client_completion_queue, - cygrpc_deadline, description) + DEADLINE, description) client_event_future = perform_client_operations([ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") - request_event = self.server_completion_queue.poll(cygrpc_deadline) + request_event = self.server_completion_queue.poll(deadline=DEADLINE) server_call = request_event.call def perform_server_operations(operations, description): return self._perform_operations(operations, server_call, self.server_completion_queue, - cygrpc_deadline, description) + DEADLINE, description) server_event_future = perform_server_operations([ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 8e91161f80..4a00b9ef2f 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -49,4 +49,4 @@ class CompletionQueuePollFuture(SimpleFuture): def __init__(self, completion_queue, deadline): super(CompletionQueuePollFuture, - self).__init__(lambda: completion_queue.poll(deadline)) + self).__init__(lambda: completion_queue.poll(deadline=deadline)) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index bb6ac70497..ca10bd4dab 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -50,6 +50,12 @@ _SERVER_TRAILING_METADATA = (('server-trailing-md-key', _NON_OK_CODE = grpc.StatusCode.NOT_FOUND _DETAILS = 'Test details!' +# calling abort should always fail an RPC, even for "invalid" codes +_ABORT_CODES = (_NON_OK_CODE, 3, grpc.StatusCode.OK) +_EXPECTED_CLIENT_CODES = (_NON_OK_CODE, grpc.StatusCode.UNKNOWN, + grpc.StatusCode.UNKNOWN) +_EXPECTED_DETAILS = (_DETAILS, _DETAILS, '') + class _Servicer(object): @@ -302,99 +308,119 @@ class MetadataCodeDetailsTest(unittest.TestCase): self.assertEqual(_DETAILS, response_iterator_call.details()) def testAbortedUnaryUnary(self): - self._servicer.set_code(_NON_OK_CODE) - self._servicer.set_details(_DETAILS) - self._servicer.set_abort_call() - - with self.assertRaises(grpc.RpcError) as exception_context: - self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) - - self.assertTrue( - test_common.metadata_transmitted( - _CLIENT_METADATA, self._servicer.received_client_metadata())) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_INITIAL_METADATA, - exception_context.exception.initial_metadata())) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_TRAILING_METADATA, - exception_context.exception.trailing_metadata())) - self.assertIs(_NON_OK_CODE, exception_context.exception.code()) - self.assertEqual(_DETAILS, exception_context.exception.details()) + test_cases = zip(_ABORT_CODES, _EXPECTED_CLIENT_CODES, + _EXPECTED_DETAILS) + for abort_code, expected_code, expected_details in test_cases: + self._servicer.set_code(abort_code) + self._servicer.set_details(_DETAILS) + self._servicer.set_abort_call() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, + self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(expected_code, exception_context.exception.code()) + self.assertEqual(expected_details, + exception_context.exception.details()) def testAbortedUnaryStream(self): - self._servicer.set_code(_NON_OK_CODE) - self._servicer.set_details(_DETAILS) - self._servicer.set_abort_call() - - response_iterator_call = self._unary_stream( - _SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) - received_initial_metadata = response_iterator_call.initial_metadata() - with self.assertRaises(grpc.RpcError): - self.assertEqual(len(list(response_iterator_call)), 0) - - self.assertTrue( - test_common.metadata_transmitted( - _CLIENT_METADATA, self._servicer.received_client_metadata())) - self.assertTrue( - test_common.metadata_transmitted(_SERVER_INITIAL_METADATA, - received_initial_metadata)) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_TRAILING_METADATA, - response_iterator_call.trailing_metadata())) - self.assertIs(_NON_OK_CODE, response_iterator_call.code()) - self.assertEqual(_DETAILS, response_iterator_call.details()) + test_cases = zip(_ABORT_CODES, _EXPECTED_CLIENT_CODES, + _EXPECTED_DETAILS) + for abort_code, expected_code, expected_details in test_cases: + self._servicer.set_code(abort_code) + self._servicer.set_details(_DETAILS) + self._servicer.set_abort_call() + + response_iterator_call = self._unary_stream( + _SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = \ + response_iterator_call.initial_metadata() + with self.assertRaises(grpc.RpcError): + self.assertEqual(len(list(response_iterator_call)), 0) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, + self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted(_SERVER_INITIAL_METADATA, + received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + response_iterator_call.trailing_metadata())) + self.assertIs(expected_code, response_iterator_call.code()) + self.assertEqual(expected_details, response_iterator_call.details()) def testAbortedStreamUnary(self): - self._servicer.set_code(_NON_OK_CODE) - self._servicer.set_details(_DETAILS) - self._servicer.set_abort_call() - - with self.assertRaises(grpc.RpcError) as exception_context: - self._stream_unary.with_call( - iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), - metadata=_CLIENT_METADATA) - - self.assertTrue( - test_common.metadata_transmitted( - _CLIENT_METADATA, self._servicer.received_client_metadata())) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_INITIAL_METADATA, - exception_context.exception.initial_metadata())) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_TRAILING_METADATA, - exception_context.exception.trailing_metadata())) - self.assertIs(_NON_OK_CODE, exception_context.exception.code()) - self.assertEqual(_DETAILS, exception_context.exception.details()) + test_cases = zip(_ABORT_CODES, _EXPECTED_CLIENT_CODES, + _EXPECTED_DETAILS) + for abort_code, expected_code, expected_details in test_cases: + self._servicer.set_code(abort_code) + self._servicer.set_details(_DETAILS) + self._servicer.set_abort_call() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, + self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(expected_code, exception_context.exception.code()) + self.assertEqual(expected_details, + exception_context.exception.details()) def testAbortedStreamStream(self): - self._servicer.set_code(_NON_OK_CODE) - self._servicer.set_details(_DETAILS) - self._servicer.set_abort_call() - - response_iterator_call = self._stream_stream( - iter([object()] * test_constants.STREAM_LENGTH), - metadata=_CLIENT_METADATA) - received_initial_metadata = response_iterator_call.initial_metadata() - with self.assertRaises(grpc.RpcError): - self.assertEqual(len(list(response_iterator_call)), 0) - - self.assertTrue( - test_common.metadata_transmitted( - _CLIENT_METADATA, self._servicer.received_client_metadata())) - self.assertTrue( - test_common.metadata_transmitted(_SERVER_INITIAL_METADATA, - received_initial_metadata)) - self.assertTrue( - test_common.metadata_transmitted( - _SERVER_TRAILING_METADATA, - response_iterator_call.trailing_metadata())) - self.assertIs(_NON_OK_CODE, response_iterator_call.code()) - self.assertEqual(_DETAILS, response_iterator_call.details()) + test_cases = zip(_ABORT_CODES, _EXPECTED_CLIENT_CODES, + _EXPECTED_DETAILS) + for abort_code, expected_code, expected_details in test_cases: + self._servicer.set_code(abort_code) + self._servicer.set_details(_DETAILS) + self._servicer.set_abort_call() + + response_iterator_call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = \ + response_iterator_call.initial_metadata() + with self.assertRaises(grpc.RpcError): + self.assertEqual(len(list(response_iterator_call)), 0) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, + self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted(_SERVER_INITIAL_METADATA, + received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + response_iterator_call.trailing_metadata())) + self.assertIs(expected_code, response_iterator_call.code()) + self.assertEqual(expected_details, response_iterator_call.details()) def testCustomCodeUnaryUnary(self): self._servicer.set_code(_NON_OK_CODE) |