From 80516e884a8cd03daaa1f4a40d2bb284d5cc1be6 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Fri, 8 Dec 2017 03:04:15 +0000 Subject: Streamline metadata in gRPC Python --- src/python/grpcio/grpc/_channel.py | 27 ++-- src/python/grpcio/grpc/_common.py | 17 -- .../grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 5 +- .../grpc/_cython/_cygrpc/credentials.pyx.pxi | 8 +- .../grpc/_cython/_cygrpc/grpc_string.pyx.pxi | 24 +++ .../grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi | 26 ++++ .../grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi | 62 ++++++++ .../grpcio/grpc/_cython/_cygrpc/records.pxd.pxi | 25 +-- .../grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 171 +++------------------ .../grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 17 +- src/python/grpcio/grpc/_cython/cygrpc.pxd | 1 + src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + src/python/grpcio/grpc/_plugin_wrapping.py | 4 +- src/python/grpcio/grpc/_server.py | 37 ++--- 14 files changed, 185 insertions(+), 240 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi (limited to 'src/python/grpcio') diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index cf4ce0941b..dad32dea90 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -122,8 +122,8 @@ def _abort(state, code, details): state.code = code state.details = details if state.initial_metadata is None: - state.initial_metadata = _common.EMPTY_METADATA - state.trailing_metadata = _common.EMPTY_METADATA + state.initial_metadata = () + state.trailing_metadata = () def _handle_event(event, state, response_deserializer): @@ -372,14 +372,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): with self._state.condition: while self._state.initial_metadata is None: self._state.condition.wait() - return _common.to_application_metadata(self._state.initial_metadata) + return self._state.initial_metadata def trailing_metadata(self): with self._state.condition: while self._state.trailing_metadata is None: self._state.condition.wait() - return _common.to_application_metadata( - self._state.trailing_metadata) + return self._state.trailing_metadata def code(self): with self._state.condition: @@ -420,8 +419,7 @@ def _start_unary_request(request, timeout, request_serializer): deadline, deadline_timespec = _deadline(timeout) serialized_request = _common.serialize(request, request_serializer) if serialized_request is None: - state = _RPCState((), _common.EMPTY_METADATA, _common.EMPTY_METADATA, - grpc.StatusCode.INTERNAL, + state = _RPCState((), (), (), grpc.StatusCode.INTERNAL, 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) return deadline, deadline_timespec, None, rendezvous @@ -458,8 +456,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), @@ -549,8 +546,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): )), event_handler) operations = ( cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), - _EMPTY_FLAGS), cygrpc.operation_send_message( + metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( serialized_request, _EMPTY_FLAGS), cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) @@ -588,8 +584,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), None) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -642,8 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( @@ -685,8 +679,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),)), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(metadata), _EMPTY_FLAGS), + cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) call_error = call.start_client_batch( cygrpc.Operations(operations), event_handler) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index 740d4639db..130fc42630 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -22,8 +22,6 @@ import six import grpc from grpc._cython import cygrpc -EMPTY_METADATA = cygrpc.Metadata(()) - CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = { cygrpc.ConnectivityState.idle: grpc.ChannelConnectivity.IDLE, @@ -91,21 +89,6 @@ def channel_args(options): return cygrpc.ChannelArgs(cygrpc_args) -def to_cygrpc_metadata(application_metadata): - return EMPTY_METADATA if application_metadata is None else cygrpc.Metadata( - cygrpc.Metadatum(encode(key), encode(value)) - for key, value in application_metadata) - - -def to_application_metadata(cygrpc_metadata): - if cygrpc_metadata is None: - return () - else: - return tuple((decode(key), value - if key[-4:] == b'-bin' else decode(value)) - for key, value in cygrpc_metadata) - - def _transform(message, transformer, exception_message): if transformer is None: return message diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 237f430799..af31a27fce 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -61,8 +61,9 @@ cdef class CompletionQueue: user_tag = tag.user_tag operation_call = tag.operation_call request_call_details = tag.request_call_details - if tag.request_metadata is not None: - request_metadata = tuple(tag.request_metadata) + if tag.is_new_request: + request_metadata = _metadata(&tag._c_request_metadata) + grpc_metadata_array_destroy(&tag._c_request_metadata) batch_operations = tag.batch_operations if tag.is_new_request: # Stuff in the tag not explicitly handled by us needs to live through diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 246a271893..500086f6cb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -30,9 +30,13 @@ cdef int _get_metadata( grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX], size_t *num_creds_md, grpc_status_code *status, const char **error_details) with gil: - def callback(Metadata metadata, grpc_status_code status, bytes error_details): + cdef size_t metadata_count + cdef grpc_metadata *c_metadata + def callback(metadata, grpc_status_code status, bytes error_details): if status is StatusCode.ok: - cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL) + _store_c_metadata(metadata, &c_metadata, &metadata_count) + cb(user_data, c_metadata, metadata_count, status, NULL) + _release_c_metadata(c_metadata, metadata_count) else: cb(user_data, NULL, 0, status, error_details) args = context.service_url, context.method_name, callback, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi index c8f11f8e19..e3cad9acb3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + # This function will ascii encode unicode string inputs if neccesary. # In Python3, unicode strings are the default str type. @@ -22,3 +24,25 @@ cdef bytes str_to_bytes(object s): return s.encode('ascii') else: raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s))) + + +cdef bytes _encode(str native_string_or_none): + if native_string_or_none is None: + return b'' + elif isinstance(native_string_or_none, (bytes,)): + return native_string_or_none + elif isinstance(native_string_or_none, (unicode,)): + return native_string_or_none.encode('ascii') + else: + raise TypeError('Expected str, not {}'.format(type(native_string_or_none))) + + +cdef str _decode(bytes bytestring): + if isinstance(bytestring, (str,)): + return bytestring + else: + try: + return bytestring.decode('utf8') + except UnicodeDecodeError: + logging.exception('Invalid encoding on %s', bytestring) + return bytestring.decode('latin1') diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi new file mode 100644 index 0000000000..a18c365807 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi @@ -0,0 +1,26 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count) + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count) + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi new file mode 100644 index 0000000000..c39fef08fa --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi @@ -0,0 +1,62 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + + +_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) + + +cdef void _store_c_metadata( + metadata, grpc_metadata **c_metadata, size_t *c_count): + if metadata is None: + c_count[0] = 0 + c_metadata[0] = NULL + else: + metadatum_count = len(metadata) + if metadatum_count == 0: + c_count[0] = 0 + c_metadata[0] = NULL + else: + c_count[0] = metadatum_count + c_metadata[0] = gpr_malloc( + metadatum_count * sizeof(grpc_metadata)) + for index, (key, value) in enumerate(metadata): + encoded_key = _encode(key) + encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value) + c_metadata[0][index].key = _slice_from_bytes(encoded_key) + c_metadata[0][index].value = _slice_from_bytes(encoded_value) + + +cdef void _release_c_metadata(grpc_metadata *c_metadata, int count): + if 0 < count: + for index in range(count): + grpc_slice_unref(c_metadata[index].key) + grpc_slice_unref(c_metadata[index].value) + gpr_free(c_metadata) + + +cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice): + cdef bytes key = _slice_bytes(key_slice) + cdef bytes value = _slice_bytes(value_slice) + return _Metadatum( + _decode(key), value if key[-4:] == b'-bin' else _decode(value)) + + +cdef tuple _metadata(grpc_metadata_array *c_metadata_array): + return tuple( + _metadatum( + c_metadata_array.metadata[index].key, + c_metadata_array.metadata[index].value) + for index in range(c_metadata_array.count)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 9c40ebf0c2..738769ff40 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -37,7 +37,7 @@ cdef class OperationTag: cdef Server shutting_down_server cdef Call operation_call cdef CallDetails request_call_details - cdef MetadataArray request_metadata + cdef grpc_metadata_array _c_request_metadata cdef Operations batch_operations cdef bint is_new_request @@ -84,28 +84,15 @@ cdef class ChannelArgs: cdef list args -cdef class Metadatum: - - cdef grpc_metadata c_metadata - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil - - -cdef class Metadata: - - cdef grpc_metadata *c_metadata - cdef readonly size_t c_count - - -cdef class MetadataArray: - - cdef grpc_metadata_array c_metadata_array - - cdef class Operation: cdef grpc_op c_op + cdef bint _c_metadata_needs_release + cdef size_t _c_metadata_count + cdef grpc_metadata *_c_metadata cdef ByteBuffer _received_message - cdef MetadataArray _received_metadata + cdef bint _c_metadata_array_needs_destruction + cdef grpc_metadata_array _c_metadata_array cdef grpc_status_code _received_status_code cdef grpc_slice _status_details cdef int _received_cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 4f87261e17..8577c157c3 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -390,140 +390,13 @@ cdef class ChannelArgs: return self.args[i] -cdef class Metadatum: - - def __cinit__(self, bytes key, bytes value): - self.c_metadata.key = _slice_from_bytes(key) - self.c_metadata.value = _slice_from_bytes(value) - - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil: - destination[0].key = _copy_slice(self.c_metadata.key) - destination[0].value = _copy_slice(self.c_metadata.value) - - @property - def key(self): - return _slice_bytes(self.c_metadata.key) - - @property - def value(self): - return _slice_bytes(self.c_metadata.value) - - def __len__(self): - return 2 - - def __getitem__(self, size_t i): - if i == 0: - return self.key - elif i == 1: - return self.value - else: - raise IndexError("index must be 0 (key) or 1 (value)") - - def __iter__(self): - return iter((self.key, self.value)) - - def __dealloc__(self): - grpc_slice_unref(self.c_metadata.key) - grpc_slice_unref(self.c_metadata.value) - -cdef class _MetadataIterator: - - cdef size_t i - cdef size_t _length - cdef object _metadatum_indexable - - def __cinit__(self, length, metadatum_indexable): - self._length = length - self._metadatum_indexable = metadatum_indexable - self.i = 0 - - def __iter__(self): - return self - - def __next__(self): - if self.i < self._length: - result = self._metadatum_indexable[self.i] - self.i = self.i + 1 - return result - else: - raise StopIteration() - - -# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an -# ordinary sequence of pairs of bytestrings all the way down to the -# grpc_call_start_batch call. -cdef class Metadata: - """Metadata being passed from application to core.""" - - def __cinit__(self, metadata_iterable): - metadata_sequence = tuple(metadata_iterable) - cdef size_t count = len(metadata_sequence) - with nogil: - grpc_init() - self.c_metadata = gpr_malloc( - count * sizeof(grpc_metadata)) - self.c_count = count - for index, metadatum in enumerate(metadata_sequence): - self.c_metadata[index].key = grpc_slice_copy( - (metadatum).c_metadata.key) - self.c_metadata[index].value = grpc_slice_copy( - (metadatum).c_metadata.value) - - def __dealloc__(self): - with nogil: - for index in range(self.c_count): - grpc_slice_unref(self.c_metadata[index].key) - grpc_slice_unref(self.c_metadata[index].value) - gpr_free(self.c_metadata) - grpc_shutdown() - - def __len__(self): - return self.c_count - - def __getitem__(self, size_t index): - if index < self.c_count: - key = _slice_bytes(self.c_metadata[index].key) - value = _slice_bytes(self.c_metadata[index].value) - return Metadatum(key, value) - else: - raise IndexError() - - def __iter__(self): - return _MetadataIterator(self.c_count, self) - - -cdef class MetadataArray: - """Metadata being passed from core to application.""" - - def __cinit__(self): - with nogil: - grpc_init() - grpc_metadata_array_init(&self.c_metadata_array) - - def __dealloc__(self): - with nogil: - grpc_metadata_array_destroy(&self.c_metadata_array) - grpc_shutdown() - - def __len__(self): - return self.c_metadata_array.count - - def __getitem__(self, size_t i): - if i >= self.c_metadata_array.count: - raise IndexError() - key = _slice_bytes(self.c_metadata_array.metadata[i].key) - value = _slice_bytes(self.c_metadata_array.metadata[i].value) - return Metadatum(key=key, value=value) - - def __iter__(self): - return _MetadataIterator(self.c_metadata_array.count, self) - - cdef class Operation: def __cinit__(self): grpc_init() self.references = [] + self._c_metadata_needs_release = False + self._c_metadata_array_needs_destruction = False self._status_details = grpc_empty_slice() self.is_valid = False @@ -556,13 +429,7 @@ cdef class Operation: if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): raise TypeError("self must be an operation receiving metadata") - # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython - # objects must be legitimate for use from Python at any time" policy in - # place today, shift the policy toward "Operation objects are only usable - # while their calls are active", and move this making-a-copy-because-this- - # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the - # lowest Python layer. - return tuple(self._received_metadata) + return _metadata(&self._c_metadata_array) @property def received_status_code(self): @@ -602,16 +469,21 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): + if self._c_metadata_needs_release: + _release_c_metadata(self._c_metadata, self._c_metadata_count) + if self._c_metadata_array_needs_destruction: + grpc_metadata_array_destroy(&self._c_metadata_array) grpc_slice_unref(self._status_details) grpc_shutdown() -def operation_send_initial_metadata(Metadata metadata, int flags): +def operation_send_initial_metadata(metadata, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA op.c_op.flags = flags - op.c_op.data.send_initial_metadata.count = metadata.c_count - op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata - op.references.append(metadata) + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True + op.c_op.data.send_initial_metadata.count = op._c_metadata_count + op.c_op.data.send_initial_metadata.metadata = op._c_metadata op.is_valid = True return op @@ -633,18 +505,19 @@ def operation_send_close_from_client(int flags): return op def operation_send_status_from_server( - Metadata metadata, grpc_status_code code, bytes details, int flags): + metadata, grpc_status_code code, bytes details, int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.flags = flags + _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) + op._c_metadata_needs_release = True op.c_op.data.send_status_from_server.trailing_metadata_count = ( - metadata.c_count) - op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata + op._c_metadata_count) + op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata op.c_op.data.send_status_from_server.status = code grpc_slice_unref(op._status_details) op._status_details = _slice_from_bytes(details) op.c_op.data.send_status_from_server.status_details = &op._status_details - op.references.append(metadata) op.is_valid = True return op @@ -652,9 +525,10 @@ def operation_receive_initial_metadata(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.is_valid = True return op @@ -675,9 +549,10 @@ def operation_receive_status_on_client(int flags): cdef Operation op = Operation() op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT op.c_op.flags = flags - op._received_metadata = MetadataArray() + grpc_metadata_array_init(&op._c_metadata_array) op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._received_metadata.c_metadata_array) + &op._c_metadata_array) + op._c_metadata_array_needs_destruction = True op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 5f3405936c..9d912acedf 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -78,23 +78,20 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") - cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() - operation_tag.request_metadata = MetadataArray() + grpc_metadata_array_init(&operation_tag._c_request_metadata) operation_tag.references.extend([self, call_queue, server_queue]) operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - with nogil: - result = grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - operation_tag) - return result + return grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag._c_request_metadata, + call_queue.c_completion_queue, server_queue.c_completion_queue, + operation_tag) def register_completion_queue( self, CompletionQueue queue not None): diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index fc6cc5fb9f..6fc5638d5d 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -18,6 +18,7 @@ include "_cygrpc/call.pxd.pxi" include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/metadata.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 57165d5f5a..d605229822 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -25,6 +25,7 @@ include "_cygrpc/call.pyx.pxi" include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/metadata.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index cd17f4a049..f7287956dc 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -54,9 +54,7 @@ class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback): 'AuthMetadataPluginCallback raised exception "{}"!'.format( self._state.exception)) if error is None: - self._callback( - _common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok, - None) + self._callback(metadata, cygrpc.StatusCode.ok, None) else: self._callback(None, cygrpc.StatusCode.internal, _common.encode(str(error))) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index cd59b07c04..0771c2499e 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -129,15 +129,14 @@ def _abort(state, call, code, details): effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, - _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( - _common.to_cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + state.trailing_metadata, effective_code, effective_details, + _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: operations = (cygrpc.operation_send_status_from_server( - _common.to_cygrpc_metadata(state.trailing_metadata), - effective_code, effective_details, _EMPTY_FLAGS),) + state.trailing_metadata, effective_code, effective_details, + _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN call.start_server_batch( cygrpc.Operations(operations), @@ -237,7 +236,7 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return _common.to_application_metadata(self._rpc_event.request_metadata) + return self._rpc_event.request_metadata def peer(self): return _common.decode(self._rpc_event.operation_call.peer()) @@ -263,8 +262,7 @@ class _Context(grpc.ServicerContext): else: if self._state.initial_metadata_allowed: operation = cygrpc.operation_send_initial_metadata( - _common.to_cygrpc_metadata(initial_metadata), - _EMPTY_FLAGS) + initial_metadata, _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( cygrpc.Operations((operation,)), _send_initial_metadata(self._state)) @@ -275,8 +273,7 @@ class _Context(grpc.ServicerContext): def set_trailing_metadata(self, trailing_metadata): with self._state.condition: - self._state.trailing_metadata = _common.to_cygrpc_metadata( - trailing_metadata) + self._state.trailing_metadata = trailing_metadata def set_code(self, code): with self._state.condition: @@ -417,9 +414,8 @@ def _send_response(rpc_event, state, serialized_response): else: if state.initial_metadata_allowed: operations = (cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + (), _EMPTY_FLAGS), cygrpc.operation_send_message( + serialized_response, _EMPTY_FLAGS),) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: @@ -438,8 +434,7 @@ def _send_response(rpc_event, state, serialized_response): def _status(rpc_event, state, serialized_response): with state.condition: if state.client is not _CANCELLED: - trailing_metadata = _common.to_cygrpc_metadata( - state.trailing_metadata) + trailing_metadata = state.trailing_metadata code = _completion_code(state) details = _details(state) operations = [ @@ -448,8 +443,7 @@ def _status(rpc_event, state, serialized_response): ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata( - _common.EMPTY_METADATA, _EMPTY_FLAGS)) + cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) if serialized_response is not None: operations.append( cygrpc.operation_send_message(serialized_response, @@ -551,11 +545,10 @@ def _find_method_handler(rpc_event, generic_handlers): def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata(_common.EMPTY_METADATA, - _EMPTY_FLAGS), + operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - _common.EMPTY_METADATA, status, details, _EMPTY_FLAGS),) + cygrpc.operation_send_status_from_server((), status, details, + _EMPTY_FLAGS),) rpc_state = _RPCState() rpc_event.operation_call.start_server_batch( operations, lambda ignored_event: (rpc_state, (),)) -- cgit v1.2.3