diff options
author | David G. Quintas <dgq@google.com> | 2017-01-23 12:08:46 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-23 12:08:46 -0800 |
commit | f2c5c1e541a3c42fd418063ec24bb9e591b3ad40 (patch) | |
tree | a93b1c9939dccef7173b56ee73a7aadd9e1e8ccb /src/python/grpcio | |
parent | 6e407b77627a538c4a9dda96fcf00438dcb76041 (diff) | |
parent | 5e01e2ac977655aa074faf7fde0a74298f5e4c55 (diff) |
Merge pull request #9428 from grpc/revert-8842-metadata_filter
Revert "Metadata handling rewrite"
Diffstat (limited to 'src/python/grpcio')
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 15 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 14 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 35 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi | 12 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 129 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 6 |
7 files changed, 84 insertions, 129 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 246e8399bc..73d1ff7b97 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -60,25 +60,20 @@ cdef class Channel: method, host, Timespec deadline not None): if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") - cdef grpc_slice method_slice = _slice_from_bytes(method) - cdef grpc_slice host_slice - cdef grpc_slice *host_slice_ptr = NULL + cdef char *method_c_string = method + cdef char *host_c_string = NULL if host is not None: - host_slice = _slice_from_bytes(host) - host_slice_ptr = &host_slice + host_c_string = host cdef Call operation_call = Call() - operation_call.references = [self, queue] + operation_call.references = [self, method, host, queue] cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call with nogil: operation_call.c_call = grpc_channel_create_call( self.c_channel, parent_call, flags, - queue.c_completion_queue, method_slice, host_slice_ptr, + queue.c_completion_queue, method_c_string, host_c_string, deadline.c_time, NULL) - grpc_slice_unref(method_slice) - if host_slice_ptr: - grpc_slice_unref(host_slice) return operation_call def check_connectivity_state(self, bint try_to_connect): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index d8df6c2ef4..a258ba4063 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -51,7 +51,6 @@ cdef class CompletionQueue: cdef CallDetails request_call_details = None cdef Metadata request_metadata = None cdef Operations batch_operations = None - cdef Operation batch_operation = None if event.type == GRPC_QUEUE_TIMEOUT: return Event( event.type, False, None, None, None, None, False, None) @@ -70,15 +69,8 @@ cdef class CompletionQueue: user_tag = tag.user_tag operation_call = tag.operation_call request_call_details = tag.request_call_details - if tag.request_metadata is not None: - request_metadata = tag.request_metadata - request_metadata._claim_slice_ownership() + request_metadata = tag.request_metadata batch_operations = tag.batch_operations - if tag.batch_operations is not None: - for op in batch_operations.operations: - batch_operation = <Operation>op - if batch_operation._received_metadata is not None: - batch_operation._received_metadata._claim_slice_ownership() if tag.is_new_request: # Stuff in the tag not explicitly handled by us needs to live through # the life of the call @@ -99,7 +91,7 @@ cdef class CompletionQueue: c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if deadline is not None: c_deadline = deadline.c_time - + while True: c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) if gpr_time_cmp(c_timeout, c_deadline) > 0: @@ -108,7 +100,7 @@ cdef class CompletionQueue: self.c_completion_queue, c_timeout, NULL) if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: break; - + # Handle any signals with gil: cpython.PyErr_CheckSignals() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 141580b82a..ad766186bd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -51,13 +51,6 @@ cdef extern from "grpc/byte_buffer_reader.h": pass -cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h": - - struct grpc_exec_ctx: - # We don't care about the internals - pass - - cdef extern from "grpc/grpc.h": ctypedef struct grpc_slice: @@ -67,7 +60,6 @@ cdef extern from "grpc/grpc.h": grpc_slice grpc_slice_ref(grpc_slice s) nogil void grpc_slice_unref(grpc_slice s) nogil - grpc_slice grpc_empty_slice() nogil grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil grpc_slice grpc_slice_new_with_len( void *p, size_t len, void (*destroy)(void *, size_t)) nogil @@ -183,7 +175,7 @@ cdef extern from "grpc/grpc.h": ctypedef struct grpc_arg_pointer_vtable: void *(*copy)(void *) - void (*destroy)(grpc_exec_ctx *, void *) + void (*destroy)(void *) int (*cmp)(void *, void *) ctypedef struct grpc_arg_value_pointer: @@ -225,8 +217,9 @@ cdef extern from "grpc/grpc.h": GRPC_CHANNEL_SHUTDOWN ctypedef struct grpc_metadata: - grpc_slice key - grpc_slice value + const char *key + const char *value + size_t value_length # ignore the 'internal_data.obfuscated' fields. ctypedef enum grpc_completion_type: @@ -248,8 +241,10 @@ cdef extern from "grpc/grpc.h": void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: - grpc_slice method - grpc_slice host + char *method + size_t method_capacity + char *host + size_t host_capacity gpr_timespec deadline void grpc_call_details_init(grpc_call_details *details) nogil @@ -273,12 +268,13 @@ cdef extern from "grpc/grpc.h": size_t trailing_metadata_count grpc_metadata *trailing_metadata grpc_status_code status - grpc_slice *status_details + const char *status_details ctypedef struct grpc_op_data_recv_status_on_client: grpc_metadata_array *trailing_metadata grpc_status_code *status - grpc_slice *status_details + char **status_details + size_t *status_details_capacity ctypedef struct grpc_op_data_recv_close_on_server: int *cancelled @@ -325,9 +321,9 @@ cdef extern from "grpc/grpc.h": const grpc_channel_args *args, void *reserved) nogil grpc_call *grpc_channel_create_call( - grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, - grpc_completion_queue *completion_queue, grpc_slice method, - const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, const char *method, + const char *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( @@ -477,7 +473,8 @@ cdef extern from "grpc/compression.h": grpc_compression_algorithm default_compression_algorithm int grpc_compression_algorithm_parse( - grpc_slice value, grpc_compression_algorithm *algorithm) nogil + const char *name, size_t name_length, + grpc_compression_algorithm *algorithm) nogil int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, char **name) nogil grpc_compression_algorithm grpc_compression_algorithm_for_level( diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index c4a17118c0..00ec91b131 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -28,11 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -cdef bytes _slice_bytes(grpc_slice slice) -cdef grpc_slice _copy_slice(grpc_slice slice) nogil -cdef grpc_slice _slice_from_bytes(bytes value) nogil - - cdef class Timespec: cdef gpr_timespec c_time @@ -102,13 +97,13 @@ cdef class ChannelArgs: cdef class Metadatum: cdef grpc_metadata c_metadata - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil + cdef object _key, _value cdef class Metadata: cdef grpc_metadata_array c_metadata_array - cdef void _claim_slice_ownership(self) + cdef object metadata cdef class Operation: @@ -117,7 +112,8 @@ cdef class Operation: cdef ByteBuffer _received_message cdef Metadata _received_metadata cdef grpc_status_code _received_status_code - cdef grpc_slice _status_details + cdef char *_received_status_details + cdef size_t _received_status_details_capacity cdef int _received_cancelled cdef readonly bint is_valid cdef object references diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index a9163c75bf..69b837c4db 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -29,26 +29,6 @@ from libc.stdint cimport intptr_t - -cdef bytes _slice_bytes(grpc_slice slice): - cdef void *start = grpc_slice_start_ptr(slice) - cdef size_t length = grpc_slice_length(slice) - return (<const char *>start)[:length] - -cdef grpc_slice _copy_slice(grpc_slice slice) nogil: - cdef void *start = grpc_slice_start_ptr(slice) - cdef size_t length = grpc_slice_length(slice) - return grpc_slice_from_copied_buffer(<const char *>start, length) - -cdef grpc_slice _slice_from_bytes(bytes value) nogil: - cdef const char *value_ptr - cdef size_t length - with gil: - value_ptr = <const char *>value - length = len(value) - return grpc_slice_from_copied_buffer(value_ptr, length) - - class ConnectivityState: idle = GRPC_CHANNEL_IDLE connecting = GRPC_CHANNEL_CONNECTING @@ -228,11 +208,17 @@ cdef class CallDetails: @property def method(self): - return _slice_bytes(self.c_details.method) + if self.c_details.method != NULL: + return <bytes>self.c_details.method + else: + return None @property def host(self): - return _slice_bytes(self.c_details.host) + if self.c_details.host != NULL: + return <bytes>self.c_details.host + else: + return None @property def deadline(self): @@ -343,7 +329,7 @@ cdef void* copy_ptr(void* ptr): return ptr -cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr): +cdef void destroy_ptr(void* ptr): pass @@ -416,20 +402,19 @@ cdef class ChannelArgs: cdef class Metadatum: def __cinit__(self, bytes key, bytes value): - self.c_metadata.key = _slice_from_bytes(key) - self.c_metadata.value = _slice_from_bytes(value) - - cdef void _copy_metadatum(self, grpc_metadata *destination) nogil: - destination[0].key = _copy_slice(self.c_metadata.key) - destination[0].value = _copy_slice(self.c_metadata.value) + self._key = key + self._value = value + self.c_metadata.key = self._key + self.c_metadata.value = self._value + self.c_metadata.value_length = len(self._value) @property def key(self): - return _slice_bytes(self.c_metadata.key) + return <bytes>self.c_metadata.key @property def value(self): - return _slice_bytes(self.c_metadata.value) + return <bytes>self.c_metadata.value[:self.c_metadata.value_length] def __len__(self): return 2 @@ -445,9 +430,6 @@ cdef class Metadatum: def __iter__(self): return iter((self.key, self.value)) - def __dealloc__(self): - grpc_slice_unref(self.c_metadata.key) - grpc_slice_unref(self.c_metadata.value) cdef class _MetadataIterator: @@ -472,65 +454,51 @@ cdef class _MetadataIterator: cdef class Metadata: - def __cinit__(self, metadata_iterable): - with nogil: - grpc_init() - grpc_metadata_array_init(&self.c_metadata_array) - metadata = list(metadata_iterable) + def __cinit__(self, metadata): + grpc_init() + self.metadata = list(metadata) for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - self.c_metadata_array.count = len(metadata) - self.c_metadata_array.capacity = len(metadata) + with nogil: + grpc_metadata_array_init(&self.c_metadata_array) + self.c_metadata_array.count = len(self.metadata) + self.c_metadata_array.capacity = len(self.metadata) with nogil: self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( self.c_metadata_array.count*sizeof(grpc_metadata) ) for i in range(self.c_metadata_array.count): - (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i]) + self.c_metadata_array.metadata[i] = ( + (<Metadatum>self.metadata[i]).c_metadata) def __dealloc__(self): - with nogil: - # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) - # TODO(atash): document this in the C core - grpc_metadata_array_destroy(&self.c_metadata_array) - grpc_shutdown() + # this frees the allocated memory for the grpc_metadata_array (although + # it'd be nice if that were documented somewhere...) + # TODO(atash): document this in the C core + grpc_metadata_array_destroy(&self.c_metadata_array) + grpc_shutdown() def __len__(self): return self.c_metadata_array.count def __getitem__(self, size_t i): - if i >= self.c_metadata_array.count: - raise IndexError - key = _slice_bytes(self.c_metadata_array.metadata[i].key) - value = _slice_bytes(self.c_metadata_array.metadata[i].value) - return Metadatum(key=key, value=value) + return Metadatum( + key=<bytes>self.c_metadata_array.metadata[i].key, + value=<bytes>self.c_metadata_array.metadata[i].value[ + :self.c_metadata_array.metadata[i].value_length]) def __iter__(self): return _MetadataIterator(self) - cdef void _claim_slice_ownership(self): - cdef grpc_metadata_array new_c_metadata_array - grpc_metadata_array_init(&new_c_metadata_array) - new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata)) - new_c_metadata_array.count = self.c_metadata_array.count - for i in range(self.c_metadata_array.count): - new_c_metadata_array.metadata[i].key = _copy_slice( - self.c_metadata_array.metadata[i].key) - new_c_metadata_array.metadata[i].value = _copy_slice( - self.c_metadata_array.metadata[i].value) - grpc_metadata_array_destroy(&self.c_metadata_array) - self.c_metadata_array = new_c_metadata_array - cdef class Operation: def __cinit__(self): grpc_init() self.references = [] - self._status_details = grpc_empty_slice() + self._received_status_details = NULL + self._received_status_details_capacity = 0 self.is_valid = False @property @@ -587,13 +555,19 @@ cdef class Operation: def received_status_details(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: raise TypeError("self must be an operation receiving status details") - return _slice_bytes(self._status_details) + if self._received_status_details: + return self._received_status_details + else: + return None @property def received_status_details_or_none(self): if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: return None - return _slice_bytes(self._status_details) + if self._received_status_details: + return self._received_status_details + else: + return None @property def received_cancelled(self): @@ -609,7 +583,11 @@ cdef class Operation: return False if self._received_cancelled == 0 else True def __dealloc__(self): - grpc_slice_unref(self._status_details) + # We *almost* don't need to do anything; most of the objects are handled by + # Python. The remaining one(s) are primitive fields filled in by GRPC core. + # This means that we need to clean up after receive_status_on_client. + if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: + gpr_free(self._received_status_details) grpc_shutdown() def operation_send_initial_metadata(Metadata metadata, int flags): @@ -650,10 +628,9 @@ def operation_send_status_from_server( op.c_op.data.send_status_from_server.trailing_metadata = ( metadata.c_metadata_array.metadata) op.c_op.data.send_status_from_server.status = code - grpc_slice_unref(op._status_details) - op._status_details = _slice_from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &op._status_details + op.c_op.data.send_status_from_server.status_details = details op.references.append(metadata) + op.references.append(details) op.is_valid = True return op @@ -689,7 +666,9 @@ def operation_receive_status_on_client(int flags): op.c_op.data.receive_status_on_client.status = ( &op._received_status_code) op.c_op.data.receive_status_on_client.status_details = ( - &op._status_details) + &op._received_status_details) + op.c_op.data.receive_status_on_client.status_details_capacity = ( + &op._received_status_details_capacity) op.is_valid = True return op diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 31551e0f1b..cbbe2dcbf5 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -592,8 +592,6 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): def _handle_call(rpc_event, generic_handlers, thread_pool): - if not rpc_event.success: - return None if rpc_event.request_call_details.method is not None: method_handler = _find_method_handler(rpc_event, generic_handlers) if method_handler is None: diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 0b86661db8..ba64174863 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -159,8 +159,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/slice/percent_encoding.c', 'src/core/lib/slice/slice.c', 'src/core/lib/slice/slice_buffer.c', - 'src/core/lib/slice/slice_hash_table.c', - 'src/core/lib/slice/slice_intern.c', 'src/core/lib/slice/slice_string_helpers.c', 'src/core/lib/surface/alarm.c', 'src/core/lib/surface/api_trace.c', @@ -182,13 +180,12 @@ CORE_SOURCE_FILES = [ 'src/core/lib/surface/version.c', 'src/core/lib/transport/byte_stream.c', 'src/core/lib/transport/connectivity_state.c', - 'src/core/lib/transport/error_utils.c', + 'src/core/lib/transport/mdstr_hash_table.c', 'src/core/lib/transport/metadata.c', 'src/core/lib/transport/metadata_batch.c', 'src/core/lib/transport/pid_controller.c', 'src/core/lib/transport/service_config.c', 'src/core/lib/transport/static_metadata.c', - 'src/core/lib/transport/status_conversion.c', 'src/core/lib/transport/timeout_encoding.c', 'src/core/lib/transport/transport.c', 'src/core/lib/transport/transport_op_string.c', @@ -209,6 +206,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/transport/huffsyms.c', 'src/core/ext/transport/chttp2/transport/incoming_metadata.c', 'src/core/ext/transport/chttp2/transport/parsing.c', + 'src/core/ext/transport/chttp2/transport/status_conversion.c', 'src/core/ext/transport/chttp2/transport/stream_lists.c', 'src/core/ext/transport/chttp2/transport/stream_map.c', 'src/core/ext/transport/chttp2/transport/varint.c', |