aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi14
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi35
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi12
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi148
-rw-r--r--src/python/grpcio/grpc/_server.py2
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py6
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py14
8 files changed, 115 insertions, 131 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 246e8399bc..73d1ff7b97 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -60,25 +60,20 @@ cdef class Channel:
method, host, Timespec deadline not None):
if queue.is_shutting_down:
raise ValueError("queue must not be shutting down or shutdown")
- cdef grpc_slice method_slice = _slice_from_bytes(method)
- cdef grpc_slice host_slice
- cdef grpc_slice *host_slice_ptr = NULL
+ cdef char *method_c_string = method
+ cdef char *host_c_string = NULL
if host is not None:
- host_slice = _slice_from_bytes(host)
- host_slice_ptr = &host_slice
+ host_c_string = host
cdef Call operation_call = Call()
- operation_call.references = [self, queue]
+ operation_call.references = [self, method, host, queue]
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
with nogil:
operation_call.c_call = grpc_channel_create_call(
self.c_channel, parent_call, flags,
- queue.c_completion_queue, method_slice, host_slice_ptr,
+ queue.c_completion_queue, method_c_string, host_c_string,
deadline.c_time, NULL)
- grpc_slice_unref(method_slice)
- if host_slice_ptr:
- grpc_slice_unref(host_slice)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index d8df6c2ef4..a258ba4063 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -51,7 +51,6 @@ cdef class CompletionQueue:
cdef CallDetails request_call_details = None
cdef Metadata request_metadata = None
cdef Operations batch_operations = None
- cdef Operation batch_operation = None
if event.type == GRPC_QUEUE_TIMEOUT:
return Event(
event.type, False, None, None, None, None, False, None)
@@ -70,15 +69,8 @@ cdef class CompletionQueue:
user_tag = tag.user_tag
operation_call = tag.operation_call
request_call_details = tag.request_call_details
- if tag.request_metadata is not None:
- request_metadata = tag.request_metadata
- request_metadata._claim_slice_ownership()
+ request_metadata = tag.request_metadata
batch_operations = tag.batch_operations
- if tag.batch_operations is not None:
- for op in batch_operations.operations:
- batch_operation = <Operation>op
- if batch_operation._received_metadata is not None:
- batch_operation._received_metadata._claim_slice_ownership()
if tag.is_new_request:
# Stuff in the tag not explicitly handled by us needs to live through
# the life of the call
@@ -99,7 +91,7 @@ cdef class CompletionQueue:
c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
-
+
while True:
c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment)
if gpr_time_cmp(c_timeout, c_deadline) > 0:
@@ -108,7 +100,7 @@ cdef class CompletionQueue:
self.c_completion_queue, c_timeout, NULL)
if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0:
break;
-
+
# Handle any signals
with gil:
cpython.PyErr_CheckSignals()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 141580b82a..ad766186bd 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -51,13 +51,6 @@ cdef extern from "grpc/byte_buffer_reader.h":
pass
-cdef extern from "grpc/impl/codegen/exec_ctx_fwd.h":
-
- struct grpc_exec_ctx:
- # We don't care about the internals
- pass
-
-
cdef extern from "grpc/grpc.h":
ctypedef struct grpc_slice:
@@ -67,7 +60,6 @@ cdef extern from "grpc/grpc.h":
grpc_slice grpc_slice_ref(grpc_slice s) nogil
void grpc_slice_unref(grpc_slice s) nogil
- grpc_slice grpc_empty_slice() nogil
grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
grpc_slice grpc_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t)) nogil
@@ -183,7 +175,7 @@ cdef extern from "grpc/grpc.h":
ctypedef struct grpc_arg_pointer_vtable:
void *(*copy)(void *)
- void (*destroy)(grpc_exec_ctx *, void *)
+ void (*destroy)(void *)
int (*cmp)(void *, void *)
ctypedef struct grpc_arg_value_pointer:
@@ -225,8 +217,9 @@ cdef extern from "grpc/grpc.h":
GRPC_CHANNEL_SHUTDOWN
ctypedef struct grpc_metadata:
- grpc_slice key
- grpc_slice value
+ const char *key
+ const char *value
+ size_t value_length
# ignore the 'internal_data.obfuscated' fields.
ctypedef enum grpc_completion_type:
@@ -248,8 +241,10 @@ cdef extern from "grpc/grpc.h":
void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
- grpc_slice method
- grpc_slice host
+ char *method
+ size_t method_capacity
+ char *host
+ size_t host_capacity
gpr_timespec deadline
void grpc_call_details_init(grpc_call_details *details) nogil
@@ -273,12 +268,13 @@ cdef extern from "grpc/grpc.h":
size_t trailing_metadata_count
grpc_metadata *trailing_metadata
grpc_status_code status
- grpc_slice *status_details
+ const char *status_details
ctypedef struct grpc_op_data_recv_status_on_client:
grpc_metadata_array *trailing_metadata
grpc_status_code *status
- grpc_slice *status_details
+ char **status_details
+ size_t *status_details_capacity
ctypedef struct grpc_op_data_recv_close_on_server:
int *cancelled
@@ -325,9 +321,9 @@ cdef extern from "grpc/grpc.h":
const grpc_channel_args *args,
void *reserved) nogil
grpc_call *grpc_channel_create_call(
- grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *completion_queue, grpc_slice method,
- const grpc_slice *host, gpr_timespec deadline, void *reserved) nogil
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, const char *method,
+ const char *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
@@ -477,7 +473,8 @@ cdef extern from "grpc/compression.h":
grpc_compression_algorithm default_compression_algorithm
int grpc_compression_algorithm_parse(
- grpc_slice value, grpc_compression_algorithm *algorithm) nogil
+ const char *name, size_t name_length,
+ grpc_compression_algorithm *algorithm) nogil
int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
char **name) nogil
grpc_compression_algorithm grpc_compression_algorithm_for_level(
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index c4a17118c0..00ec91b131 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,11 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-cdef bytes _slice_bytes(grpc_slice slice)
-cdef grpc_slice _copy_slice(grpc_slice slice) nogil
-cdef grpc_slice _slice_from_bytes(bytes value) nogil
-
-
cdef class Timespec:
cdef gpr_timespec c_time
@@ -102,13 +97,13 @@ cdef class ChannelArgs:
cdef class Metadatum:
cdef grpc_metadata c_metadata
- cdef void _copy_metadatum(self, grpc_metadata *destination) nogil
+ cdef object _key, _value
cdef class Metadata:
cdef grpc_metadata_array c_metadata_array
- cdef void _claim_slice_ownership(self)
+ cdef object metadata
cdef class Operation:
@@ -117,7 +112,8 @@ cdef class Operation:
cdef ByteBuffer _received_message
cdef Metadata _received_metadata
cdef grpc_status_code _received_status_code
- cdef grpc_slice _status_details
+ cdef char *_received_status_details
+ cdef size_t _received_status_details_capacity
cdef int _received_cancelled
cdef readonly bint is_valid
cdef object references
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index d052b3f8bc..69b837c4db 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -29,26 +29,6 @@
from libc.stdint cimport intptr_t
-
-cdef bytes _slice_bytes(grpc_slice slice):
- cdef void *start = grpc_slice_start_ptr(slice)
- cdef size_t length = grpc_slice_length(slice)
- return (<const char *>start)[:length]
-
-cdef grpc_slice _copy_slice(grpc_slice slice) nogil:
- cdef void *start = grpc_slice_start_ptr(slice)
- cdef size_t length = grpc_slice_length(slice)
- return grpc_slice_from_copied_buffer(<const char *>start, length)
-
-cdef grpc_slice _slice_from_bytes(bytes value) nogil:
- cdef const char *value_ptr
- cdef size_t length
- with gil:
- value_ptr = <const char *>value
- length = len(value)
- return grpc_slice_from_copied_buffer(value_ptr, length)
-
-
class ConnectivityState:
idle = GRPC_CHANNEL_IDLE
connecting = GRPC_CHANNEL_CONNECTING
@@ -194,6 +174,25 @@ cdef class Timespec:
def infinite_past():
return Timespec(float("-inf"))
+ def __richcmp__(Timespec self not None, Timespec other not None, int op):
+ cdef gpr_timespec self_c_time = self.c_time
+ cdef gpr_timespec other_c_time = other.c_time
+ cdef int result = gpr_time_cmp(self_c_time, other_c_time)
+ if op == 0: # <
+ return result < 0
+ elif op == 2: # ==
+ return result == 0
+ elif op == 4: # >
+ return result > 0
+ elif op == 1: # <=
+ return result <= 0
+ elif op == 3: # !=
+ return result != 0
+ elif op == 5: # >=
+ return result >= 0
+ else:
+ raise ValueError('__richcmp__ `op` contract violated')
+
cdef class CallDetails:
@@ -209,11 +208,17 @@ cdef class CallDetails:
@property
def method(self):
- return _slice_bytes(self.c_details.method)
+ if self.c_details.method != NULL:
+ return <bytes>self.c_details.method
+ else:
+ return None
@property
def host(self):
- return _slice_bytes(self.c_details.host)
+ if self.c_details.host != NULL:
+ return <bytes>self.c_details.host
+ else:
+ return None
@property
def deadline(self):
@@ -324,7 +329,7 @@ cdef void* copy_ptr(void* ptr):
return ptr
-cdef void destroy_ptr(grpc_exec_ctx* ctx, void* ptr):
+cdef void destroy_ptr(void* ptr):
pass
@@ -397,20 +402,19 @@ cdef class ChannelArgs:
cdef class Metadatum:
def __cinit__(self, bytes key, bytes value):
- self.c_metadata.key = _slice_from_bytes(key)
- self.c_metadata.value = _slice_from_bytes(value)
-
- cdef void _copy_metadatum(self, grpc_metadata *destination) nogil:
- destination[0].key = _copy_slice(self.c_metadata.key)
- destination[0].value = _copy_slice(self.c_metadata.value)
+ self._key = key
+ self._value = value
+ self.c_metadata.key = self._key
+ self.c_metadata.value = self._value
+ self.c_metadata.value_length = len(self._value)
@property
def key(self):
- return _slice_bytes(self.c_metadata.key)
+ return <bytes>self.c_metadata.key
@property
def value(self):
- return _slice_bytes(self.c_metadata.value)
+ return <bytes>self.c_metadata.value[:self.c_metadata.value_length]
def __len__(self):
return 2
@@ -426,9 +430,6 @@ cdef class Metadatum:
def __iter__(self):
return iter((self.key, self.value))
- def __dealloc__(self):
- grpc_slice_unref(self.c_metadata.key)
- grpc_slice_unref(self.c_metadata.value)
cdef class _MetadataIterator:
@@ -453,65 +454,51 @@ cdef class _MetadataIterator:
cdef class Metadata:
- def __cinit__(self, metadata_iterable):
- with nogil:
- grpc_init()
- grpc_metadata_array_init(&self.c_metadata_array)
- metadata = list(metadata_iterable)
+ def __cinit__(self, metadata):
+ grpc_init()
+ self.metadata = list(metadata)
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
- self.c_metadata_array.count = len(metadata)
- self.c_metadata_array.capacity = len(metadata)
+ with nogil:
+ grpc_metadata_array_init(&self.c_metadata_array)
+ self.c_metadata_array.count = len(self.metadata)
+ self.c_metadata_array.capacity = len(self.metadata)
with nogil:
self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
self.c_metadata_array.count*sizeof(grpc_metadata)
)
for i in range(self.c_metadata_array.count):
- (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
+ self.c_metadata_array.metadata[i] = (
+ (<Metadatum>self.metadata[i]).c_metadata)
def __dealloc__(self):
- with nogil:
- # this frees the allocated memory for the grpc_metadata_array (although
- # it'd be nice if that were documented somewhere...)
- # TODO(atash): document this in the C core
- grpc_metadata_array_destroy(&self.c_metadata_array)
- grpc_shutdown()
+ # this frees the allocated memory for the grpc_metadata_array (although
+ # it'd be nice if that were documented somewhere...)
+ # TODO(atash): document this in the C core
+ grpc_metadata_array_destroy(&self.c_metadata_array)
+ grpc_shutdown()
def __len__(self):
return self.c_metadata_array.count
def __getitem__(self, size_t i):
- if i >= self.c_metadata_array.count:
- raise IndexError
- key = _slice_bytes(self.c_metadata_array.metadata[i].key)
- value = _slice_bytes(self.c_metadata_array.metadata[i].value)
- return Metadatum(key=key, value=value)
+ return Metadatum(
+ key=<bytes>self.c_metadata_array.metadata[i].key,
+ value=<bytes>self.c_metadata_array.metadata[i].value[
+ :self.c_metadata_array.metadata[i].value_length])
def __iter__(self):
return _MetadataIterator(self)
- cdef void _claim_slice_ownership(self):
- cdef grpc_metadata_array new_c_metadata_array
- grpc_metadata_array_init(&new_c_metadata_array)
- new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
- self.c_metadata_array.count*sizeof(grpc_metadata))
- new_c_metadata_array.count = self.c_metadata_array.count
- for i in range(self.c_metadata_array.count):
- new_c_metadata_array.metadata[i].key = _copy_slice(
- self.c_metadata_array.metadata[i].key)
- new_c_metadata_array.metadata[i].value = _copy_slice(
- self.c_metadata_array.metadata[i].value)
- grpc_metadata_array_destroy(&self.c_metadata_array)
- self.c_metadata_array = new_c_metadata_array
-
cdef class Operation:
def __cinit__(self):
grpc_init()
self.references = []
- self._status_details = grpc_empty_slice()
+ self._received_status_details = NULL
+ self._received_status_details_capacity = 0
self.is_valid = False
@property
@@ -568,13 +555,19 @@ cdef class Operation:
def received_status_details(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving status details")
- return _slice_bytes(self._status_details)
+ if self._received_status_details:
+ return self._received_status_details
+ else:
+ return None
@property
def received_status_details_or_none(self):
if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
- return _slice_bytes(self._status_details)
+ if self._received_status_details:
+ return self._received_status_details
+ else:
+ return None
@property
def received_cancelled(self):
@@ -590,7 +583,11 @@ cdef class Operation:
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
- grpc_slice_unref(self._status_details)
+ # We *almost* don't need to do anything; most of the objects are handled by
+ # Python. The remaining one(s) are primitive fields filled in by GRPC core.
+ # This means that we need to clean up after receive_status_on_client.
+ if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
+ gpr_free(self._received_status_details)
grpc_shutdown()
def operation_send_initial_metadata(Metadata metadata, int flags):
@@ -631,10 +628,9 @@ def operation_send_status_from_server(
op.c_op.data.send_status_from_server.trailing_metadata = (
metadata.c_metadata_array.metadata)
op.c_op.data.send_status_from_server.status = code
- grpc_slice_unref(op._status_details)
- op._status_details = _slice_from_bytes(details)
- op.c_op.data.send_status_from_server.status_details = &op._status_details
+ op.c_op.data.send_status_from_server.status_details = details
op.references.append(metadata)
+ op.references.append(details)
op.is_valid = True
return op
@@ -670,7 +666,9 @@ def operation_receive_status_on_client(int flags):
op.c_op.data.receive_status_on_client.status = (
&op._received_status_code)
op.c_op.data.receive_status_on_client.status_details = (
- &op._status_details)
+ &op._received_status_details)
+ op.c_op.data.receive_status_on_client.status_details_capacity = (
+ &op._received_status_details_capacity)
op.is_valid = True
return op
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 31551e0f1b..cbbe2dcbf5 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -592,8 +592,6 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
def _handle_call(rpc_event, generic_handlers, thread_pool):
- if not rpc_event.success:
- return None
if rpc_event.request_call_details.method is not None:
method_handler = _find_method_handler(rpc_event, generic_handlers)
if method_handler is None:
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 0b86661db8..ba64174863 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -159,8 +159,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/slice/percent_encoding.c',
'src/core/lib/slice/slice.c',
'src/core/lib/slice/slice_buffer.c',
- 'src/core/lib/slice/slice_hash_table.c',
- 'src/core/lib/slice/slice_intern.c',
'src/core/lib/slice/slice_string_helpers.c',
'src/core/lib/surface/alarm.c',
'src/core/lib/surface/api_trace.c',
@@ -182,13 +180,12 @@ CORE_SOURCE_FILES = [
'src/core/lib/surface/version.c',
'src/core/lib/transport/byte_stream.c',
'src/core/lib/transport/connectivity_state.c',
- 'src/core/lib/transport/error_utils.c',
+ 'src/core/lib/transport/mdstr_hash_table.c',
'src/core/lib/transport/metadata.c',
'src/core/lib/transport/metadata_batch.c',
'src/core/lib/transport/pid_controller.c',
'src/core/lib/transport/service_config.c',
'src/core/lib/transport/static_metadata.c',
- 'src/core/lib/transport/status_conversion.c',
'src/core/lib/transport/timeout_encoding.c',
'src/core/lib/transport/transport.c',
'src/core/lib/transport/transport_op_string.c',
@@ -209,6 +206,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/transport/huffsyms.c',
'src/core/ext/transport/chttp2/transport/incoming_metadata.c',
'src/core/ext/transport/chttp2/transport/parsing.c',
+ 'src/core/ext/transport/chttp2/transport/status_conversion.c',
'src/core/ext/transport/chttp2/transport/stream_lists.c',
'src/core/ext/transport/chttp2/transport/stream_map.c',
'src/core/ext/transport/chttp2/transport/varint.c',
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 6377164f0c..b4efe87730 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -95,8 +95,18 @@ class TypeSmokeTest(unittest.TestCase):
def testTimespec(self):
now = time.time()
- timespec = cygrpc.Timespec(now)
- self.assertAlmostEqual(now, float(timespec), places=8)
+ now_timespec_a = cygrpc.Timespec(now)
+ now_timespec_b = cygrpc.Timespec(now)
+ self.assertAlmostEqual(now, float(now_timespec_a), places=8)
+ self.assertEqual(now_timespec_a, now_timespec_b)
+ self.assertLess(cygrpc.Timespec(now - 1), cygrpc.Timespec(now))
+ self.assertGreater(cygrpc.Timespec(now + 1), cygrpc.Timespec(now))
+ self.assertGreaterEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now))
+ self.assertGreaterEqual(cygrpc.Timespec(now), cygrpc.Timespec(now))
+ self.assertLessEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now))
+ self.assertLessEqual(cygrpc.Timespec(now), cygrpc.Timespec(now))
+ self.assertNotEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now))
+ self.assertNotEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now))
def testCompletionQueueUpDown(self):
completion_queue = cygrpc.CompletionQueue()