aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi6
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi42
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi45
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi55
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi37
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi44
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi29
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi58
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi87
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd2
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx2
-rw-r--r--src/python/grpcio/grpc/_server.py65
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py35
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py35
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py13
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py23
18 files changed, 366 insertions, 229 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6361669757..0892215b6d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -26,16 +26,13 @@ cdef class Call:
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
- cdef OperationTag operation_tag = OperationTag(tag, operations)
- if retain_self:
- operation_tag.operation_call = self
- else:
- operation_tag.operation_call = None
- operation_tag.store_ops()
- cpython.Py_INCREF(operation_tag)
+ cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
+ tag, operations, self if retain_self else None)
+ batch_operation_tag.prepare()
+ cpython.Py_INCREF(batch_operation_tag)
return grpc_call_start_batch(
- self.c_call, operation_tag.c_ops, operation_tag.c_nops,
- <cpython.PyObject *>operation_tag, NULL)
+ self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
+ <cpython.PyObject *>batch_operation_tag, NULL)
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 644df674cc..443d534d7e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -76,12 +76,12 @@ cdef class Channel:
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag = OperationTag(tag, None)
- cpython.Py_INCREF(operation_tag)
+ cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
+ cpython.Py_INCREF(connectivity_tag)
with nogil:
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
- queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+ queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
def target(self):
cdef char *target = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 140fc357b9..e259789b35 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -37,42 +37,20 @@ cdef class CompletionQueue:
self.is_shutdown = False
cdef _interpret_event(self, grpc_event event):
- cdef OperationTag tag = None
- cdef object user_tag = None
- cdef Call operation_call = None
- cdef CallDetails request_call_details = None
- cdef object request_metadata = None
- cdef object batch_operations = None
+ cdef _Tag tag = None
if event.type == GRPC_QUEUE_TIMEOUT:
- return Event(
- event.type, False, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif event.type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- return Event(
- event.type, True, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
else:
- if event.tag != NULL:
- tag = <OperationTag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- if tag.shutting_down_server is not None:
- tag.shutting_down_server.notify_shutdown_complete()
- user_tag = tag.user_tag
- operation_call = tag.operation_call
- request_call_details = tag.request_call_details
- if tag.is_new_request:
- request_metadata = _metadata(&tag._c_request_metadata)
- grpc_metadata_array_destroy(&tag._c_request_metadata)
- batch_operations = tag.release_ops()
- if tag.is_new_request:
- # Stuff in the tag not explicitly handled by us needs to live through
- # the life of the call
- operation_call.references.extend(tag.references)
- return Event(
- event.type, event.success, user_tag, operation_call,
- request_call_details, request_metadata, tag.is_new_request,
- batch_operations)
+ tag = <_Tag>event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag.event(event)
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
new file mode 100644
index 0000000000..686199ecf4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
@@ -0,0 +1,45 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+
+
+cdef class RequestCallEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly Call call
+ cdef readonly CallDetails call_details
+ cdef readonly tuple invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly object batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
new file mode 100644
index 0000000000..af26d27318
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
@@ -0,0 +1,55 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+
+
+cdef class RequestCallEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ Call call, CallDetails call_details, tuple invocation_metadata):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.call = call
+ self.call_details = call_details
+ self.invocation_metadata = invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ object batch_operations):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.batch_operations = batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 537cf2b537..7b2482d947 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,43 +28,6 @@ cdef class CallDetails:
cdef grpc_call_details c_details
-cdef class OperationTag:
-
- cdef object user_tag
- cdef list references
- # This allows CompletionQueue to notify the Python Server object that the
- # underlying GRPC core server has shutdown
- cdef Server shutting_down_server
- cdef Call operation_call
- cdef CallDetails request_call_details
- cdef grpc_metadata_array _c_request_metadata
- cdef grpc_op *c_ops
- cdef size_t c_nops
- cdef readonly object _operations
- cdef bint is_new_request
-
- cdef void store_ops(self)
- cdef object release_ops(self)
-
-
-cdef class Event:
-
- cdef readonly grpc_completion_type type
- cdef readonly bint success
- cdef readonly object tag
-
- # For Server.request_call
- cdef readonly bint is_new_request
- cdef readonly CallDetails request_call_details
- cdef readonly object request_metadata
-
- # For server calls
- cdef readonly Call operation_call
-
- # For Call.start_batch
- cdef readonly object batch_operations
-
-
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 99f8ffa3e7..1ff3b4471f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -218,50 +218,6 @@ cdef class CallDetails:
return timespec
-cdef class OperationTag:
-
- def __cinit__(self, user_tag, operations):
- self.user_tag = user_tag
- self.references = []
- self._operations = operations
-
- cdef void store_ops(self):
- self.c_nops = 0 if self._operations is None else len(self._operations)
- if 0 < self.c_nops:
- self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
- for index, operation in enumerate(self._operations):
- (<Operation>operation).c()
- self.c_ops[index] = (<Operation>operation).c_op
-
- cdef object release_ops(self):
- if 0 < self.c_nops:
- for index, operation in enumerate(self._operations):
- (<Operation>operation).c_op = self.c_ops[index]
- (<Operation>operation).un_c()
- gpr_free(self.c_ops)
- return self._operations
- else:
- return ()
-
-
-cdef class Event:
-
- def __cinit__(self, grpc_completion_type type, bint success,
- object tag, Call operation_call,
- CallDetails request_call_details,
- object request_metadata,
- bint is_new_request,
- object batch_operations):
- self.type = type
- self.success = success
- self.tag = tag
- self.operation_call = operation_call
- self.request_call_details = request_call_details
- self.request_metadata = request_metadata
- self.batch_operations = batch_operations
- self.is_new_request = is_new_request
-
-
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index f8d7892858..c19beccde6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -78,19 +78,15 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
- cdef OperationTag operation_tag = OperationTag(tag, None)
- operation_tag.operation_call = Call()
- operation_tag.request_call_details = CallDetails()
- grpc_metadata_array_init(&operation_tag._c_request_metadata)
- operation_tag.references.extend([self, call_queue, server_queue])
- operation_tag.is_new_request = True
- cpython.Py_INCREF(operation_tag)
+ cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
+ request_call_tag.prepare()
+ cpython.Py_INCREF(request_call_tag)
return grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag._c_request_metadata,
+ self.c_server, &request_call_tag.call.c_call,
+ &request_call_tag.call_details.c_details,
+ &request_call_tag.c_invocation_metadata,
call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>request_call_tag)
def register_completion_queue(
self, CompletionQueue queue not None):
@@ -131,16 +127,14 @@ cdef class Server:
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
- operation_tag = OperationTag(tag, None)
- operation_tag.shutting_down_server = self
- cpython.Py_INCREF(operation_tag)
+ cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
+ cpython.Py_INCREF(server_shutdown_tag)
with nogil:
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>server_shutdown_tag)
def shutdown(self, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag
if queue.is_shutting_down:
raise ValueError("queue must be live")
elif not self.is_started:
@@ -153,7 +147,8 @@ cdef class Server:
self._c_shutdown(queue, tag)
cdef notify_shutdown_complete(self):
- # called only by a completion queue on receiving our shutdown operation tag
+ # called only after our server shutdown tag has emerged from a completion
+ # queue.
self.is_shutdown = True
def cancel_all_calls(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
new file mode 100644
index 0000000000..f9a3b5e8f4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
@@ -0,0 +1,58 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event)
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ cdef readonly object _user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ cdef readonly object _user_tag
+ cdef Call call
+ cdef CallDetails call_details
+ cdef grpc_metadata_array c_invocation_metadata
+
+ cdef void prepare(self)
+ cdef RequestCallEvent event(self, grpc_event c_event)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+ cdef object _user_tag
+ cdef readonly object _operations
+ cdef readonly object _retained_call
+ cdef grpc_op *c_ops
+ cdef size_t c_nops
+
+ cdef void prepare(self)
+ cdef BatchOperationEvent event(self, grpc_event c_event)
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ cdef readonly object _user_tag
+ # This allows CompletionQueue to notify the Python Server object that the
+ # underlying GRPC core server has shutdown
+ cdef readonly Server _shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
new file mode 100644
index 0000000000..aaca458442
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
@@ -0,0 +1,87 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event):
+ raise NotImplementedError()
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event):
+ return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+ self.call = None
+ self.call_details = None
+
+ cdef void prepare(self):
+ self.call = Call()
+ self.call_details = CallDetails()
+ grpc_metadata_array_init(&self.c_invocation_metadata)
+
+ cdef RequestCallEvent event(self, grpc_event c_event):
+ cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
+ grpc_metadata_array_destroy(&self.c_invocation_metadata)
+ return RequestCallEvent(
+ c_event.type, c_event.success, self._user_tag, self.call,
+ self.call_details, invocation_metadata)
+
+
+cdef class _BatchOperationTag:
+
+ def __cinit__(self, user_tag, operations, call):
+ self._user_tag = user_tag
+ self._operations = operations
+ self._retained_call = call
+
+ cdef void prepare(self):
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c()
+ self.c_ops[index] = (<Operation>operation).c_op
+
+ cdef BatchOperationEvent event(self, grpc_event c_event):
+ if 0 < self.c_nops:
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c_op = self.c_ops[index]
+ (<Operation>operation).un_c()
+ gpr_free(self.c_ops)
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, self._operations)
+ else:
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, ())
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ def __cinit__(self, user_tag, shutting_down_server):
+ self._user_tag = user_tag
+ self._shutting_down_server = shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event):
+ self._shutting_down_server.notify_shutdown_complete()
+ return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag) \ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index ad229de0ae..b32fa518fc 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -18,8 +18,10 @@ include "_cygrpc/call.pxd.pxi"
include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
+include "_cygrpc/event.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
+include "_cygrpc/tag.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 0964fb66ab..5106394708 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -25,11 +25,13 @@ include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
+include "_cygrpc/event.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
+include "_cygrpc/tag.pyx.pxi"
#
# initialize gRPC
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index eec31bdcf6..22244b9cec 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -217,11 +217,10 @@ class _Context(grpc.ServicerContext):
def time_remaining(self):
return max(
- float(self._rpc_event.request_call_details.deadline) - time.time(),
- 0)
+ float(self._rpc_event.call_details.deadline) - time.time(), 0)
def cancel(self):
- self._rpc_event.operation_call.cancel()
+ self._rpc_event.call.cancel()
def add_callback(self, callback):
with self._state.condition:
@@ -236,23 +235,23 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True
def invocation_metadata(self):
- return self._rpc_event.request_metadata
+ return self._rpc_event.invocation_metadata
def peer(self):
- return _common.decode(self._rpc_event.operation_call.peer())
+ return _common.decode(self._rpc_event.call.peer())
def peer_identities(self):
- return cygrpc.peer_identities(self._rpc_event.operation_call)
+ return cygrpc.peer_identities(self._rpc_event.call)
def peer_identity_key(self):
- id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call)
+ id_key = cygrpc.peer_identity_key(self._rpc_event.call)
return id_key if id_key is None else _common.decode(id_key)
def auth_context(self):
return {
_common.decode(key): value
for key, value in six.iteritems(
- cygrpc.auth_context(self._rpc_event.operation_call))
+ cygrpc.auth_context(self._rpc_event.call))
}
def send_initial_metadata(self, initial_metadata):
@@ -263,7 +262,7 @@ class _Context(grpc.ServicerContext):
if self._state.initial_metadata_allowed:
operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
- self._rpc_event.operation_call.start_server_batch(
+ self._rpc_event.call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
self._state.initial_metadata_allowed = False
self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
@@ -346,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.client is _CANCELLED or state.statused:
return None
else:
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
- _receive_message(state, rpc_event.operation_call,
+ _receive_message(state, rpc_event.call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
while True:
@@ -356,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.request is None:
if state.client is _CLOSED:
details = '"{}" requires exactly one request message.'.format(
- rpc_event.request_call_details.method)
- _abort(state, rpc_event.operation_call,
+ rpc_event.call_details.method)
+ _abort(state, rpc_event.call,
cygrpc.StatusCode.unimplemented,
_common.encode(details))
return None
@@ -378,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -396,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -410,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer):
serialized_response = _common.serialize(response, response_serializer)
if serialized_response is None:
with state.condition:
- _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
+ _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
b'Failed to serialize response!')
return None
else:
@@ -433,8 +432,8 @@ def _send_response(rpc_event, state, serialized_response):
operations = (cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
- rpc_event.operation_call.start_server_batch(
- operations, _send_message(state, token))
+ rpc_event.call.start_server_batch(operations,
+ _send_message(state, token))
state.due.add(token)
while True:
state.condition.wait()
@@ -458,7 +457,7 @@ def _status(rpc_event, state, serialized_response):
operations.append(
cygrpc.SendMessageOperation(serialized_response,
_EMPTY_FLAGS))
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
state.statused = True
@@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool):
def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_unary_response_in_pool, rpc_event, state, method_handler.stream_unary,
@@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_stream_response_in_pool, rpc_event, state,
@@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
return None
handler_call_details = _HandlerCallDetails(
- _common.decode(rpc_event.request_call_details.method),
- rpc_event.request_metadata)
+ _common.decode(rpc_event.call_details.method),
+ rpc_event.invocation_metadata)
if interceptor_pipeline is not None:
return interceptor_pipeline.execute(query_handlers,
@@ -568,15 +567,15 @@ def _reject_rpc(rpc_event, status, details):
cygrpc.SendStatusFromServerOperation(None, status, details,
_EMPTY_FLAGS),)
rpc_state = _RPCState()
- rpc_event.operation_call.start_server_batch(
- operations, lambda ignored_event: (rpc_state, (),))
+ rpc_event.call.start_server_batch(operations,
+ lambda ignored_event: (rpc_state, (),))
return rpc_state
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
- rpc_event.operation_call.start_server_batch(
+ rpc_event.call.start_server_batch(
(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
@@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
concurrency_exceeded):
if not rpc_event.success:
return None, None
- if rpc_event.request_call_details.method is not None:
+ if rpc_event.call_details.method is not None:
try:
method_handler = _find_method_handler(rpc_event, generic_handlers,
interceptor_pipeline)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 458b4fe542..7c7353af94 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -53,7 +53,7 @@ class _Handler(object):
self._state = state
self._lock = threading.Lock()
self._completion_queue = completion_queue
- self._call = rpc_event.operation_call
+ self._call = rpc_event.call
def __call__(self):
with self._state.condition:
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index 41291cc88f..583136cf23 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -72,7 +72,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@@ -84,7 +84,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index b429a20ed7..c5cf606c90 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -63,7 +63,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
@@ -75,7 +75,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
+ server_request_call_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
@@ -92,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index a6d92f2203..655938e507 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -174,12 +174,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
_EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
@@ -188,11 +188,11 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_send_first_message_tag)
with server_call_condition:
server_send_second_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
+ server_rpc_event.call.start_server_batch([
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
@@ -231,9 +231,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- server_rpc_event.type)
- self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
- self.assertEqual(0, len(server_rpc_event.batch_operations))
+ server_rpc_event.completion_type)
+ self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 002fb9b7f4..d22cd79a45 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -84,7 +84,8 @@ class TypeSmokeTest(unittest.TestCase):
shutdown_tag = object()
server.shutdown(completion_queue, shutdown_tag)
event = completion_queue.poll()
- self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertEqual(cygrpc.CompletionType.operation_complete,
+ event.completion_type)
self.assertIs(shutdown_tag, event.tag)
del server
del completion_queue
@@ -143,7 +144,7 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, call_result)
event = queue.poll(deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- event.type)
+ event.completion_type)
self.assertTrue(event.success)
self.assertIs(tag, event.tag)
except Exception as error:
@@ -201,22 +202,20 @@ class ServerClientMixin(object):
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- request_event.type)
- self.assertIsInstance(request_event.operation_call, cygrpc.Call)
+ request_event.completion_type)
+ self.assertIsInstance(request_event.call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(0, len(request_event.batch_operations))
self.assertTrue(
test_common.metadata_transmitted(client_initial_metadata,
- request_event.request_metadata))
- self.assertEqual(METHOD, request_event.request_call_details.method)
- self.assertEqual(self.expected_host,
- request_event.request_call_details.host)
+ request_event.invocation_metadata))
+ self.assertEqual(METHOD, request_event.call_details.method)
+ self.assertEqual(self.expected_host, request_event.call_details.host)
self.assertLess(
- abs(DEADLINE - float(request_event.request_call_details.deadline)),
+ abs(DEADLINE - float(request_event.call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
- server_call = request_event.operation_call
+ server_call = request_event.call
server_initial_metadata = (
(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
server_trailing_metadata = (
@@ -318,7 +317,7 @@ class ServerClientMixin(object):
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
- server_call = request_event.operation_call
+ server_call = request_event.call
def perform_server_operations(operations, description):
return self._perform_operations(operations, server_call,