aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi12
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi9
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi8
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi11
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi24
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi26
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi62
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi41
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi249
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi22
11 files changed, 196 insertions, 270 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6b3a276097..6361669757 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -26,20 +26,16 @@ cdef class Call:
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
- cdef grpc_call_error result
- cdef Operations cy_operations = Operations(operations)
- cdef OperationTag operation_tag = OperationTag(tag)
+ cdef OperationTag operation_tag = OperationTag(tag, operations)
if retain_self:
operation_tag.operation_call = self
else:
operation_tag.operation_call = None
- operation_tag.batch_operations = cy_operations
+ operation_tag.store_ops()
cpython.Py_INCREF(operation_tag)
- with nogil:
- result = grpc_call_start_batch(
- self.c_call, cy_operations.c_ops, cy_operations.c_nops,
+ return grpc_call_start_batch(
+ self.c_call, operation_tag.c_ops, operation_tag.c_nops,
<cpython.PyObject *>operation_tag, NULL)
- return result
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 4c397f8f64..644df674cc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -76,7 +76,7 @@ cdef class Channel:
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag = OperationTag(tag)
+ cdef OperationTag operation_tag = OperationTag(tag, None)
cpython.Py_INCREF(operation_tag)
with nogil:
grpc_channel_watch_connectivity_state(
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 237f430799..140fc357b9 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -42,7 +42,7 @@ cdef class CompletionQueue:
cdef Call operation_call = None
cdef CallDetails request_call_details = None
cdef object request_metadata = None
- cdef Operations batch_operations = None
+ cdef object batch_operations = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@@ -61,9 +61,10 @@ cdef class CompletionQueue:
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
- if tag.request_metadata is not None:
- request_metadata = tuple(tag.request_metadata)
- batch_operations = tag.batch_operations
+ if tag.is_new_request:
+ request_metadata = _metadata(&tag._c_request_metadata)
+ grpc_metadata_array_destroy(&tag._c_request_metadata)
+ batch_operations = tag.release_ops()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 246a271893..500086f6cb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -30,9 +30,13 @@ cdef int _get_metadata(
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil:
- def callback(Metadata metadata, grpc_status_code status, bytes error_details):
+ cdef size_t metadata_count
+ cdef grpc_metadata *c_metadata
+ def callback(metadata, grpc_status_code status, bytes error_details):
if status is StatusCode.ok:
- cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL)
+ _store_c_metadata(metadata, &c_metadata, &metadata_count)
+ cb(user_data, c_metadata, metadata_count, status, NULL)
+ _release_c_metadata(c_metadata, metadata_count)
else:
cb(user_data, NULL, 0, status, error_details)
args = context.service_url, context.method_name, callback,
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 660263fc09..6a72bbf693 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -36,13 +36,6 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass
-cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h":
-
- struct grpc_exec_ctx:
- # We don't care about the internals
- pass
-
-
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice:
@@ -169,7 +162,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_arg_pointer_vtable:
void *(*copy)(void *)
- void (*destroy)(grpc_exec_ctx *, void *)
+ void (*destroy)(void *)
int (*cmp)(void *, void *)
ctypedef struct grpc_arg_value_pointer:
@@ -524,7 +517,7 @@ cdef extern from "grpc/grpc_security.h":
grpc_auth_property_iterator grpc_auth_context_property_iterator(
const grpc_auth_context *ctx)
-
+
grpc_auth_property_iterator grpc_auth_context_peer_identity(
const grpc_auth_context *ctx)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
index c8f11f8e19..e3cad9acb3 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
# This function will ascii encode unicode string inputs if neccesary.
# In Python3, unicode strings are the default str type.
@@ -22,3 +24,25 @@ cdef bytes str_to_bytes(object s):
return s.encode('ascii')
else:
raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s)))
+
+
+cdef bytes _encode(str native_string_or_none):
+ if native_string_or_none is None:
+ return b''
+ elif isinstance(native_string_or_none, (bytes,)):
+ return <bytes>native_string_or_none
+ elif isinstance(native_string_or_none, (unicode,)):
+ return native_string_or_none.encode('ascii')
+ else:
+ raise TypeError('Expected str, not {}'.format(type(native_string_or_none)))
+
+
+cdef str _decode(bytes bytestring):
+ if isinstance(bytestring, (str,)):
+ return <str>bytestring
+ else:
+ try:
+ return bytestring.decode('utf8')
+ except UnicodeDecodeError:
+ logging.exception('Invalid encoding on %s', bytestring)
+ return bytestring.decode('latin1')
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi
new file mode 100644
index 0000000000..a18c365807
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pxd.pxi
@@ -0,0 +1,26 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef void _store_c_metadata(
+ metadata, grpc_metadata **c_metadata, size_t *c_count)
+
+
+cdef void _release_c_metadata(grpc_metadata *c_metadata, int count)
+
+
+cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice)
+
+
+cdef tuple _metadata(grpc_metadata_array *c_metadata_array)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
new file mode 100644
index 0000000000..c39fef08fa
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
@@ -0,0 +1,62 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+
+_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',))
+
+
+cdef void _store_c_metadata(
+ metadata, grpc_metadata **c_metadata, size_t *c_count):
+ if metadata is None:
+ c_count[0] = 0
+ c_metadata[0] = NULL
+ else:
+ metadatum_count = len(metadata)
+ if metadatum_count == 0:
+ c_count[0] = 0
+ c_metadata[0] = NULL
+ else:
+ c_count[0] = metadatum_count
+ c_metadata[0] = <grpc_metadata *>gpr_malloc(
+ metadatum_count * sizeof(grpc_metadata))
+ for index, (key, value) in enumerate(metadata):
+ encoded_key = _encode(key)
+ encoded_value = value if encoded_key[-4:] == b'-bin' else _encode(value)
+ c_metadata[0][index].key = _slice_from_bytes(encoded_key)
+ c_metadata[0][index].value = _slice_from_bytes(encoded_value)
+
+
+cdef void _release_c_metadata(grpc_metadata *c_metadata, int count):
+ if 0 < count:
+ for index in range(count):
+ grpc_slice_unref(c_metadata[index].key)
+ grpc_slice_unref(c_metadata[index].value)
+ gpr_free(c_metadata)
+
+
+cdef tuple _metadatum(grpc_slice key_slice, grpc_slice value_slice):
+ cdef bytes key = _slice_bytes(key_slice)
+ cdef bytes value = _slice_bytes(value_slice)
+ return <tuple>_Metadatum(
+ _decode(key), value if key[-4:] == b'-bin' else _decode(value))
+
+
+cdef tuple _metadata(grpc_metadata_array *c_metadata_array):
+ return tuple(
+ _metadatum(
+ c_metadata_array.metadata[index].key,
+ c_metadata_array.metadata[index].value)
+ for index in range(c_metadata_array.count))
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 9c40ebf0c2..594fdb1a8b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -37,10 +37,15 @@ cdef class OperationTag:
cdef Server shutting_down_server
cdef Call operation_call
cdef CallDetails request_call_details
- cdef MetadataArray request_metadata
- cdef Operations batch_operations
+ cdef grpc_metadata_array _c_request_metadata
+ cdef grpc_op *c_ops
+ cdef size_t c_nops
+ cdef readonly object _operations
cdef bint is_new_request
+ cdef void store_ops(self)
+ cdef object release_ops(self)
+
cdef class Event:
@@ -57,7 +62,7 @@ cdef class Event:
cdef readonly Call operation_call
# For Call.start_batch
- cdef readonly Operations batch_operations
+ cdef readonly object batch_operations
cdef class ByteBuffer:
@@ -84,28 +89,15 @@ cdef class ChannelArgs:
cdef list args
-cdef class Metadatum:
-
- cdef grpc_metadata c_metadata
- cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
-
-
-cdef class Metadata:
-
- cdef grpc_metadata *c_metadata
- cdef readonly size_t c_count
-
-
-cdef class MetadataArray:
-
- cdef grpc_metadata_array c_metadata_array
-
-
cdef class Operation:
cdef grpc_op c_op
+ cdef bint _c_metadata_needs_release
+ cdef size_t _c_metadata_count
+ cdef grpc_metadata *_c_metadata
cdef ByteBuffer _received_message
- cdef MetadataArray _received_metadata
+ cdef bint _c_metadata_array_needs_destruction
+ cdef grpc_metadata_array _c_metadata_array
cdef grpc_status_code _received_status_code
cdef grpc_slice _status_details
cdef int _received_cancelled
@@ -113,13 +105,6 @@ cdef class Operation:
cdef object references
-cdef class Operations:
-
- cdef grpc_op *c_ops
- cdef size_t c_nops
- cdef list operations
-
-
cdef class CompressionOptions:
cdef grpc_compression_options c_options
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 4f87261e17..26eaf50eb4 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -220,9 +220,26 @@ cdef class CallDetails:
cdef class OperationTag:
- def __cinit__(self, user_tag):
+ def __cinit__(self, user_tag, operations):
self.user_tag = user_tag
self.references = []
+ self._operations = operations
+
+ cdef void store_ops(self):
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index in range(self.c_nops):
+ self.c_ops[index] = (<Operation>(self._operations[index])).c_op
+
+ cdef object release_ops(self):
+ if 0 < self.c_nops:
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c_op = self.c_ops[index]
+ gpr_free(self.c_ops)
+ return self._operations
+ else:
+ return ()
cdef class Event:
@@ -232,7 +249,7 @@ cdef class Event:
CallDetails request_call_details,
object request_metadata,
bint is_new_request,
- Operations batch_operations):
+ object batch_operations):
self.type = type
self.success = success
self.tag = tag
@@ -320,7 +337,7 @@ cdef void* copy_ptr(void* ptr):
return ptr
-cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
+cdef void destroy_ptr(void* ptr):
pass
@@ -348,7 +365,7 @@ cdef class ChannelArg:
elif hasattr(value, '__int__'):
# Pointer objects must override __int__() to return
# the underlying C address (Python ints are word size). The
- # lifecycle of the pointer is fixed to the lifecycle of the
+ # lifecycle of the pointer is fixed to the lifecycle of the
# python object wrapping it.
self.ptr_vtable.copy = &copy_ptr
self.ptr_vtable.destroy = &destroy_ptr
@@ -390,140 +407,13 @@ cdef class ChannelArgs:
return self.args[i]
-cdef class Metadatum:
-
- def __cinit__(self, bytes key, bytes value):
- self.c_metadata.key = _slice_from_bytes(key)
- self.c_metadata.value = _slice_from_bytes(value)
-
- cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
- destination[0].key = _copy_slice(self.c_metadata.key)
- destination[0].value = _copy_slice(self.c_metadata.value)
-
- @property
- def key(self):
- return _slice_bytes(self.c_metadata.key)
-
- @property
- def value(self):
- return _slice_bytes(self.c_metadata.value)
-
- def __len__(self):
- return 2
-
- def __getitem__(self, size_t i):
- if i == 0:
- return self.key
- elif i == 1:
- return self.value
- else:
- raise IndexError("index must be 0 (key) or 1 (value)")
-
- def __iter__(self):
- return iter((self.key, self.value))
-
- def __dealloc__(self):
- grpc_slice_unref(self.c_metadata.key)
- grpc_slice_unref(self.c_metadata.value)
-
-cdef class _MetadataIterator:
-
- cdef size_t i
- cdef size_t _length
- cdef object _metadatum_indexable
-
- def __cinit__(self, length, metadatum_indexable):
- self._length = length
- self._metadatum_indexable = metadatum_indexable
- self.i = 0
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.i < self._length:
- result = self._metadatum_indexable[self.i]
- self.i = self.i + 1
- return result
- else:
- raise StopIteration()
-
-
-# TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this; just use an
-# ordinary sequence of pairs of bytestrings all the way down to the
-# grpc_call_start_batch call.
-cdef class Metadata:
- """Metadata being passed from application to core."""
-
- def __cinit__(self, metadata_iterable):
- metadata_sequence = tuple(metadata_iterable)
- cdef size_t count = len(metadata_sequence)
- with nogil:
- grpc_init()
- self.c_metadata = <grpc_metadata *>gpr_malloc(
- count * sizeof(grpc_metadata))
- self.c_count = count
- for index, metadatum in enumerate(metadata_sequence):
- self.c_metadata[index].key = grpc_slice_copy(
- (<Metadatum>metadatum).c_metadata.key)
- self.c_metadata[index].value = grpc_slice_copy(
- (<Metadatum>metadatum).c_metadata.value)
-
- def __dealloc__(self):
- with nogil:
- for index in range(self.c_count):
- grpc_slice_unref(self.c_metadata[index].key)
- grpc_slice_unref(self.c_metadata[index].value)
- gpr_free(self.c_metadata)
- grpc_shutdown()
-
- def __len__(self):
- return self.c_count
-
- def __getitem__(self, size_t index):
- if index < self.c_count:
- key = _slice_bytes(self.c_metadata[index].key)
- value = _slice_bytes(self.c_metadata[index].value)
- return Metadatum(key, value)
- else:
- raise IndexError()
-
- def __iter__(self):
- return _MetadataIterator(self.c_count, self)
-
-
-cdef class MetadataArray:
- """Metadata being passed from core to application."""
-
- def __cinit__(self):
- with nogil:
- grpc_init()
- grpc_metadata_array_init(&self.c_metadata_array)
-
- def __dealloc__(self):
- with nogil:
- grpc_metadata_array_destroy(&self.c_metadata_array)
- grpc_shutdown()
-
- def __len__(self):
- return self.c_metadata_array.count
-
- def __getitem__(self, size_t i):
- if i >= self.c_metadata_array.count:
- raise IndexError()
- key = _slice_bytes(self.c_metadata_array.metadata[i].key)
- value = _slice_bytes(self.c_metadata_array.metadata[i].value)
- return Metadatum(key=key, value=value)
-
- def __iter__(self):
- return _MetadataIterator(self.c_metadata_array.count, self)
-
-
cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
+ self._c_metadata_needs_release = False
+ self._c_metadata_array_needs_destruction = False
self._status_details = grpc_empty_slice()
self.is_valid = False
@@ -556,13 +446,7 @@ cdef class Operation:
if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
raise TypeError("self must be an operation receiving metadata")
- # TODO(https://github.com/grpc/grpc/issues/7950): Drop the "all Cython
- # objects must be legitimate for use from Python at any time" policy in
- # place today, shift the policy toward "Operation objects are only usable
- # while their calls are active", and move this making-a-copy-because-this-
- # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
- # lowest Python layer.
- return tuple(self._received_metadata)
+ return _metadata(&self._c_metadata_array)
@property
def received_status_code(self):
@@ -602,16 +486,21 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
+ if self._c_metadata_needs_release:
+ _release_c_metadata(self._c_metadata, self._c_metadata_count)
+ if self._c_metadata_array_needs_destruction:
+ grpc_metadata_array_destroy(&self._c_metadata_array)
grpc_slice_unref(self._status_details)
grpc_shutdown()
-def operation_send_initial_metadata(Metadata metadata, int flags):
+def operation_send_initial_metadata(metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
op.c_op.flags = flags
- op.c_op.data.send_initial_metadata.count = metadata.c_count
- op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata
- op.references.append(metadata)
+ _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
+ op._c_metadata_needs_release = True
+ op.c_op.data.send_initial_metadata.count = op._c_metadata_count
+ op.c_op.data.send_initial_metadata.metadata = op._c_metadata
op.is_valid = True
return op
@@ -633,18 +522,19 @@ def operation_send_close_from_client(int flags):
return op
def operation_send_status_from_server(
- Metadata metadata, grpc_status_code code, bytes details, int flags):
+ metadata, grpc_status_code code, bytes details, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
op.c_op.flags = flags
+ _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
+ op._c_metadata_needs_release = True
op.c_op.data.send_status_from_server.trailing_metadata_count = (
- metadata.c_count)
- op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata
+ op._c_metadata_count)
+ op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
op.c_op.data.send_status_from_server.status = code
grpc_slice_unref(op._status_details)
op._status_details = _slice_from_bytes(details)
op.c_op.data.send_status_from_server.status_details = &op._status_details
- op.references.append(metadata)
op.is_valid = True
return op
@@ -652,9 +542,10 @@ def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
op.c_op.flags = flags
- op._received_metadata = MetadataArray()
+ grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
- &op._received_metadata.c_metadata_array)
+ &op._c_metadata_array)
+ op._c_metadata_array_needs_destruction = True
op.is_valid = True
return op
@@ -675,9 +566,10 @@ def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
op.c_op.flags = flags
- op._received_metadata = MetadataArray()
+ grpc_metadata_array_init(&op._c_metadata_array)
op.c_op.data.receive_status_on_client.trailing_metadata = (
- &op._received_metadata.c_metadata_array)
+ &op._c_metadata_array)
+ op._c_metadata_array_needs_destruction = True
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
@@ -694,59 +586,6 @@ def operation_receive_close_on_server(int flags):
return op
-cdef class _OperationsIterator:
-
- cdef size_t i
- cdef Operations operations
-
- def __cinit__(self, Operations operations not None):
- self.i = 0
- self.operations = operations
-
- def __iter__(self):
- return self
-
- def __next__(self):
- if self.i < len(self.operations):
- result = self.operations[self.i]
- self.i = self.i + 1
- return result
- else:
- raise StopIteration()
-
-
-cdef class Operations:
-
- def __cinit__(self, operations):
- grpc_init()
- self.operations = list(operations) # normalize iterable
- self.c_ops = NULL
- self.c_nops = 0
- for operation in self.operations:
- if not isinstance(operation, Operation):
- raise TypeError("expected operations to be iterable of Operation")
- self.c_nops = len(self.operations)
- with nogil:
- self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops)
- for i in range(self.c_nops):
- self.c_ops[i] = (<Operation>(self.operations[i])).c_op
-
- def __len__(self):
- return self.c_nops
-
- def __getitem__(self, size_t i):
- # self.operations is never stale; it's only updated from this file
- return self.operations[i]
-
- def __dealloc__(self):
- with nogil:
- gpr_free(self.c_ops)
- grpc_shutdown()
-
- def __iter__(self):
- return _OperationsIterator(self)
-
-
cdef class CompressionOptions:
def __cinit__(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 5f3405936c..f8d7892858 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -78,23 +78,19 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
- cdef grpc_call_error result
- cdef OperationTag operation_tag = OperationTag(tag)
+ cdef OperationTag operation_tag = OperationTag(tag, None)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
- operation_tag.request_metadata = MetadataArray()
+ grpc_metadata_array_init(&operation_tag._c_request_metadata)
operation_tag.references.extend([self, call_queue, server_queue])
operation_tag.is_new_request = True
- operation_tag.batch_operations = Operations([])
cpython.Py_INCREF(operation_tag)
- with nogil:
- result = grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag.request_metadata.c_metadata_array,
- call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
- return result
+ return grpc_server_request_call(
+ self.c_server, &operation_tag.operation_call.c_call,
+ &operation_tag.request_call_details.c_details,
+ &operation_tag._c_request_metadata,
+ call_queue.c_completion_queue, server_queue.c_completion_queue,
+ <cpython.PyObject *>operation_tag)
def register_completion_queue(
self, CompletionQueue queue not None):
@@ -135,7 +131,7 @@ cdef class Server:
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
- operation_tag = OperationTag(tag)
+ operation_tag = OperationTag(tag, None)
operation_tag.shutting_down_server = self
cpython.Py_INCREF(operation_tag)
with nogil: