aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-01-23 07:48:42 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-01-23 07:48:42 -0800
commit7c70b6c144a20782b6be4751da68c6aa7b35648d (patch)
treef8ca929338d9f73cd9eec35001ecf52e3b1f6c1b /src/python
parentc7342a01bb069fcff6df0e22f6c2a403010998a1 (diff)
Revert "Revert "Metadata handling rewrite""
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi14
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi35
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi12
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi129
-rw-r--r--src/python/grpcio/grpc/_server.py2
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py6
7 files changed, 129 insertions, 84 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 73d1ff7b97..246e8399bc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -60,20 +60,25 @@ cdef class Channel:
method, host, Timespec deadline not None):
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
- cdef char *method_c_string = method
- cdef char *host_c_string = NULL
+ cdef grpc_slice method_slice = _slice_from_bytes(method)
+ cdef grpc_slice host_slice
+ cdef grpc_slice *host_slice_ptr = NULL
if host is not None:
- host_c_string = host
+ host_slice = _slice_from_bytes(host)
+ host_slice_ptr = &host_slice
cdef Call operation_call = Call()
- operation_call.references = [self, method, host, queue]
+ operation_call.references = [self, queue]
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
with nogil:
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
- queue.c_completion_queue, method_c_string, host_c_string,
+ queue.c_completion_queue, method_slice, host_slice_ptr,
deadline.c_time, NULL)
+ grpc_slice_unref(method_slice)
+ if host_slice_ptr:
+ grpc_slice_unref(host_slice)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index a258ba4063..d8df6c2ef4 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -51,6 +51,7 @@ cdef class CompletionQueue:
cdef CallDetails request_call_details = None
cdef Metadata request_metadata = None
cdef Operations batch_operations = None
+ cdef Operation batch_operation = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@@ -69,8 +70,15 @@ cdef class CompletionQueue:
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
- request_metadata = tag.request_metadata
+ if tag.request_metadata is not None:
+ request_metadata = tag.request_metadata
+ request_metadata._claim_slice_ownership()
batch_operations = tag.batch_operations
+ if tag.batch_operations is not None:
+ for op in batch_operations.operations:
+ batch_operation = <Operation>op
+ if batch_operation._received_metadata is not None:
+ batch_operation._received_metadata._claim_slice_ownership()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
@@ -91,7 +99,7 @@ cdef class CompletionQueue:
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
-
+
while True:
c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
if gpr_time_cmp(c_timeout, c_deadline) > 0:
@@ -100,7 +108,7 @@ cdef class CompletionQueue:
self.c_completion_queue, c_timeout, NULL)
if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
break;
-
+
# Handle any signals
with gil:
cpython.PyErr_CheckSignals()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index ad766186bd..141580b82a 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -51,6 +51,13 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass
+cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h":
+
+ struct grpc_exec_ctx:
+ # We don't care about the internals
+ pass
+
+
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice:
@@ -60,6 +67,7 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_ref(grpc_slice s) nogil
void grpc_slice_unref(grpc_slice s) nogil
+ grpc_slice grpc_empty_slice() nogil
grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
grpc_slice grpc_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t)) nogil
@@ -175,7 +183,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_arg_pointer_vtable:
void *(*copy)(void *)
- void (*destroy)(void *)
+ void (*destroy)(grpc_exec_ctx *, void *)
int (*cmp)(void *, void *)
ctypedef struct grpc_arg_value_pointer:
@@ -217,9 +225,8 @@ cdef extern from "grpc/grpc.h":
GRPC_CHANNEL_SHUTDOWN
ctypedef struct grpc_metadata:
- const char *key
- const char *value
- size_t value_length
+ grpc_slice key
+ grpc_slice value
# ignore the 'internal_data.obfuscated' fields.
ctypedef enum grpc_completion_type:
@@ -241,10 +248,8 @@ cdef extern from "grpc/grpc.h":
void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
- char *method
- size_t method_capacity
- char *host
- size_t host_capacity
+ grpc_slice method
+ grpc_slice host
gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details) nogil
@@ -268,13 +273,12 @@ cdef extern from "grpc/grpc.h":
size_t trailing_metadata_count
grpc_metadata *trailing_metadata
grpc_status_code status
- const char *status_details
+ grpc_slice *status_details
ctypedef struct grpc_op_data_recv_status_on_client:
grpc_metadata_array *trailing_metadata
grpc_status_code *status
- char **status_details
- size_t *status_details_capacity
+ grpc_slice *status_details
ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled
@@ -321,9 +325,9 @@ cdef extern from "grpc/grpc.h":
const grpc_channel_args *args,
void *reserved) nogil
grpc_call *grpc_channel_create_call(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *completion_queue, const char *method,
- const char *host, gpr_timespec deadline, void *reserved) nogil
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, grpc_slice method,
+ const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
@@ -473,8 +477,7 @@ cdef extern from "grpc/compression.h":
grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse(
- const char *name, size_t name_length,
- grpc_compression_algorithm *algorithm) nogil
+ grpc_slice value, grpc_compression_algorithm *algorithm) nogil
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) nogil
grpc_compression_algorithm grpc_compression_algorithm_for_level(
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 00ec91b131..c4a17118c0 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,6 +28,11 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+cdef bytes _slice_bytes(grpc_slice slice)
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil
+cdef grpc_slice _slice_from_bytes(bytes value) nogil
+
+
cdef class Timespec:
cdef gpr_timespec c_time
@@ -97,13 +102,13 @@ cdef class ChannelArgs:
cdef class Metadatum:
cdef grpc_metadata c_metadata
- cdef object _key, _value
+ cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
cdef class Metadata:
cdef grpc_metadata_array c_metadata_array
- cdef object metadata
+ cdef void _claim_slice_ownership(self)
cdef class Operation:
@@ -112,8 +117,7 @@ cdef class Operation:
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef grpc_status_code _received_status_code
- cdef char *_received_status_details
- cdef size_t _received_status_details_capacity
+ cdef grpc_slice _status_details
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index cadfce6ee6..d052b3f8bc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -29,6 +29,26 @@
from libc.stdint cimport intptr_t
+
+cdef bytes _slice_bytes(grpc_slice slice):
+ cdef void *start = grpc_slice_start_ptr(slice)
+ cdef size_t length = grpc_slice_length(slice)
+ return (<const char *>start)[:length]
+
+cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
+ cdef void *start = grpc_slice_start_ptr(slice)
+ cdef size_t length = grpc_slice_length(slice)
+ return grpc_slice_from_copied_buffer(<const char *>start, length)
+
+cdef grpc_slice _slice_from_bytes(bytes value) nogil:
+ cdef const char *value_ptr
+ cdef size_t length
+ with gil:
+ value_ptr = <const char *>value
+ length = len(value)
+ return grpc_slice_from_copied_buffer(value_ptr, length)
+
+
class ConnectivityState:
idle = GRPC_CHANNEL_IDLE
connecting = GRPC_CHANNEL_CONNECTING
@@ -189,17 +209,11 @@ cdef class CallDetails:
@property
def method(self):
- if self.c_details.method != NULL:
- return <bytes>self.c_details.method
- else:
- return None
+ return _slice_bytes(self.c_details.method)
@property
def host(self):
- if self.c_details.host != NULL:
- return <bytes>self.c_details.host
- else:
- return None
+ return _slice_bytes(self.c_details.host)
@property
def deadline(self):
@@ -310,7 +324,7 @@ cdef void* copy_ptr(void* ptr):
return ptr
-cdef void destroy_ptr(void* ptr):
+cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
pass
@@ -383,19 +397,20 @@ cdef class ChannelArgs:
cdef class Metadatum:
def __cinit__(self, bytes key, bytes value):
- self._key = key
- self._value = value
- self.c_metadata.key = self._key
- self.c_metadata.value = self._value
- self.c_metadata.value_length = len(self._value)
+ self.c_metadata.key = _slice_from_bytes(key)
+ self.c_metadata.value = _slice_from_bytes(value)
+
+ cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
+ destination[0].key = _copy_slice(self.c_metadata.key)
+ destination[0].value = _copy_slice(self.c_metadata.value)
@property
def key(self):
- return <bytes>self.c_metadata.key
+ return _slice_bytes(self.c_metadata.key)
@property
def value(self):
- return <bytes>self.c_metadata.value[:self.c_metadata.value_length]
+ return _slice_bytes(self.c_metadata.value)
def __len__(self):
return 2
@@ -411,6 +426,9 @@ cdef class Metadatum:
def __iter__(self):
return iter((self.key, self.value))
+ def __dealloc__(self):
+ grpc_slice_unref(self.c_metadata.key)
+ grpc_slice_unref(self.c_metadata.value)
cdef class _MetadataIterator:
@@ -435,51 +453,65 @@ cdef class _MetadataIterator:
cdef class Metadata:
- def __cinit__(self, metadata):
- grpc_init()
- self.metadata = list(metadata)
+ def __cinit__(self, metadata_iterable):
+ with nogil:
+ grpc_init()
+ grpc_metadata_array_init(&self.c_metadata_array)
+ metadata = list(metadata_iterable)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
- with nogil:
- grpc_metadata_array_init(&self.c_metadata_array)
- self.c_metadata_array.count = len(self.metadata)
- self.c_metadata_array.capacity = len(self.metadata)
+ self.c_metadata_array.count = len(metadata)
+ self.c_metadata_array.capacity = len(metadata)
with nogil:
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
for i in range(self.c_metadata_array.count):
- self.c_metadata_array.metadata[i] = (
- (<Metadatum>self.metadata[i]).c_metadata)
+ (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
def __dealloc__(self):
- # this frees the allocated memory for the grpc_metadata_array (although
- # it'd be nice if that were documented somewhere...)
- # TODO(atash): document this in the C core
- grpc_metadata_array_destroy(&self.c_metadata_array)
- grpc_shutdown()
+ with nogil:
+ # this frees the allocated memory for the grpc_metadata_array (although
+ # it'd be nice if that were documented somewhere...)
+ # TODO(atash): document this in the C core
+ grpc_metadata_array_destroy(&self.c_metadata_array)
+ grpc_shutdown()
def __len__(self):
return self.c_metadata_array.count
def __getitem__(self, size_t i):
- return Metadatum(
- key=<bytes>self.c_metadata_array.metadata[i].key,
- value=<bytes>self.c_metadata_array.metadata[i].value[
- :self.c_metadata_array.metadata[i].value_length])
+ if i >= self.c_metadata_array.count:
+ raise IndexError
+ key = _slice_bytes(self.c_metadata_array.metadata[i].key)
+ value = _slice_bytes(self.c_metadata_array.metadata[i].value)
+ return Metadatum(key=key, value=value)
def __iter__(self):
return _MetadataIterator(self)
+ cdef void _claim_slice_ownership(self):
+ cdef grpc_metadata_array new_c_metadata_array
+ grpc_metadata_array_init(&new_c_metadata_array)
+ new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
+ self.c_metadata_array.count*sizeof(grpc_metadata))
+ new_c_metadata_array.count = self.c_metadata_array.count
+ for i in range(self.c_metadata_array.count):
+ new_c_metadata_array.metadata[i].key = _copy_slice(
+ self.c_metadata_array.metadata[i].key)
+ new_c_metadata_array.metadata[i].value = _copy_slice(
+ self.c_metadata_array.metadata[i].value)
+ grpc_metadata_array_destroy(&self.c_metadata_array)
+ self.c_metadata_array = new_c_metadata_array
+
cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
- self._received_status_details = NULL
- self._received_status_details_capacity = 0
+ self._status_details = grpc_empty_slice()
self.is_valid = False
@property
@@ -536,19 +568,13 @@ cdef class Operation:
def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
- if self._received_status_details:
- return self._received_status_details
- else:
- return None
+ return _slice_bytes(self._status_details)
@property
def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
- if self._received_status_details:
- return self._received_status_details
- else:
- return None
+ return _slice_bytes(self._status_details)
@property
def received_cancelled(self):
@@ -564,11 +590,7 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
- # We *almost* don't need to do anything; most of the objects are handled by
- # Python. The remaining one(s) are primitive fields filled in by GRPC core.
- # This means that we need to clean up after receive_status_on_client.
- if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
- gpr_free(self._received_status_details)
+ grpc_slice_unref(self._status_details)
grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags):
@@ -609,9 +631,10 @@ def operation_send_status_from_server(
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code
- op.c_op.data.send_status_from_server.status_details = details
+ grpc_slice_unref(op._status_details)
+ op._status_details = _slice_from_bytes(details)
+ op.c_op.data.send_status_from_server.status_details = &op._status_details
op.references.append(metadata)
- op.references.append(details)
op.is_valid = True
return op
@@ -647,9 +670,7 @@ def operation_receive_status_on_client(int flags):
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
- &op._received_status_details)
- op.c_op.data.receive_status_on_client.status_details_capacity = (
- &op._received_status_details_capacity)
+ &op._status_details)
op.is_valid = True
return op
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index cbbe2dcbf5..31551e0f1b 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -592,6 +592,8 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
def _handle_call(rpc_event, generic_handlers, thread_pool):
+ if not rpc_event.success:
+ return None
if rpc_event.request_call_details.method is not None:
method_handler = _find_method_handler(rpc_event, generic_handlers)
if method_handler is None:
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index ba64174863..0b86661db8 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -159,6 +159,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/slice/percent_encoding.c',
'src/core/lib/slice/slice.c',
'src/core/lib/slice/slice_buffer.c',
+ 'src/core/lib/slice/slice_hash_table.c',
+ 'src/core/lib/slice/slice_intern.c',
'src/core/lib/slice/slice_string_helpers.c',
'src/core/lib/surface/alarm.c',
'src/core/lib/surface/api_trace.c',
@@ -180,12 +182,13 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/version.c',
'src/core/lib/transport/byte_stream.c',
'src/core/lib/transport/connectivity_state.c',
- 'src/core/lib/transport/mdstr_hash_table.c',
+ 'src/core/lib/transport/error_utils.c',
'src/core/lib/transport/metadata.c',
'src/core/lib/transport/metadata_batch.c',
'src/core/lib/transport/pid_controller.c',
'src/core/lib/transport/service_config.c',
'src/core/lib/transport/static_metadata.c',
+ 'src/core/lib/transport/status_conversion.c',
'src/core/lib/transport/timeout_encoding.c',
'src/core/lib/transport/transport.c',
'src/core/lib/transport/transport_op_string.c',
@@ -206,7 +209,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/huffsyms.c',
'src/core/ext/transport/chttp2/transport/incoming_metadata.c',
'src/core/ext/transport/chttp2/transport/parsing.c',
- 'src/core/ext/transport/chttp2/transport/status_conversion.c',
'src/core/ext/transport/chttp2/transport/stream_lists.c',
'src/core/ext/transport/chttp2/transport/stream_map.c',
'src/core/ext/transport/chttp2/transport/varint.c',