aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2017-12-31 20:43:58 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2017-12-31 20:43:58 +0000
commit7b9c0233914a70fe20139fe5604b3b9b73a20e3c (patch)
tree6b98d14684153f673e5052480567200d925fe084
parent81edf5ff9af2d90813773acb9c2793e1a4cd1057 (diff)
Reform cygrpc.OperationTag and cygrpc.Event
Rather than single classes they are now broken up into class families with each class containing only those fields and methods that are needed in the context in which the class is used.
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi6
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi42
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi45
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi55
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi37
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi44
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi29
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi58
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi87
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd2
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx2
-rw-r--r--src/python/grpcio/grpc/_server.py65
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py35
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py35
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py13
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py23
18 files changed, 366 insertions, 229 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6361669757..0892215b6d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -26,16 +26,13 @@ cdef class Call:
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
- cdef OperationTag operation_tag = OperationTag(tag, operations)
- if retain_self:
- operation_tag.operation_call = self
- else:
- operation_tag.operation_call = None
- operation_tag.store_ops()
- cpython.Py_INCREF(operation_tag)
+ cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
+ tag, operations, self if retain_self else None)
+ batch_operation_tag.prepare()
+ cpython.Py_INCREF(batch_operation_tag)
return grpc_call_start_batch(
- self.c_call, operation_tag.c_ops, operation_tag.c_nops,
- <cpython.PyObject *>operation_tag, NULL)
+ self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
+ <cpython.PyObject *>batch_operation_tag, NULL)
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 644df674cc..443d534d7e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -76,12 +76,12 @@ cdef class Channel:
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag = OperationTag(tag, None)
- cpython.Py_INCREF(operation_tag)
+ cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
+ cpython.Py_INCREF(connectivity_tag)
with nogil:
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
- queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+ queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
def target(self):
cdef char *target = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 140fc357b9..e259789b35 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -37,42 +37,20 @@ cdef class CompletionQueue:
self.is_shutdown = False
cdef _interpret_event(self, grpc_event event):
- cdef OperationTag tag = None
- cdef object user_tag = None
- cdef Call operation_call = None
- cdef CallDetails request_call_details = None
- cdef object request_metadata = None
- cdef object batch_operations = None
+ cdef _Tag tag = None
if event.type == GRPC_QUEUE_TIMEOUT:
- return Event(
- event.type, False, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif event.type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- return Event(
- event.type, True, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
else:
- if event.tag != NULL:
- tag = <OperationTag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- if tag.shutting_down_server is not None:
- tag.shutting_down_server.notify_shutdown_complete()
- user_tag = tag.user_tag
- operation_call = tag.operation_call
- request_call_details = tag.request_call_details
- if tag.is_new_request:
- request_metadata = _metadata(&tag._c_request_metadata)
- grpc_metadata_array_destroy(&tag._c_request_metadata)
- batch_operations = tag.release_ops()
- if tag.is_new_request:
- # Stuff in the tag not explicitly handled by us needs to live through
- # the life of the call
- operation_call.references.extend(tag.references)
- return Event(
- event.type, event.success, user_tag, operation_call,
- request_call_details, request_metadata, tag.is_new_request,
- batch_operations)
+ tag = <_Tag>event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag.event(event)
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
new file mode 100644
index 0000000000..686199ecf4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
@@ -0,0 +1,45 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+
+
+cdef class RequestCallEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly Call call
+ cdef readonly CallDetails call_details
+ cdef readonly tuple invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly object batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
new file mode 100644
index 0000000000..af26d27318
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
@@ -0,0 +1,55 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+
+
+cdef class RequestCallEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ Call call, CallDetails call_details, tuple invocation_metadata):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.call = call
+ self.call_details = call_details
+ self.invocation_metadata = invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ object batch_operations):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.batch_operations = batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 537cf2b537..7b2482d947 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,43 +28,6 @@ cdef class CallDetails:
cdef grpc_call_details c_details
-cdef class OperationTag:
-
- cdef object user_tag
- cdef list references
- # This allows CompletionQueue to notify the Python Server object that the
- # underlying GRPC core server has shutdown
- cdef Server shutting_down_server
- cdef Call operation_call
- cdef CallDetails request_call_details
- cdef grpc_metadata_array _c_request_metadata
- cdef grpc_op *c_ops
- cdef size_t c_nops
- cdef readonly object _operations
- cdef bint is_new_request
-
- cdef void store_ops(self)
- cdef object release_ops(self)
-
-
-cdef class Event:
-
- cdef readonly grpc_completion_type type
- cdef readonly bint success
- cdef readonly object tag
-
- # For Server.request_call
- cdef readonly bint is_new_request
- cdef readonly CallDetails request_call_details
- cdef readonly object request_metadata
-
- # For server calls
- cdef readonly Call operation_call
-
- # For Call.start_batch
- cdef readonly object batch_operations
-
-
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 99f8ffa3e7..1ff3b4471f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -218,50 +218,6 @@ cdef class CallDetails:
return timespec
-cdef class OperationTag:
-
- def __cinit__(self, user_tag, operations):
- self.user_tag = user_tag
- self.references = []
- self._operations = operations
-
- cdef void store_ops(self):
- self.c_nops = 0 if self._operations is None else len(self._operations)
- if 0 < self.c_nops:
- self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
- for index, operation in enumerate(self._operations):
- (<Operation>operation).c()
- self.c_ops[index] = (<Operation>operation).c_op
-
- cdef object release_ops(self):
- if 0 < self.c_nops:
- for index, operation in enumerate(self._operations):
- (<Operation>operation).c_op = self.c_ops[index]
- (<Operation>operation).un_c()
- gpr_free(self.c_ops)
- return self._operations
- else:
- return ()
-
-
-cdef class Event:
-
- def __cinit__(self, grpc_completion_type type, bint success,
- object tag, Call operation_call,
- CallDetails request_call_details,
- object request_metadata,
- bint is_new_request,
- object batch_operations):
- self.type = type
- self.success = success
- self.tag = tag
- self.operation_call = operation_call
- self.request_call_details = request_call_details
- self.request_metadata = request_metadata
- self.batch_operations = batch_operations
- self.is_new_request = is_new_request
-
-
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index f8d7892858..c19beccde6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -78,19 +78,15 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
- cdef OperationTag operation_tag = OperationTag(tag, None)
- operation_tag.operation_call = Call()
- operation_tag.request_call_details = CallDetails()
- grpc_metadata_array_init(&operation_tag._c_request_metadata)
- operation_tag.references.extend([self, call_queue, server_queue])
- operation_tag.is_new_request = True
- cpython.Py_INCREF(operation_tag)
+ cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
+ request_call_tag.prepare()
+ cpython.Py_INCREF(request_call_tag)
return grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag._c_request_metadata,
+ self.c_server, &request_call_tag.call.c_call,
+ &request_call_tag.call_details.c_details,
+ &request_call_tag.c_invocation_metadata,
call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>request_call_tag)
def register_completion_queue(
self, CompletionQueue queue not None):
@@ -131,16 +127,14 @@ cdef class Server:
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
- operation_tag = OperationTag(tag, None)
- operation_tag.shutting_down_server = self
- cpython.Py_INCREF(operation_tag)
+ cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
+ cpython.Py_INCREF(server_shutdown_tag)
with nogil:
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>server_shutdown_tag)
def shutdown(self, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag
if queue.is_shutting_down:
raise ValueError("queue must be live")
elif not self.is_started:
@@ -153,7 +147,8 @@ cdef class Server:
self._c_shutdown(queue, tag)
cdef notify_shutdown_complete(self):
- # called only by a completion queue on receiving our shutdown operation tag
+ # called only after our server shutdown tag has emerged from a completion
+ # queue.
self.is_shutdown = True
def cancel_all_calls(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
new file mode 100644
index 0000000000..f9a3b5e8f4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
@@ -0,0 +1,58 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event)
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ cdef readonly object _user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ cdef readonly object _user_tag
+ cdef Call call
+ cdef CallDetails call_details
+ cdef grpc_metadata_array c_invocation_metadata
+
+ cdef void prepare(self)
+ cdef RequestCallEvent event(self, grpc_event c_event)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+ cdef object _user_tag
+ cdef readonly object _operations
+ cdef readonly object _retained_call
+ cdef grpc_op *c_ops
+ cdef size_t c_nops
+
+ cdef void prepare(self)
+ cdef BatchOperationEvent event(self, grpc_event c_event)
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ cdef readonly object _user_tag
+ # This allows CompletionQueue to notify the Python Server object that the
+ # underlying GRPC core server has shutdown
+ cdef readonly Server _shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
new file mode 100644
index 0000000000..aaca458442
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
@@ -0,0 +1,87 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event):
+ raise NotImplementedError()
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event):
+ return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+ self.call = None
+ self.call_details = None
+
+ cdef void prepare(self):
+ self.call = Call()
+ self.call_details = CallDetails()
+ grpc_metadata_array_init(&self.c_invocation_metadata)
+
+ cdef RequestCallEvent event(self, grpc_event c_event):
+ cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
+ grpc_metadata_array_destroy(&self.c_invocation_metadata)
+ return RequestCallEvent(
+ c_event.type, c_event.success, self._user_tag, self.call,
+ self.call_details, invocation_metadata)
+
+
+cdef class _BatchOperationTag:
+
+ def __cinit__(self, user_tag, operations, call):
+ self._user_tag = user_tag
+ self._operations = operations
+ self._retained_call = call
+
+ cdef void prepare(self):
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c()
+ self.c_ops[index] = (<Operation>operation).c_op
+
+ cdef BatchOperationEvent event(self, grpc_event c_event):
+ if 0 < self.c_nops:
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c_op = self.c_ops[index]
+ (<Operation>operation).un_c()
+ gpr_free(self.c_ops)
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, self._operations)
+ else:
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, ())
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ def __cinit__(self, user_tag, shutting_down_server):
+ self._user_tag = user_tag
+ self._shutting_down_server = shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event):
+ self._shutting_down_server.notify_shutdown_complete()
+ return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag) \ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index ad229de0ae..b32fa518fc 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -18,8 +18,10 @@ include "_cygrpc/call.pxd.pxi"
include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
+include "_cygrpc/event.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
+include "_cygrpc/tag.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 0964fb66ab..5106394708 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -25,11 +25,13 @@ include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
+include "_cygrpc/event.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
+include "_cygrpc/tag.pyx.pxi"
#
# initialize gRPC
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index eec31bdcf6..22244b9cec 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -217,11 +217,10 @@ class _Context(grpc.ServicerContext):
def time_remaining(self):
return max(
- float(self._rpc_event.request_call_details.deadline) - time.time(),
- 0)
+ float(self._rpc_event.call_details.deadline) - time.time(), 0)
def cancel(self):
- self._rpc_event.operation_call.cancel()
+ self._rpc_event.call.cancel()
def add_callback(self, callback):
with self._state.condition:
@@ -236,23 +235,23 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True
def invocation_metadata(self):
- return self._rpc_event.request_metadata
+ return self._rpc_event.invocation_metadata
def peer(self):
- return _common.decode(self._rpc_event.operation_call.peer())
+ return _common.decode(self._rpc_event.call.peer())
def peer_identities(self):
- return cygrpc.peer_identities(self._rpc_event.operation_call)
+ return cygrpc.peer_identities(self._rpc_event.call)
def peer_identity_key(self):
- id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call)
+ id_key = cygrpc.peer_identity_key(self._rpc_event.call)
return id_key if id_key is None else _common.decode(id_key)
def auth_context(self):
return {
_common.decode(key): value
for key, value in six.iteritems(
- cygrpc.auth_context(self._rpc_event.operation_call))
+ cygrpc.auth_context(self._rpc_event.call))
}
def send_initial_metadata(self, initial_metadata):
@@ -263,7 +262,7 @@ class _Context(grpc.ServicerContext):
if self._state.initial_metadata_allowed:
operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
- self._rpc_event.operation_call.start_server_batch(
+ self._rpc_event.call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
self._state.initial_metadata_allowed = False
self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
@@ -346,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.client is _CANCELLED or state.statused:
return None
else:
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
- _receive_message(state, rpc_event.operation_call,
+ _receive_message(state, rpc_event.call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
while True:
@@ -356,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.request is None:
if state.client is _CLOSED:
details = '"{}" requires exactly one request message.'.format(
- rpc_event.request_call_details.method)
- _abort(state, rpc_event.operation_call,
+ rpc_event.call_details.method)
+ _abort(state, rpc_event.call,
cygrpc.StatusCode.unimplemented,
_common.encode(details))
return None
@@ -378,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -396,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -410,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer):
serialized_response = _common.serialize(response, response_serializer)
if serialized_response is None:
with state.condition:
- _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
+ _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
b'Failed to serialize response!')
return None
else:
@@ -433,8 +432,8 @@ def _send_response(rpc_event, state, serialized_response):
operations = (cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
- rpc_event.operation_call.start_server_batch(
- operations, _send_message(state, token))
+ rpc_event.call.start_server_batch(operations,
+ _send_message(state, token))
state.due.add(token)
while True:
state.condition.wait()
@@ -458,7 +457,7 @@ def _status(rpc_event, state, serialized_response):
operations.append(
cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS))
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
state.statused = True
@@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool):
def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_unary_response_in_pool, rpc_event, state, method_handler.stream_unary,
@@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_stream_response_in_pool, rpc_event, state,
@@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
return None
handler_call_details = _HandlerCallDetails(
- _common.decode(rpc_event.request_call_details.method),
- rpc_event.request_metadata)
+ _common.decode(rpc_event.call_details.method),
+ rpc_event.invocation_metadata)
if interceptor_pipeline is not None:
return interceptor_pipeline.execute(query_handlers,
@@ -568,15 +567,15 @@ def _reject_rpc(rpc_event, status, details):
cygrpc.SendStatusFromServerOperation(None, status, details,
_EMPTY_FLAGS),)
rpc_state = _RPCState()
- rpc_event.operation_call.start_server_batch(
- operations, lambda ignored_event: (rpc_state, (),))
+ rpc_event.call.start_server_batch(operations,
+ lambda ignored_event: (rpc_state, (),))
return rpc_state
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
@@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
concurrency_exceeded):
if not rpc_event.success:
return None, None
- if rpc_event.request_call_details.method is not None:
+ if rpc_event.call_details.method is not None:
try:
method_handler = _find_method_handler(rpc_event, generic_handlers,
interceptor_pipeline)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 458b4fe542..7c7353af94 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -53,7 +53,7 @@ class _Handler(object):
self._state = state
self._lock = threading.Lock()
self._completion_queue = completion_queue
- self._call = rpc_event.operation_call
+ self._call = rpc_event.call
def __call__(self):
with self._state.condition:
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index 41291cc88f..583136cf23 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -72,7 +72,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@@ -84,7 +84,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index b429a20ed7..c5cf606c90 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -63,7 +63,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@@ -75,7 +75,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -92,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index a6d92f2203..655938e507 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -174,12 +174,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
@@ -188,11 +188,11 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_send_first_message_tag)
with server_call_condition:
server_send_second_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
@@ -231,9 +231,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- server_rpc_event.type)
- self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
- self.assertEqual(0, len(server_rpc_event.batch_operations))
+ server_rpc_event.completion_type)
+ self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 002fb9b7f4..d22cd79a45 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -84,7 +84,8 @@ class TypeSmokeTest(unittest.TestCase):
shutdown_tag = object()
server.shutdown(completion_queue, shutdown_tag)
event = completion_queue.poll()
- self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertEqual(cygrpc.CompletionType.operation_complete,
+ event.completion_type)
self.assertIs(shutdown_tag, event.tag)
del server
del completion_queue
@@ -143,7 +144,7 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, call_result)
event = queue.poll(deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- event.type)
+ event.completion_type)
self.assertTrue(event.success)
self.assertIs(tag, event.tag)
except Exception as error:
@@ -201,22 +202,20 @@ class ServerClientMixin(object):
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- request_event.type)
- self.assertIsInstance(request_event.operation_call, cygrpc.Call)
+ request_event.completion_type)
+ self.assertIsInstance(request_event.call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(0, len(request_event.batch_operations))
self.assertTrue(
test_common.metadata_transmitted(client_initial_metadata,
- request_event.request_metadata))
- self.assertEqual(METHOD, request_event.request_call_details.method)
- self.assertEqual(self.expected_host,
- request_event.request_call_details.host)
+ request_event.invocation_metadata))
+ self.assertEqual(METHOD, request_event.call_details.method)
+ self.assertEqual(self.expected_host, request_event.call_details.host)
self.assertLess(
- abs(DEADLINE - float(request_event.request_call_details.deadline)),
+ abs(DEADLINE - float(request_event.call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
- server_call = request_event.operation_call
+ server_call = request_event.call
server_initial_metadata = (
(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
server_trailing_metadata = (
@@ -318,7 +317,7 @@ class ServerClientMixin(object):
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
- server_call = request_event.operation_call
+ server_call = request_event.call
def perform_server_operations(operations, description):
return self._perform_operations(operations, server_call,