diff options
Diffstat (limited to 'src/python/grpcio/grpc')
23 files changed, 980 insertions, 447 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 564772527e..8b913ac949 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -342,6 +342,170 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)): raise NotImplementedError() +############## Invocation-Side Interceptor Interfaces & Classes ############## + + +class ClientCallDetails(six.with_metaclass(abc.ABCMeta)): + """Describes an RPC to be invoked. + + This is an EXPERIMENTAL API. + + Attributes: + method: The method name of the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: Optional :term:`metadata` to be transmitted to + the service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + """ + + +class UnaryUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting unary-unary invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_unary_unary(self, continuation, client_call_details, request): + """Intercepts a unary-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC + completion, the return Call-Future's result value will be + the response message of the RPC. Should the event terminate + with non-OK status, the returned Call-Future's exception value + will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class UnaryStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting unary-stream invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_unary_stream(self, continuation, client_call_details, + request): + """Intercepts a unary-stream invocation. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_iterator = continuation(client_call_details, request)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request: The request value for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ + raise NotImplementedError() + + +class StreamUnaryClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting stream-unary invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_stream_unary(self, continuation, client_call_details, + request_iterator): + """Intercepts a stream-unary invocation asynchronously. + + Args: + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_future = continuation(client_call_details, + request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and a Future. In the event of RPC completion, + the return Call-Future's result value will be the response message + of the RPC. Should the event terminate with non-OK status, the + returned Call-Future's exception value will be an RpcError. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and a Future. + In the event of RPC completion, the return Call-Future's + result value will be the response message of the RPC. + Should the event terminate with non-OK status, the returned + Call-Future's exception value will be an RpcError. + """ + raise NotImplementedError() + + +class StreamStreamClientInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting stream-stream invocations. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_stream_stream(self, continuation, client_call_details, + request_iterator): + """Intercepts a stream-stream invocation. + + continuation: A function that proceeds with the invocation by + executing the next interceptor in chain or invoking the + actual RPC on the underlying Channel. It is the interceptor's + responsibility to call it if it decides to move the RPC forward. + The interceptor can use + `response_iterator = continuation(client_call_details, + request_iterator)` + to continue with the RPC. `continuation` returns an object that is + both a Call for the RPC and an iterator for response values. + Drawing response values from the returned Call-iterator may + raise RpcError indicating termination of the RPC with non-OK + status. + client_call_details: A ClientCallDetails object describing the + outgoing RPC. + request_iterator: An iterator that yields request values for the RPC. + + Returns: + An object that is both a Call for the RPC and an iterator of + response values. Drawing response values from the returned + Call-iterator may raise RpcError indicating termination of + the RPC with non-OK status. + """ + raise NotImplementedError() + + ############ Authentication & Authorization Interfaces & Classes ############# @@ -835,27 +999,47 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): raise NotImplementedError() @abc.abstractmethod + def abort(self, code, details): + """Raises an exception to terminate the RPC with a non-OK status. + + The code and details passed as arguments will supercede any existing + ones. + + Args: + code: A StatusCode object to be sent to the client. + It must not be StatusCode.OK. + details: An ASCII-encodable string to be sent to the client upon + termination of the RPC. + + Raises: + Exception: An exception is always raised to signal the abortion the + RPC to the gRPC runtime. + """ + raise NotImplementedError() + + @abc.abstractmethod def set_code(self, code): """Sets the value to be used as status code upon RPC completion. - This method need not be called by method implementations if they wish the - gRPC runtime to determine the status code of the RPC. + This method need not be called by method implementations if they wish + the gRPC runtime to determine the status code of the RPC. - Args: - code: A StatusCode object to be sent to the client. - """ + Args: + code: A StatusCode object to be sent to the client. + """ raise NotImplementedError() @abc.abstractmethod def set_details(self, details): """Sets the value to be used as detail string upon RPC completion. - This method need not be called by method implementations if they have no - details to transmit. + This method need not be called by method implementations if they have + no details to transmit. - Args: - details: An arbitrary string to be sent to the client upon completion. - """ + Args: + details: An ASCII-encodable string to be sent to the client upon + termination of the RPC. + """ raise NotImplementedError() @@ -942,6 +1126,34 @@ class ServiceRpcHandler(six.with_metaclass(abc.ABCMeta, GenericRpcHandler)): raise NotImplementedError() +#################### Service-Side Interceptor Interfaces ##################### + + +class ServerInterceptor(six.with_metaclass(abc.ABCMeta)): + """Affords intercepting incoming RPCs on the service-side. + + This is an EXPERIMENTAL API. + """ + + @abc.abstractmethod + def intercept_service(self, continuation, handler_call_details): + """Intercepts incoming RPCs before handing them over to a handler. + + Args: + continuation: A function that takes a HandlerCallDetails and + proceeds to invoke the next interceptor in the chain, if any, + or the RPC handler lookup logic, with the call details passed + as an argument, and returns an RpcMethodHandler instance if + the RPC is considered serviced, or None otherwise. + handler_call_details: A HandlerCallDetails describing the RPC. + + Returns: + An RpcMethodHandler with which the RPC may be serviced if the + interceptor chooses to service this RPC, or None otherwise. + """ + raise NotImplementedError() + + ############################# Server Interface ############################### @@ -1356,53 +1568,88 @@ def secure_channel(target, credentials, options=None): credentials._credentials) +def intercept_channel(channel, *interceptors): + """Intercepts a channel through a set of interceptors. + + This is an EXPERIMENTAL API. + + Args: + channel: A Channel. + interceptors: Zero or more objects of type + UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor, or + StreamStreamClientInterceptor. + Interceptors are given control in the order they are listed. + + Returns: + A Channel that intercepts each invocation via the provided interceptors. + + Raises: + TypeError: If interceptor does not derive from any of + UnaryUnaryClientInterceptor, + UnaryStreamClientInterceptor, + StreamUnaryClientInterceptor, or + StreamStreamClientInterceptor. + """ + from grpc import _interceptor # pylint: disable=cyclic-import + return _interceptor.intercept_channel(channel, *interceptors) + + def server(thread_pool, handlers=None, + interceptors=None, options=None, maximum_concurrent_rpcs=None): """Creates a Server with which RPCs can be serviced. - Args: - thread_pool: A futures.ThreadPoolExecutor to be used by the Server - to execute RPC handlers. - handlers: An optional list of GenericRpcHandlers used for executing RPCs. - More handlers may be added by calling add_generic_rpc_handlers any time - before the server is started. - options: An optional list of key-value pairs (channel args in gRPC runtime) - to configure the channel. - maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server - will service before returning RESOURCE_EXHAUSTED status, or None to - indicate no limit. + Args: + thread_pool: A futures.ThreadPoolExecutor to be used by the Server + to execute RPC handlers. + handlers: An optional list of GenericRpcHandlers used for executing RPCs. + More handlers may be added by calling add_generic_rpc_handlers any time + before the server is started. + interceptors: An optional list of ServerInterceptor objects that observe + and optionally manipulate the incoming RPCs before handing them over to + handlers. The interceptors are given control in the order they are + specified. This is an EXPERIMENTAL API. + options: An optional list of key-value pairs (channel args in gRPC runtime) + to configure the channel. + maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server + will service before returning RESOURCE_EXHAUSTED status, or None to + indicate no limit. - Returns: - A Server object. - """ + Returns: + A Server object. + """ from grpc import _server # pylint: disable=cyclic-import return _server.Server(thread_pool, () if handlers is None else handlers, () - if options is None else options, - maximum_concurrent_rpcs) + if interceptors is None else interceptors, () if + options is None else options, maximum_concurrent_rpcs) ################################### __all__ ################################# -__all__ = ('FutureTimeoutError', 'FutureCancelledError', 'Future', - 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', - 'Call', 'ChannelCredentials', 'CallCredentials', - 'AuthMetadataContext', 'AuthMetadataPluginCallback', - 'AuthMetadataPlugin', 'ServerCertificateConfiguration', - 'ServerCredentials', 'UnaryUnaryMultiCallable', - 'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable', - 'StreamStreamMultiCallable', 'Channel', 'ServicerContext', - 'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler', - 'ServiceRpcHandler', 'Server', 'unary_unary_rpc_method_handler', - 'unary_stream_rpc_method_handler', 'stream_unary_rpc_method_handler', - 'stream_stream_rpc_method_handler', - 'method_handlers_generic_handler', 'ssl_channel_credentials', - 'metadata_call_credentials', 'access_token_call_credentials', - 'composite_call_credentials', 'composite_channel_credentials', - 'ssl_server_credentials', 'ssl_server_certificate_configuration', - 'dynamic_ssl_server_credentials', 'channel_ready_future', - 'insecure_channel', 'secure_channel', 'server',) +__all__ = ( + 'FutureTimeoutError', 'FutureCancelledError', 'Future', + 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call', + 'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext', + 'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ClientCallDetails', + 'ServerCertificateConfiguration', 'ServerCredentials', + 'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', 'StreamStreamMultiCallable', + 'UnaryUnaryClientInterceptor', 'UnaryStreamClientInterceptor', + 'StreamUnaryClientInterceptor', 'StreamStreamClientInterceptor', 'Channel', + 'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails', + 'GenericRpcHandler', 'ServiceRpcHandler', 'Server', 'ServerInterceptor', + 'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', 'ssl_channel_credentials', + 'metadata_call_credentials', 'access_token_call_credentials', + 'composite_call_credentials', 'composite_channel_credentials', + 'ssl_server_credentials', 'ssl_server_certificate_configuration', + 'dynamic_ssl_server_credentials', 'channel_ready_future', + 'insecure_channel', 'secure_channel', 'intercept_channel', 'server',) ############################### Extension Shims ################################ diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index cf4ce0941b..d7456a3dd1 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -122,8 +122,8 @@ def _abort(state, code, details): state.code = code state.details = details if state.initial_metadata is None: - state.initial_metadata = _common.EMPTY_METADATA - state.trailing_metadata = _common.EMPTY_METADATA + state.initial_metadata = () + state.trailing_metadata = () def _handle_event(event, state, response_deserializer): @@ -202,8 +202,7 @@ def _consume_request_iterator(request_iterator, state, call, else: operations = (cygrpc.operation_send_message( serialized_request, _EMPTY_FLAGS),) - call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_message) while True: state.condition.wait() @@ -218,8 +217,7 @@ def _consume_request_iterator(request_iterator, state, call, if state.code is None: operations = ( cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),) - call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) def stop_consumption_thread(timeout): # pylint: disable=unused-argument @@ -321,8 +319,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): event_handler = _event_handler(self._state, self._call, self._response_deserializer) self._call.start_client_batch( - cygrpc.Operations( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_message(_EMPTY_FLAGS),), event_handler) self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -372,14 +369,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): with self._state.condition: while self._state.initial_metadata is None: self._state.condition.wait() - return _common.to_application_metadata(self._state.initial_metadata) + return self._state.initial_metadata def trailing_metadata(self): with self._state.condition: while self._state.trailing_metadata is None: self._state.condition.wait() - return _common.to_application_metadata( - self._state.trailing_metadata) + return self._state.trailing_metadata def code(self): with self._state.condition: @@ -420,8 +416,7 @@ def _start_unary_request(request, timeout, request_serializer): deadline, deadline_timespec = _deadline(timeout) serialized_request = _common.serialize(request, request_serializer) if serialized_request is None: - state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA, - grpc.StatusCode.INTERNAL, + state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) return deadline, deadline_timespec, None, rendezvous @@ -458,8 +453,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), @@ -479,8 +473,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): deadline_timespec) if credentials is not None: call.set_credentials(credentials._credentials) - call_error = call.start_client_batch( - cygrpc.Operations(operations), None) + call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _handle_event(completion_queue.poll(), state, self._response_deserializer) @@ -509,8 +502,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: - call_error = call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) return _Rendezvous(state, None, None, deadline) @@ -544,18 +536,15 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer) with state.condition: call.start_client_batch( - cygrpc.Operations(( - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - )), event_handler) + (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), - _EMPTY_FLAGS), cygrpc.operation_send_message( + metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) - call_error = call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) return _Rendezvous(state, None, None, deadline) @@ -584,16 +573,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): call.set_credentials(credentials._credentials) with state.condition: call.start_client_batch( - cygrpc.Operations( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), None) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) - call_error = call.start_client_batch( - cygrpc.Operations(operations), None) + call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _consume_request_iterator(request_iterator, state, call, self._request_serializer) @@ -638,16 +624,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - cygrpc.Operations( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) - call_error = call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) return _Rendezvous(state, None, None, deadline) @@ -681,15 +664,12 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - cygrpc.Operations( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) - call_error = call.start_client_batch( - cygrpc.Operations(operations), event_handler) + call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) return _Rendezvous(state, None, None, deadline) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 740d4639db..130fc42630 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -22,8 +22,6 @@ import six import grpc from grpc._cython import cygrpc -EMPTY_METADATA = cygrpc.Metadata(()) - CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { cygrpc.ConnectivityState.idle: grpc.ChannelConnectivity.IDLE, @@ -91,21 +89,6 @@ def channel_args(options): return cygrpc.ChannelArgs(cygrpc_args) -def to_cygrpc_metadata(application_metadata): - return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( - cygrpc.Metadatum(encode(key), encode(value)) - for key, value in application_metadata) - - -def to_application_metadata(cygrpc_metadata): - if cygrpc_metadata is None: - return () - else: - return tuple((decode(key), value - if key[-4:] == b'-bin' else decode(value)) - for key, value in cygrpc_metadata) - - def _transform(message, transformer, exception_message): if transformer is None: return message diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 6b3a276097..6361669757 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -26,20 +26,16 @@ cdef class Call: def _start_batch(self, operations, tag, retain_self): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") - cdef grpc_call_error result - cdef Operations cy_operations = Operations(operations) - cdef OperationTag operation_tag = OperationTag(tag) + cdef OperationTag operation_tag = OperationTag(tag, operations) if retain_self: operation_tag.operation_call = self else: operation_tag.operation_call = None - operation_tag.batch_operations = cy_operations + operation_tag.store_ops() cpython.Py_INCREF(operation_tag) - with nogil: - result = grpc_call_start_batch( - self.c_call, cy_operations.c_ops, cy_operations.c_nops, + return grpc_call_start_batch( + self.c_call, operation_tag.c_ops, operation_tag.c_nops, <cpython.PyObject *>operation_tag, NULL) - return result def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 4c397f8f64..644df674cc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -76,7 +76,7 @@ cdef class Channel: def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag = OperationTag(tag) + cdef OperationTag operation_tag = OperationTag(tag, None) cpython.Py_INCREF(operation_tag) with nogil: grpc_channel_watch_connectivity_state( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 237f430799..140fc357b9 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -42,7 +42,7 @@ cdef class CompletionQueue: cdef Call operation_call = None cdef CallDetails request_call_details = None cdef object request_metadata = None - cdef Operations batch_operations = None + cdef object batch_operations = None if event.type == GRPC_QUEUE_TIMEOUT: return Event( event.type, False, None, None, None, None, False, None) @@ -61,9 +61,10 @@ cdef class CompletionQueue: user_tag = tag.user_tag operation_call = tag.operation_call request_call_details = tag.request_call_details - if tag.request_metadata is not None: - request_metadata = tuple(tag.request_metadata) - batch_operations = tag.batch_operations + if tag.is_new_request: + request_metadata = _metadata(&tag._c_request_metadata) + grpc_metadata_array_destroy(&tag._c_request_metadata) + batch_operations = tag.release_ops() if tag.is_new_request: # Stuff in the tag not explicitly handled by us needs to live through # the life of the call diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 246a271893..500086f6cb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -30,9 +30,13 @@ cdef int _get_metadata( grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], size_t *num_creds_md, grpc_status_code *status, const char **error_details) with gil: - def callback(Metadata metadata, grpc_status_code status, bytes error_details): + cdef size_t metadata_count + cdef grpc_metadata *c_metadata + def callback(metadata, grpc_status_code status, bytes error_details): if status is StatusCode.ok: - cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL) + _store_c_metadata(metadata, &c_metadata, &metadata_count) + cb(user_data, c_metadata, metadata_count, status, NULL) + _release_c_metadata(c_metadata, metadata_count) else: cb(user_data, NULL, 0, status, error_details) args = context.service_url, context.method_name, callback, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 660263fc09..6a72bbf693 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -36,13 +36,6 @@ cdef extern from "grpc/byte_buffer_reader.h": pass -cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h": - - struct grpc_exec_ctx: - # We don't care about the internals - pass - - cdef extern from "grpc/grpc.h": ctypedef struct grpc_slice: @@ -169,7 +162,7 @@ cdef extern from "grpc/grpc.h": ctypedef struct grpc_arg_pointer_vtable: void *(*copy)(void *) - void (*destroy)(grpc_exec_ctx *, void *) + void (*destroy)(void *) int (*cmp)(void *, void *) ctypedef struct grpc_arg_value_pointer: @@ -524,7 +517,7 @@ cdef extern from "grpc/grpc_security.h": grpc_auth_property_iterator grpc_auth_context_property_iterator( const grpc_auth_context *ctx) - + grpc_auth_property_iterator grpc_auth_context_peer_identity( const grpc_auth_context *ctx) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi index c8f11f8e19..e3cad9acb3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + # This function will ascii encode unicode string inputs if neccesary. # In Python3, unicode strings are the default str type. @@ -22,3 +24,25 @@ cdef bytes str_to_bytes(object s): return s.encode('ascii') else: raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s))) + + +cdef bytes _encode(str native_string_or_none): + if native_string_or_none is None: + return b'' + elif isinstance(native_string_or_none, (bytes,)): + return <bytes>native_string_or_none + elif isinstance(native_string_or_none, (unicode,)): + return native_string_or_none.encode('ascii') + else: + raise TypeError('Expected str, not {}'.format(type(native_string_or_none))) + + +cdef str _decode(bytes bytestring): + if isinstance(bytestring, (str,)): + return <str>bytestring + else: + try: + return bytestring.decode('utf8') + except UnicodeDecodeError: + logging.exception('Invalid encoding on %s', bytestring) + return bytestring.decode('latin1') diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi new file mode 100644 index 0000000000..a18c365807 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi @@ -0,0 +1,26 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count) + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi new file mode 100644 index 0000000000..c39fef08fa --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi @@ -0,0 +1,62 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + + +_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count): + if metadata is None: + c_count[0] = 0 + c_metadata[0] = NULL + else: + metadatum_count = len(metadata) + if metadatum_count == 0: + c_count[0] = 0 + c_metadata[0] = NULL + else: + c_count[0] = metadatum_count + c_metadata[0] = <grpc_metadata *>gpr_malloc( + metadatum_count * sizeof(grpc_metadata)) + for index, (key, value) in enumerate(metadata): + encoded_key = _encode(key) + encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value) + c_metadata[0][index].key = _slice_from_bytes(encoded_key) + c_metadata[0][index].value = _slice_from_bytes(encoded_value) + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count): + if 0 < count: + for index in range(count): + grpc_slice_unref(c_metadata[index].key) + grpc_slice_unref(c_metadata[index].value) + gpr_free(c_metadata) + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice): + cdef bytes key = _slice_bytes(key_slice) + cdef bytes value = _slice_bytes(value_slice) + return <tuple>_Metadatum( + _decode(key), value if key[-4:] == b'-bin' else _decode(value)) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array): + return tuple( + _metadatum( + c_metadata_array.metadata[index].key, + c_metadata_array.metadata[index].value) + for index in range(c_metadata_array.count)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 9c40ebf0c2..594fdb1a8b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -37,10 +37,15 @@ cdef class OperationTag: cdef Server shutting_down_server cdef Call operation_call cdef CallDetails request_call_details - cdef MetadataArray request_metadata - cdef Operations batch_operations + cdef grpc_metadata_array _c_request_metadata + cdef grpc_op *c_ops + cdef size_t c_nops + cdef readonly object _operations cdef bint is_new_request + cdef void store_ops(self) + cdef object release_ops(self) + cdef class Event: @@ -57,7 +62,7 @@ cdef class Event: cdef readonly Call operation_call # For Call.start_batch - cdef readonly Operations batch_operations + cdef readonly object batch_operations cdef class ByteBuffer: @@ -84,28 +89,15 @@ cdef class ChannelArgs: cdef list args -cdef class Metadatum: - - cdef grpc_metadata c_metadata - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil - - -cdef class Metadata: - - cdef grpc_metadata *c_metadata - cdef readonly size_t c_count - - -cdef class MetadataArray: - - cdef grpc_metadata_array c_metadata_array - - cdef class Operation: cdef grpc_op c_op + cdef bint _c_metadata_needs_release + cdef size_t _c_metadata_count + cdef grpc_metadata *_c_metadata cdef ByteBuffer _received_message - cdef MetadataArray _received_metadata + cdef bint _c_metadata_array_needs_destruction + cdef grpc_metadata_array _c_metadata_array cdef grpc_status_code _received_status_code cdef grpc_slice _status_details cdef int _received_cancelled @@ -113,13 +105,6 @@ cdef class Operation: cdef object references -cdef class Operations: - - cdef grpc_op *c_ops - cdef size_t c_nops - cdef list operations - - cdef class CompressionOptions: cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 4f87261e17..26eaf50eb4 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -220,9 +220,26 @@ cdef class CallDetails: cdef class OperationTag: - def __cinit__(self, user_tag): + def __cinit__(self, user_tag, operations): self.user_tag = user_tag self.references = [] + self._operations = operations + + cdef void store_ops(self): + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index in range(self.c_nops): + self.c_ops[index] = (<Operation>(self._operations[index])).c_op + + cdef object release_ops(self): + if 0 < self.c_nops: + for index, operation in enumerate(self._operations): + (<Operation>operation).c_op = self.c_ops[index] + gpr_free(self.c_ops) + return self._operations + else: + return () cdef class Event: @@ -232,7 +249,7 @@ cdef class Event: CallDetails request_call_details, object request_metadata, bint is_new_request, - Operations batch_operations): + object batch_operations): self.type = type self.success = success self.tag = tag @@ -320,7 +337,7 @@ cdef void* copy_ptr(void* ptr): return ptr -cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr): +cdef void destroy_ptr(void* ptr): pass @@ -348,7 +365,7 @@ cdef class ChannelArg: elif hasattr(value, '__int__'): # Pointer objects must override __int__() to return # the underlying C address (Python ints are word size). The - # lifecycle of the pointer is fixed to the lifecycle of the + # lifecycle of the pointer is fixed to the lifecycle of the # python object wrapping it. self.ptr_vtable.copy = ©_ptr self.ptr_vtable.destroy = &destroy_ptr @@ -390,140 +407,13 @@ cdef class ChannelArgs: return self.args[i] -cdef class Metadatum: - - def __cinit__(self, bytes key, bytes value): - self.c_metadata.key = _slice_from_bytes(key) - self.c_metadata.value = _slice_from_bytes(value) - - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil: - destination[0].key = _copy_slice(self.c_metadata.key) - destination[0].value = _copy_slice(self.c_metadata.value) - - @property - def key(self): - return _slice_bytes(self.c_metadata.key) - - @property - def value(self): - return _slice_bytes(self.c_metadata.value) - - def __len__(self): - return 2 - - def __getitem__(self, size_t i): - if i == 0: - return self.key - elif i == 1: - return self.value - else: - raise IndexError("index must be 0 (key) or 1 (value)") - - def __iter__(self): - return iter((self.key, self.value)) - - def __dealloc__(self): - grpc_slice_unref(self.c_metadata.key) - grpc_slice_unref(self.c_metadata.value) - -cdef class _MetadataIterator: - - cdef size_t i - cdef size_t _length - cdef object _metadatum_indexable - - def __cinit__(self, length, metadatum_indexable): - self._length = length - self._metadatum_indexable = metadatum_indexable - self.i = 0 - - def __iter__(self): - return self - - def __next__(self): - if self.i < self._length: - result = self._metadatum_indexable[self.i] - self.i = self.i + 1 - return result - else: - raise StopIteration() - - -# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an -# ordinary sequence of pairs of bytestrings all the way down to the -# grpc_call_start_batch call. -cdef class Metadata: - """Metadata being passed from application to core.""" - - def __cinit__(self, metadata_iterable): - metadata_sequence = tuple(metadata_iterable) - cdef size_t count = len(metadata_sequence) - with nogil: - grpc_init() - self.c_metadata = <grpc_metadata *>gpr_malloc( - count * sizeof(grpc_metadata)) - self.c_count = count - for index, metadatum in enumerate(metadata_sequence): - self.c_metadata[index].key = grpc_slice_copy( - (<Metadatum>metadatum).c_metadata.key) - self.c_metadata[index].value = grpc_slice_copy( - (<Metadatum>metadatum).c_metadata.value) - - def __dealloc__(self): - with nogil: - for index in range(self.c_count): - grpc_slice_unref(self.c_metadata[index].key) - grpc_slice_unref(self.c_metadata[index].value) - gpr_free(self.c_metadata) - grpc_shutdown() - - def __len__(self): - return self.c_count - - def __getitem__(self, size_t index): - if index < self.c_count: - key = _slice_bytes(self.c_metadata[index].key) - value = _slice_bytes(self.c_metadata[index].value) - return Metadatum(key, value) - else: - raise IndexError() - - def __iter__(self): - return _MetadataIterator(self.c_count, self) - - -cdef class MetadataArray: - """Metadata being passed from core to application.""" - - def __cinit__(self): - with nogil: - grpc_init() - grpc_metadata_array_init(&self.c_metadata_array) - - def __dealloc__(self): - with nogil: - grpc_metadata_array_destroy(&self.c_metadata_array) - grpc_shutdown() - - def __len__(self): - return self.c_metadata_array.count - - def __getitem__(self, size_t i): - if i >= self.c_metadata_array.count: - raise IndexError() - key = _slice_bytes(self.c_metadata_array.metadata[i].key) - value = _slice_bytes(self.c_metadata_array.metadata[i].value) - return Metadatum(key=key, value=value) - - def __iter__(self): - return _MetadataIterator(self.c_metadata_array.count, self) - - cdef class Operation: def __cinit__(self): grpc_init() self.references = [] + self._c_metadata_needs_release = False + self._c_metadata_array_needs_destruction = False self._status_details = grpc_empty_slice() self.is_valid = False @@ -556,13 +446,7 @@ cdef class Operation: if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): raise TypeError("self must be an operation receiving metadata") - # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython - # objects must be legitimate for use from Python at any time" policy in - # place today, shift the policy toward "Operation objects are only usable - # while their calls are active", and move this making-a-copy-because-this- - # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the - # lowest Python layer. - return tuple(self._received_metadata) + return _metadata(&self._c_metadata_array) @property def received_status_code(self): @@ -602,16 +486,21 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): + if self._c_metadata_needs_release: + _release_c_metadata(self._c_metadata, self._c_metadata_count) + if self._c_metadata_array_needs_destruction: + grpc_metadata_array_destroy(&self._c_metadata_array) grpc_slice_unref(self._status_details) grpc_shutdown() -def operation_send_initial_metadata(Metadata metadata, int flags): +def operation_send_initial_metadata(metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA op.c_op.flags = flags - op.c_op.data.send_initial_metadata.count = metadata.c_count - op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata - op.references.append(metadata) + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True + op.c_op.data.send_initial_metadata.count = op._c_metadata_count + op.c_op.data.send_initial_metadata.metadata = op._c_metadata op.is_valid = True return op @@ -633,18 +522,19 @@ def operation_send_close_from_client(int flags): return op def operation_send_status_from_server( - Metadata metadata, grpc_status_code code, bytes details, int flags): + metadata, grpc_status_code code, bytes details, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.flags = flags + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True op.c_op.data.send_status_from_server.trailing_metadata_count = ( - metadata.c_count) - op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata + op._c_metadata_count) + op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata op.c_op.data.send_status_from_server.status = code grpc_slice_unref(op._status_details) op._status_details = _slice_from_bytes(details) op.c_op.data.send_status_from_server.status_details = &op._status_details - op.references.append(metadata) op.is_valid = True return op @@ -652,9 +542,10 @@ def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.is_valid = True return op @@ -675,9 +566,10 @@ def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( @@ -694,59 +586,6 @@ def operation_receive_close_on_server(int flags): return op -cdef class _OperationsIterator: - - cdef size_t i - cdef Operations operations - - def __cinit__(self, Operations operations not None): - self.i = 0 - self.operations = operations - - def __iter__(self): - return self - - def __next__(self): - if self.i < len(self.operations): - result = self.operations[self.i] - self.i = self.i + 1 - return result - else: - raise StopIteration() - - -cdef class Operations: - - def __cinit__(self, operations): - grpc_init() - self.operations = list(operations) # normalize iterable - self.c_ops = NULL - self.c_nops = 0 - for operation in self.operations: - if not isinstance(operation, Operation): - raise TypeError("expected operations to be iterable of Operation") - self.c_nops = len(self.operations) - with nogil: - self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) - for i in range(self.c_nops): - self.c_ops[i] = (<Operation>(self.operations[i])).c_op - - def __len__(self): - return self.c_nops - - def __getitem__(self, size_t i): - # self.operations is never stale; it's only updated from this file - return self.operations[i] - - def __dealloc__(self): - with nogil: - gpr_free(self.c_ops) - grpc_shutdown() - - def __iter__(self): - return _OperationsIterator(self) - - cdef class CompressionOptions: def __cinit__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 5f3405936c..f8d7892858 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -78,23 +78,19 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") - cdef grpc_call_error result - cdef OperationTag operation_tag = OperationTag(tag) + cdef OperationTag operation_tag = OperationTag(tag, None) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() - operation_tag.request_metadata = MetadataArray() + grpc_metadata_array_init(&operation_tag._c_request_metadata) operation_tag.references.extend([self, call_queue, server_queue]) operation_tag.is_new_request = True - operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - with nogil: - result = grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) - return result + return grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag._c_request_metadata, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>operation_tag) def register_completion_queue( self, CompletionQueue queue not None): @@ -135,7 +131,7 @@ cdef class Server: cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True - operation_tag = OperationTag(tag) + operation_tag = OperationTag(tag, None) operation_tag.shutting_down_server = self cpython.Py_INCREF(operation_tag) with nogil: diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index fc6cc5fb9f..6fc5638d5d 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -18,6 +18,7 @@ include "_cygrpc/call.pxd.pxi" include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/metadata.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 57165d5f5a..d605229822 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -25,6 +25,7 @@ include "_cygrpc/call.pyx.pxi" include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/metadata.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py new file mode 100644 index 0000000000..fffb269845 --- /dev/null +++ b/src/python/grpcio/grpc/_interceptor.py @@ -0,0 +1,318 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of gRPC Python interceptors.""" + +import collections +import sys + +import grpc + + +class _ServicePipeline(object): + + def __init__(self, interceptors): + self.interceptors = tuple(interceptors) + + def _continuation(self, thunk, index): + return lambda context: self._intercept_at(thunk, index, context) + + def _intercept_at(self, thunk, index, context): + if index < len(self.interceptors): + interceptor = self.interceptors[index] + thunk = self._continuation(thunk, index + 1) + return interceptor.intercept_service(thunk, context) + else: + return thunk(context) + + def execute(self, thunk, context): + return self._intercept_at(thunk, 0, context) + + +def service_pipeline(interceptors): + return _ServicePipeline(interceptors) if interceptors else None + + +class _ClientCallDetails( + collections.namedtuple('_ClientCallDetails', + ('method', 'timeout', 'metadata', + 'credentials')), grpc.ClientCallDetails): + pass + + +class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): + + def __init__(self, exception, traceback): + super(_LocalFailure, self).__init__() + self._exception = exception + self._traceback = traceback + + def initial_metadata(self): + return None + + def trailing_metadata(self): + return None + + def code(self): + return grpc.StatusCode.INTERNAL + + def details(self): + return 'Exception raised while intercepting the RPC' + + def cancel(self): + return False + + def cancelled(self): + return False + + def running(self): + return False + + def done(self): + return True + + def result(self, ignored_timeout=None): + raise self._exception + + def exception(self, ignored_timeout=None): + return self._exception + + def traceback(self, ignored_timeout=None): + return self._traceback + + def add_done_callback(self, fn): + fn(self) + + def __iter__(self): + return self + + def next(self): + raise self._exception + + +class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + call_future = self.future( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result() + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + call_future = self.future( + request, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result(), call_future + + def future(self, request, timeout=None, metadata=None, credentials=None): + + def continuation(client_call_details, request): + return self._thunk(client_call_details.method).future( + request, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + try: + return self._interceptor.intercept_unary_unary( + continuation, client_call_details, request) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + + def continuation(client_call_details, request): + return self._thunk(client_call_details.method)( + request, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + try: + return self._interceptor.intercept_unary_stream( + continuation, client_call_details, request) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + call_future = self.future( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result() + + def with_call(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + call_future = self.future( + request_iterator, + timeout=timeout, + metadata=metadata, + credentials=credentials) + return call_future.result(), call_future + + def future(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + + def continuation(client_call_details, request_iterator): + return self._thunk(client_call_details.method).future( + request_iterator, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + try: + return self._interceptor.intercept_stream_unary( + continuation, client_call_details, request_iterator) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + + def __init__(self, thunk, method, interceptor): + self._thunk = thunk + self._method = method + self._interceptor = interceptor + + def __call__(self, + request_iterator, + timeout=None, + metadata=None, + credentials=None): + + def continuation(client_call_details, request_iterator): + return self._thunk(client_call_details.method)( + request_iterator, + timeout=client_call_details.timeout, + metadata=client_call_details.metadata, + credentials=client_call_details.credentials) + + client_call_details = _ClientCallDetails(self._method, timeout, + metadata, credentials) + + try: + return self._interceptor.intercept_stream_stream( + continuation, client_call_details, request_iterator) + except Exception as exception: # pylint:disable=broad-except + return _LocalFailure(exception, sys.exc_info()[2]) + + +class _Channel(grpc.Channel): + + def __init__(self, channel, interceptor): + self._channel = channel + self._interceptor = interceptor + + def subscribe(self, *args, **kwargs): + self._channel.subscribe(*args, **kwargs) + + def unsubscribe(self, *args, **kwargs): + self._channel.unsubscribe(*args, **kwargs) + + def unary_unary(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.unary_unary(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.UnaryUnaryClientInterceptor): + return _UnaryUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def unary_stream(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.unary_stream(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.UnaryStreamClientInterceptor): + return _UnaryStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def stream_unary(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.stream_unary(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.StreamUnaryClientInterceptor): + return _StreamUnaryMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + def stream_stream(self, + method, + request_serializer=None, + response_deserializer=None): + thunk = lambda m: self._channel.stream_stream(m, request_serializer, response_deserializer) + if isinstance(self._interceptor, grpc.StreamStreamClientInterceptor): + return _StreamStreamMultiCallable(thunk, method, self._interceptor) + else: + return thunk(method) + + +def intercept_channel(channel, *interceptors): + for interceptor in reversed(list(interceptors)): + if not isinstance(interceptor, grpc.UnaryUnaryClientInterceptor) and \ + not isinstance(interceptor, grpc.UnaryStreamClientInterceptor) and \ + not isinstance(interceptor, grpc.StreamUnaryClientInterceptor) and \ + not isinstance(interceptor, grpc.StreamStreamClientInterceptor): + raise TypeError('interceptor must be ' + 'grpc.UnaryUnaryClientInterceptor or ' + 'grpc.UnaryStreamClientInterceptor or ' + 'grpc.StreamUnaryClientInterceptor or ' + 'grpc.StreamStreamClientInterceptor or ') + channel = _Channel(channel, interceptor) + return channel diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index cd17f4a049..f7287956dc 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -54,9 +54,7 @@ class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback): 'AuthMetadataPluginCallback raised exception "{}"!'.format( self._state.exception)) if error is None: - self._callback( - _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok, - None) + self._callback(metadata, cygrpc.StatusCode.ok, None) else: self._callback(None, cygrpc.StatusCode.internal, _common.encode(str(error))) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 5b4812bffe..8857bd3eb5 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -23,6 +23,7 @@ import six import grpc from grpc import _common +from grpc import _interceptor from grpc._cython import cygrpc from grpc.framework.foundation import callable_util @@ -96,6 +97,7 @@ class _RPCState(object): self.statused = False self.rpc_errors = [] self.callbacks = [] + self.abortion = None def _raise_rpc_error(state): @@ -129,19 +131,17 @@ def _abort(state, call, code, details): effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, - _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - _common.to_cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + state.trailing_metadata, effective_code, effective_details, + _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: operations = (cygrpc.operation_send_status_from_server( - _common.to_cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + state.trailing_metadata, effective_code, effective_details, + _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN - call.start_server_batch( - cygrpc.Operations(operations), - _send_status_from_server(state, token)) + call.start_server_batch(operations, + _send_status_from_server(state, token)) state.statused = True state.due.add(token) @@ -237,7 +237,7 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return _common.to_application_metadata(self._rpc_event.request_metadata) + return self._rpc_event.request_metadata def peer(self): return _common.decode(self._rpc_event.operation_call.peer()) @@ -263,11 +263,9 @@ class _Context(grpc.ServicerContext): else: if self._state.initial_metadata_allowed: operation = cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(initial_metadata), - _EMPTY_FLAGS) + initial_metadata, _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( - cygrpc.Operations((operation,)), - _send_initial_metadata(self._state)) + (operation,), _send_initial_metadata(self._state)) self._state.initial_metadata_allowed = False self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) else: @@ -275,8 +273,14 @@ class _Context(grpc.ServicerContext): def set_trailing_metadata(self, trailing_metadata): with self._state.condition: - self._state.trailing_metadata = _common.to_cygrpc_metadata( - trailing_metadata) + self._state.trailing_metadata = trailing_metadata + + def abort(self, code, details): + with self._state.condition: + self._state.code = code + self._state.details = _common.encode(details) + self._state.abortion = Exception() + raise self._state.abortion def set_code(self, code): with self._state.condition: @@ -301,8 +305,7 @@ class _RequestIterator(object): raise StopIteration() else: self._call.start_server_batch( - cygrpc.Operations( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_message(_EMPTY_FLAGS),), _receive_message(self._state, self._call, self._request_deserializer)) self._state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -345,8 +348,7 @@ def _unary_request(rpc_event, state, request_deserializer): return None else: rpc_event.operation_call.start_server_batch( - cygrpc.Operations( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_message(_EMPTY_FLAGS),), _receive_message(state, rpc_event.operation_call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -376,7 +378,10 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): return behavior(argument, context), True except Exception as exception: # pylint: disable=broad-except with state.condition: - if exception not in state.rpc_errors: + if exception is state.abortion: + _abort(state, rpc_event.operation_call, + cygrpc.StatusCode.unknown, b'RPC Aborted') + elif exception not in state.rpc_errors: details = 'Exception calling application: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, @@ -391,7 +396,10 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): return None, True except Exception as exception: # pylint: disable=broad-except with state.condition: - if exception not in state.rpc_errors: + if exception is state.abortion: + _abort(state, rpc_event.operation_call, + cygrpc.StatusCode.unknown, b'RPC Aborted') + elif exception not in state.rpc_errors: details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) _abort(state, rpc_event.operation_call, @@ -417,9 +425,8 @@ def _send_response(rpc_event, state, serialized_response): else: if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + (), _EMPTY_FLAGS), cygrpc.operation_send_message( + serialized_response, _EMPTY_FLAGS),) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: @@ -427,7 +434,7 @@ def _send_response(rpc_event, state, serialized_response): _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN rpc_event.operation_call.start_server_batch( - cygrpc.Operations(operations), _send_message(state, token)) + operations, _send_message(state, token)) state.due.add(token) while True: state.condition.wait() @@ -438,24 +445,21 @@ def _send_response(rpc_event, state, serialized_response): def _status(rpc_event, state, serialized_response): with state.condition: if state.client is not _CANCELLED: - trailing_metadata = _common.to_cygrpc_metadata( - state.trailing_metadata) code = _completion_code(state) details = _details(state) operations = [ cygrpc.operation_send_status_from_server( - trailing_metadata, code, details, _EMPTY_FLAGS), + state.trailing_metadata, code, details, _EMPTY_FLAGS), ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS)) + cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) if serialized_response is not None: operations.append( cygrpc.operation_send_message(serialized_response, _EMPTY_FLAGS)) rpc_event.operation_call.start_server_batch( - cygrpc.Operations(operations), + operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) state.statused = True state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN) @@ -538,24 +542,31 @@ def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): method_handler.request_deserializer, method_handler.response_serializer) -def _find_method_handler(rpc_event, generic_handlers): - for generic_handler in generic_handlers: - method_handler = generic_handler.service( - _HandlerCallDetails( - _common.decode(rpc_event.request_call_details.method), - rpc_event.request_metadata)) - if method_handler is not None: - return method_handler - else: +def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): + + def query_handlers(handler_call_details): + for generic_handler in generic_handlers: + method_handler = generic_handler.service(handler_call_details) + if method_handler is not None: + return method_handler return None + handler_call_details = _HandlerCallDetails( + _common.decode(rpc_event.request_call_details.method), + rpc_event.request_metadata) + + if interceptor_pipeline is not None: + return interceptor_pipeline.execute(query_handlers, + handler_call_details) + else: + return query_handlers(handler_call_details) + def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA, - _EMPTY_FLAGS), + operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - _common.EMPTY_METADATA, status, details, _EMPTY_FLAGS),) + cygrpc.operation_send_status_from_server((), status, details, + _EMPTY_FLAGS),) rpc_state = _RPCState() rpc_event.operation_call.start_server_batch( operations, lambda ignored_event: (rpc_state, (),)) @@ -566,8 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: rpc_event.operation_call.start_server_batch( - cygrpc.Operations( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),)), + (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) if method_handler.request_streaming: @@ -586,13 +596,14 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): method_handler, thread_pool) -def _handle_call(rpc_event, generic_handlers, thread_pool, +def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded): if not rpc_event.success: return None, None if rpc_event.request_call_details.method is not None: try: - method_handler = _find_method_handler(rpc_event, generic_handlers) + method_handler = _find_method_handler(rpc_event, generic_handlers, + interceptor_pipeline) except Exception as exception: # pylint: disable=broad-except details = 'Exception servicing handler: {}'.format(exception) logging.exception(details) @@ -620,12 +631,14 @@ class _ServerStage(enum.Enum): class _ServerState(object): - def __init__(self, completion_queue, server, generic_handlers, thread_pool, - maximum_concurrent_rpcs): + # pylint: disable=too-many-arguments + def __init__(self, completion_queue, server, generic_handlers, + interceptor_pipeline, thread_pool, maximum_concurrent_rpcs): self.lock = threading.Lock() self.completion_queue = completion_queue self.server = server self.generic_handlers = list(generic_handlers) + self.interceptor_pipeline = interceptor_pipeline self.thread_pool = thread_pool self.stage = _ServerStage.STOPPED self.shutdown_events = None @@ -690,8 +703,8 @@ def _serve(state): state.maximum_concurrent_rpcs is not None and state.active_rpc_count >= state.maximum_concurrent_rpcs) rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.thread_pool, - concurrency_exceeded) + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) if rpc_state is not None: state.rpc_states.add(rpc_state) if rpc_future is not None: @@ -779,12 +792,14 @@ def _start(state): class Server(grpc.Server): - def __init__(self, thread_pool, generic_handlers, options, + # pylint: disable=too-many-arguments + def __init__(self, thread_pool, generic_handlers, interceptors, options, maximum_concurrent_rpcs): completion_queue = cygrpc.CompletionQueue() server = cygrpc.Server(_common.channel_args(options)) server.register_completion_queue(completion_queue) self._state = _ServerState(completion_queue, server, generic_handlers, + _interceptor.service_pipeline(interceptors), thread_pool, maximum_concurrent_rpcs) def add_generic_rpc_handlers(self, generic_rpc_handlers): diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index 73ce22fa98..dcaa0eeaa2 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -15,6 +15,7 @@ import grpc from grpc import _common +from grpc.beta import _metadata from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.foundation import future @@ -157,10 +158,10 @@ class _Rendezvous(future.Future, face.Call): return _InvocationProtocolContext() def initial_metadata(self): - return self._call.initial_metadata() + return _metadata.beta(self._call.initial_metadata()) def terminal_metadata(self): - return self._call.terminal_metadata() + return _metadata.beta(self._call.terminal_metadata()) def code(self): return self._call.code() @@ -182,14 +183,14 @@ def _blocking_unary_unary(channel, group, method, timeout, with_call, response, call = multi_callable.with_call( request, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( request, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) except grpc.RpcError as rpc_error_call: raise _abortion_error(rpc_error_call) @@ -206,7 +207,7 @@ def _future_unary_unary(channel, group, method, timeout, protocol_options, response_future = multi_callable.future( request, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return _Rendezvous(response_future, None, response_future) @@ -222,7 +223,7 @@ def _unary_stream(channel, group, method, timeout, protocol_options, metadata, response_iterator = multi_callable( request, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return _Rendezvous(None, response_iterator, response_iterator) @@ -241,14 +242,14 @@ def _blocking_stream_unary(channel, group, method, timeout, with_call, response, call = multi_callable.with_call( request_iterator, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( request_iterator, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) except grpc.RpcError as rpc_error_call: raise _abortion_error(rpc_error_call) @@ -265,7 +266,7 @@ def _future_stream_unary(channel, group, method, timeout, protocol_options, response_future = multi_callable.future( request_iterator, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return _Rendezvous(response_future, None, response_future) @@ -281,7 +282,7 @@ def _stream_stream(channel, group, method, timeout, protocol_options, metadata, response_iterator = multi_callable( request_iterator, timeout=timeout, - metadata=effective_metadata, + metadata=_metadata.unbeta(effective_metadata), credentials=_credentials(protocol_options)) return _Rendezvous(None, response_iterator, response_iterator) diff --git a/src/python/grpcio/grpc/beta/_metadata.py b/src/python/grpcio/grpc/beta/_metadata.py new file mode 100644 index 0000000000..e135f4dff4 --- /dev/null +++ b/src/python/grpcio/grpc/beta/_metadata.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""API metadata conversion utilities.""" + +import collections + +_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) + + +def _beta_metadatum(key, value): + beta_key = key if isinstance(key, (bytes,)) else key.encode('ascii') + beta_value = value if isinstance(value, (bytes,)) else value.encode('ascii') + return _Metadatum(beta_key, beta_value) + + +def _metadatum(beta_key, beta_value): + key = beta_key if isinstance(beta_key, (str,)) else beta_key.decode('utf8') + if isinstance(beta_value, (str,)) or key[-4:] == '-bin': + value = beta_value + else: + value = beta_value.decode('utf8') + return _Metadatum(key, value) + + +def beta(metadata): + if metadata is None: + return () + else: + return tuple(_beta_metadatum(key, value) for key, value in metadata) + + +def unbeta(beta_metadata): + if beta_metadata is None: + return () + else: + return tuple( + _metadatum(beta_key, beta_value) + for beta_key, beta_value in beta_metadata) diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index ec363e9bc9..1c22dbe3bb 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -18,6 +18,7 @@ import threading import grpc from grpc import _common +from grpc.beta import _metadata from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.common import style @@ -65,14 +66,15 @@ class _FaceServicerContext(face.ServicerContext): return _ServerProtocolContext(self._servicer_context) def invocation_metadata(self): - return _common.to_cygrpc_metadata( - self._servicer_context.invocation_metadata()) + return _metadata.beta(self._servicer_context.invocation_metadata()) def initial_metadata(self, initial_metadata): - self._servicer_context.send_initial_metadata(initial_metadata) + self._servicer_context.send_initial_metadata( + _metadata.unbeta(initial_metadata)) def terminal_metadata(self, terminal_metadata): - self._servicer_context.set_terminal_metadata(terminal_metadata) + self._servicer_context.set_terminal_metadata( + _metadata.unbeta(terminal_metadata)) def code(self, code): self._servicer_context.set_code(code) diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py index e52ce764b5..312daf033e 100644 --- a/src/python/grpcio/grpc/beta/implementations.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -21,6 +21,7 @@ import threading # pylint: disable=unused-import import grpc from grpc import _auth from grpc.beta import _client_adaptations +from grpc.beta import _metadata from grpc.beta import _server_adaptations from grpc.beta import interfaces # pylint: disable=unused-import from grpc.framework.common import cardinality # pylint: disable=unused-import @@ -31,7 +32,18 @@ from grpc.framework.interfaces.face import face # pylint: disable=unused-import ChannelCredentials = grpc.ChannelCredentials ssl_channel_credentials = grpc.ssl_channel_credentials CallCredentials = grpc.CallCredentials -metadata_call_credentials = grpc.metadata_call_credentials + + +def metadata_call_credentials(metadata_plugin, name=None): + + def plugin(context, callback): + + def wrapped_callback(beta_metadata, error): + callback(_metadata.unbeta(beta_metadata), error) + + metadata_plugin(context, wrapped_callback) + + return grpc.metadata_call_credentials(plugin, name=name) def google_call_credentials(credentials): |